Sviluppa blocchi note Apache Beam con il runner interattivo

Utilizza l'esecutore interattivo Apache Beam con i blocchi note JupyterLab per completare le attività seguenti:

  • Sviluppare iterativamente le pipeline.
  • Esamina il grafico della pipeline.
  • Analizza il singolo PCollections in un flusso di lavoro Read-Eval-print-loop (REPL).

Questi blocchi note Apache Beam sono resi disponibili tramite Blocchi note gestiti dall'utente di Vertex AI Workbench, un servizio che ospita macchine virtuali di blocchi note preinstallate con i più recenti framework di data science e machine learning.

Questa guida si concentra sulle funzionalità introdotte dai blocchi note Apache Beam, ma non mostra come creare un blocco note. Per ulteriori informazioni su Apache Beam, consulta la guida alla programmazione di Apache Beam.

Supporto e limitazioni

  • I blocchi note Apache Beam supportano solo Python.
  • I segmenti della pipeline Apache Beam in esecuzione in questi blocchi note vengono eseguiti in un ambiente di test e non su un runner Apache Beam di produzione. Per avviare i blocchi note sul servizio Dataflow, esporta le pipeline create nel tuo blocco note Apache Beam. Per maggiori dettagli, consulta Avviare i job Dataflow da una pipeline creata nel tuo blocco note.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Compute Engine and Notebooks.

    Abilita le API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  7. Abilita le API Compute Engine and Notebooks.

    Abilita le API

Prima di creare l'istanza di blocco note Apache Beam, abilita API aggiuntive per le pipeline che utilizzano altri servizi, ad esempio Pub/Sub.

Se non specificato, l'istanza del blocco note viene eseguita dall'account di servizio Compute Engine predefinito con il ruolo di editor del progetto IAM. Se il progetto limita esplicitamente i ruoli dell'account di servizio, assicurati che disponga comunque di autorizzazioni sufficienti per eseguire i blocchi note. Ad esempio, la lettura di un argomento Pub/Sub crea implicitamente una sottoscrizione e il tuo account di servizio deve avere un ruolo IAM editor Pub/Sub. Al contrario, la lettura da una sottoscrizione Pub/Sub richiede solo un ruolo di sottoscrittore IAM Pub/Sub.

Al termine di questa guida, per evitare la fatturazione continua, elimina le risorse che hai creato. Per maggiori dettagli, consulta Pulizia.

avvia un'istanza di blocco note Apache Beam

  1. Nella console Google Cloud, vai alla pagina Workbench di Dataflow.

    Vai al workbench

  2. Assicurati che sia attiva la scheda Blocchi note gestiti dall'utente.

  3. Nella barra degli strumenti, fai clic su Crea nuovo.

  4. Nella sezione Ambiente, per Ambiente, seleziona Apache Beam.

  5. (Facoltativo) Se vuoi eseguire blocchi note su una GPU, nella sezione Tipo di macchina, seleziona un tipo di macchina che supporti le GPU, quindi seleziona Installa il driver GPU NVIDIA automaticamente per me. Per ulteriori informazioni, vedi Piattaforme GPU.

  6. Nella sezione Networking, seleziona una subnet per la VM del blocco note.

  7. (Facoltativo) Se vuoi configurare un'istanza di blocco note personalizzata, consulta Creare un'istanza di blocchi note gestiti dall'utente con proprietà specifiche.

  8. Fai clic su Crea. Dataflow Workbench crea una nuova istanza di blocco note Apache Beam.

  9. Dopo aver creato l'istanza del blocco note, il link Apri JupyterLab diventa attivo. Fai clic su Apri JupyterLab.

(Facoltativo) Installa le dipendenze

Nei blocchi note Apache Beam sono già installate le dipendenze di Apache Beam e del connettore Google Cloud. Se la pipeline contiene connettori personalizzati o PTransforms personalizzati che dipendono da librerie di terze parti, installali dopo aver creato un'istanza di blocco note. Per ulteriori informazioni, consulta Installare le dipendenze nella documentazione sui blocchi note gestiti dall'utente.

Blocchi note Apache Beam di esempio

Dopo aver creato un'istanza di blocchi note gestiti dall'utente, aprila in JupyterLab. Nella scheda File della barra laterale JupyterLab, la cartella Esempi contiene blocchi note di esempio. Per ulteriori informazioni sull'utilizzo dei file JupyterLab, consulta Utilizzo dei file nella guida dell'utente di JupyterLab.

Sono disponibili i seguenti blocchi note:

  • Conteggio parole
  • Conteggio parole in modalità flusso
  • Streaming dei dati delle corse in taxi a New York
  • Apache Beam SQL nei blocchi note con confronti con le pipeline
  • SQL Apache Beam nei blocchi note con Dataflow Runner
  • Apache Beam SQL nei blocchi note
  • Conteggio parole Dataflow
  • Flink interattivo su larga scala
  • RunInference
  • Utilizza GPU con Apache Beam
  • Visualizzare i dati

La cartella Tutorials contiene tutorial aggiuntivi che spiegano le nozioni di base di Apache Beam. Sono disponibili i seguenti tutorial:

  • Operazioni di base
  • Operazioni Element Wise
  • Aggregazioni
  • Windows
  • Operazioni I/O
  • Dispositivi di streaming
  • Esercizi finali

Questi blocchi note includono testo esplicativo e blocchi di codice commentato per aiutarti a comprendere i concetti di Apache Beam e l'utilizzo delle API. I tutorial forniscono anche esercizi per mettere in pratica i concetti.

Le sezioni seguenti utilizzano un codice di esempio del blocco note Conteggio parole in streaming. Gli snippet di codice in questa guida e ciò che si trova nel blocco note Conteggio parole in streaming potrebbero presentare lievi discrepanze.

Crea un'istanza di blocco note

Vai a File > Nuovo > Blocco note e seleziona un kernel che sia Apache Beam 2.22 o successivo.

I blocchi note Apache Beam vengono creati in base al ramo master dell'SDK Apache Beam. Ciò significa che l'ultima versione del kernel mostrata nell'interfaccia utente del blocco note potrebbe essere antecedente alla versione dell'SDK rilasciata più di recente.

Apache Beam è installato sull'istanza del blocco note, quindi includi i moduli interactive_runner e interactive_beam nel blocco note.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Se il blocco note utilizza altre API di Google, aggiungi le seguenti istruzioni di importazione:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Imposta opzioni di interattività

La riga seguente imposta la quantità di tempo in cui InteractiveRunner registra i dati da un'origine illimitata. In questo esempio, la durata è impostata su 10 minuti.

ib.options.recording_duration = '10m'

Puoi anche modificare il limite delle dimensioni di registrazione (in byte) per un'origine illimitata utilizzando la proprietà recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Per ulteriori opzioni interattive, consulta la classe interactive_beam.options.

Crea la tua pipeline

Inizializza la pipeline utilizzando un oggetto InteractiveRunner.

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Leggere e visualizzare i dati

L'esempio seguente mostra una pipeline Apache Beam che crea una sottoscrizione all'argomento Pub/Sub in questione e legge dalla sottoscrizione.

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La pipeline conteggia le parole per finestre dall'origine. Crea un windowing fisso con una durata di 10 secondi.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Quando i dati vengono inseriti in una finestra, le parole vengono conteggiate in base alla finestra.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Il metodo show() visualizza la PCollection risultante nel blocco note.

ib.show(windowed_word_counts, include_window_info=True)

Il metodo show che visualizza una PCollection in formato tabulare.

Puoi definire l'ambito del set di risultati arretrato da show() impostando due parametri facoltativi: n e duration.

  • Imposta n per limitare l'insieme di risultati in modo che mostri al massimo n numero di elementi, ad esempio 20. Se il criterio n non viene configurato, il comportamento predefinito prevede l'elenco degli elementi acquisiti più di recente fino al termine della registrazione sorgente.
  • Imposta duration per limitare il set di risultati a un numero specifico di secondi di dati a partire dall'inizio della registrazione di origine. Se il criterio duration non viene impostato, il comportamento predefinito prevede l'elenco di tutti gli elementi fino al termine della registrazione.

Se sono impostati entrambi i parametri facoltativi, show() si interrompe ogni volta che viene raggiunta una delle due soglie. Nell'esempio seguente, show() restituisce al massimo 20 elementi calcolati in base ai primi 30 secondi di dati provenienti dalle sorgenti registrate.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Per mostrare le visualizzazioni dei tuoi dati, passa visualize_data=True al metodo show(). Puoi applicare più filtri alle visualizzazioni. La visualizzazione seguente ti consente di filtrare per etichetta e asse:

Il metodo show che visualizza una PCollection come un ricco insieme di elementi dell'interfaccia utente filtrabili.

Per garantire la riproducibilità durante la prototipazione delle pipeline di flusso, le chiamate al metodo show() riutilizzano i dati acquisiti per impostazione predefinita. Per modificare questo comportamento e fare in modo che il metodo show() recuperi sempre nuovi dati, imposta interactive_beam.options.enable_capture_replay = False. Inoltre, se aggiungi una seconda origine illimitata al blocco note, i dati della precedente origine illimitata vengono eliminati.

Un'altra visualizzazione utile nei blocchi note Apache Beam è un DataFrame Panda. L'esempio seguente converte prima le parole in lettere minuscole, quindi calcola la frequenza di ogni parola.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

Il metodo collect() fornisce l'output in un DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Il metodo collect che rappresenta una PCollection in un DataFrame Pandas.

La modifica e la nuova esecuzione di una cella è una pratica comune nello sviluppo dei blocchi note. Quando modifichi ed esegui nuovamente una cella in un blocco note Apache Beam, la cella non annulla l'azione prevista del codice nella cella originale. Ad esempio, se una cella aggiunge un PTransform a una pipeline, rieseguire la cella aggiunge un altro PTransform alla pipeline. Se vuoi cancellare lo stato, riavvia il kernel e le celle.

Visualizza i dati tramite l'ispettore Interactive Beam

Potrebbe distrarre introspezionare i dati di un PCollection chiamando costantemente show() e collect(), soprattutto se l'output occupa molto spazio sullo schermo e rende difficile navigare nel blocco note. Potresti anche voler confrontare più PCollections uno accanto all'altro per verificare se una trasformazione funziona come previsto. Ad esempio, quando un PCollection sottopone a una trasformazione e produce l'altra. Per questi casi d'uso, l'ispezione Beam interattiva è una soluzione pratica.

Lo strumento di controllo interattivo Beam viene fornito come estensione JupyterLab apache-beam-jupyterlab-sidepanel preinstallata nel blocco note Apache Beam. Con l'estensione, puoi ispezionare in modo interattivo lo stato delle pipeline e dei dati associati a ogni PCollection senza richiamare esplicitamente show() o collect().

Puoi aprire la finestra di ispezione in tre modi:

  • Fai clic su Interactive Beam sulla barra dei menu in alto di JupyterLab. Nel menu a discesa, individua Open Inspector e fai clic per aprire la finestra di ispezione.

    Apri controllo tramite il menu

  • Utilizza la pagina Avvio app. Se non è aperta una pagina Avvio app, fai clic su File -> New Launcher per aprirla. Nella pagina Avvio app, individua Interactive Beam e fai clic su Open Inspector per aprire la finestra di ispezione.

    Apri Controllo tramite Avvio app

  • Utilizza la tavolozza dei comandi. Nella barra dei menu JupyterLab, fai clic su View > Activate Command Palette. Nella finestra di dialogo, cerca Interactive Beam per elencare tutte le opzioni dell'estensione. Fai clic su Open Inspector per aprire la finestra di ispezione.

    Apri il controllo tramite la tavolozza dei comandi

Quando la finestra di controllo sta per aprire:

  • Se c'è esattamente un blocco note aperto, la finestra di ispezione si connette automaticamente.

  • Se non è aperto alcun blocco note, viene visualizzata una finestra di dialogo che ti consente di selezionare un kernel.

  • Se sono aperti più blocchi note, viene visualizzata una finestra di dialogo che ti consente di selezionare la sessione corrispondente.

    Seleziona il blocco note a cui connetterti

Ti consigliamo di aprire almeno un blocco note e di selezionare un kernel prima di aprire la finestra di ispezione. Se apri una finestra di ispezione con un kernel prima di aprire qualsiasi blocco note, in seguito, quando aprirai un blocco note per connetterti alla finestra di ispezione, dovrai selezionare Interactive Beam Inspector Session da Use Kernel from Preferred Session. Un inspector e un blocco note sono connessi quando condividono la stessa sessione, non sessioni diverse create dallo stesso kernel. Se selezioni lo stesso kernel da Start Preferred Kernel, viene creata una nuova sessione indipendente dalle sessioni esistenti di blocchi note o ispettori aperti.

Puoi aprire più controlli per un blocco note aperto e organizzare i controlli trascinandone liberamente le schede nell'area di lavoro.

Apri due ispettori e disponili uno accanto all'altro

La pagina di controllo si aggiorna automaticamente quando esegui celle nel blocco note. La pagina elenca le pipeline e PCollections definite nel blocco note collegato. I file PCollections sono organizzati in base alle pipeline a cui appartengono e puoi comprimerli facendo clic sulla pipeline di intestazione.

Per gli elementi nelle pipeline e nell'elenco PCollections, quando viene fatto clic, la finestra di controllo mostra le visualizzazioni corrispondenti sul lato destro:

  • Se si tratta di un PCollection, la finestra di controllo mostra i propri dati (in modo dinamico se i dati vengono ancora inviati per PCollections illimitato) con widget aggiuntivi per ottimizzare la visualizzazione dopo aver fatto clic sul pulsante APPLY.

    Pagina di controllo

    Poiché lo strumento di controllo e il blocco note aperto condividono la stessa sessione kernel, si bloccano l'esecuzione a vicenda. Ad esempio, se il blocco note è impegnato nell'esecuzione di codice, la finestra di ispezione non viene aggiornata fino a quando il blocco note non completa l'esecuzione. Al contrario, se vuoi eseguire il codice immediatamente nel blocco note mentre la finestra di ispezione visualizza un PCollection in modo dinamico, devi fare clic sul pulsante STOP per interrompere la visualizzazione e rilasciare preventivamente il kernel nel blocco note.

  • Se si tratta di una pipeline, lo strumento di controllo mostra il grafico della pipeline.

    Pagina di controllo

Potresti notare pipeline anonime. Queste pipeline hanno PCollections a cui puoi accedere, ma la sessione principale non fa più riferimento a queste pipeline. Ad esempio:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

L'esempio precedente crea una pipeline vuota p e una pipeline anonima che contiene una pipeline PCollection pcoll. Puoi accedere alla pipeline anonima utilizzando pcoll.pipeline.

Puoi attivare/disattivare la pipeline e l'elenco PCollection per risparmiare spazio per visualizzazioni di grandi dimensioni. Elenco a sinistra Toggel

Comprendi lo stato di registrazione di una pipeline

Oltre alle visualizzazioni, puoi anche controllare lo stato della registrazione per una o tutte le pipeline nell'istanza del blocco note chiamando describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

Il metodo describe() fornisce i seguenti dettagli:

  • Dimensioni totali (in byte) di tutte le registrazioni per la pipeline su disco
  • Ora di inizio del processo di registrazione in background (in secondi dall'epoca Unix)
  • Stato attuale della pipeline del job di registrazione in background
  • Variabile Python per la pipeline

Avvia i job Dataflow da una pipeline creata nel tuo blocco note

  1. (Facoltativo) Prima di utilizzare il blocco note per eseguire job Dataflow, riavvia il kernel, esegui nuovamente tutte le celle e verifica l'output. Se salti questo passaggio, gli stati nascosti nel blocco note potrebbero influire sul grafico del job nell'oggetto della pipeline.
  2. Abilitare l'API Dataflow.
  3. Aggiungi la seguente istruzione di importazione:

    from apache_beam.runners import DataflowRunner
    
  4. Passa le opzioni della pipeline.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puoi modificare i valori dei parametri. Ad esempio, puoi modificare il valore region da us-central1.

  5. Esegui la pipeline con DataflowRunner. Questo passaggio esegue il job sul servizio Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p è un oggetto pipeline di creazione della pipeline.

Per un esempio su come eseguire questa conversione in un blocco note interattivo, vedi il blocco note Conteggio parole di Dataflow nella tua istanza blocco note.

In alternativa, puoi esportare il blocco note come script eseguibile, modificare il file .py generato seguendo i passaggi precedenti, quindi eseguire il deployment della pipeline al servizio Dataflow.

Salva il blocco note

Notebooks che crei vengono salvati localmente nell'istanza del blocco note in esecuzione. Se reimposti o arresti l'istanza del blocco note durante lo sviluppo, i nuovi blocchi note rimarranno invariati finché vengono creati nella directory /home/jupyter. Tuttavia, se viene eliminata un'istanza di blocco note, vengono eliminati anche i relativi blocchi note.

Per conservare i blocchi note per l'uso futuro, scaricali localmente sulla tua workstation, salvali su GitHub o esportali in un formato file diverso.

Salva il blocco note su dischi permanenti aggiuntivi

Se vuoi conservare il lavoro, ad esempio blocchi note e script in varie istanze di blocchi note, archiviali in un Persistent Disk.

  1. Crea o collega un disco permanente. Segui le istruzioni per utilizzare ssh per connetterti alla VM dell'istanza del blocco note e inviare i comandi nel Cloud Shell aperto.

  2. Annota la directory in cui è montato il Persistent Disk, ad esempio /mnt/myDisk.

  3. Modifica i dettagli della VM dell'istanza del blocco note per aggiungere una voce alla chiave Custom metadata: - container-custom-params; valore - -v /mnt/myDisk:/mnt/myDisk. Sono necessari metadati aggiuntivi per associare il DP montato

  4. Fai clic su Salva.

  5. Per aggiornare queste modifiche, reimposta l'istanza del blocco note. Reimposta un&#39;istanza di blocco note

  6. Dopo il ripristino, fai clic su Apri JupyterLab. Potrebbe essere necessario del tempo prima che la UI JupyterLab sia disponibile. Quando compare la UI, apri un terminale ed esegui questo comando: ls -al /mnt La directory /mnt/myDisk dovrebbe essere elencata. Volume elenco limitato

Ora puoi salvare il tuo lavoro nella directory /mnt/myDisk. Anche se l'istanza del blocco note viene eliminata, il Persistent Disk esiste nel progetto. Puoi quindi collegare questo Persistent Disk ad altre istanze del blocco note.

Esegui la pulizia

Dopo aver completato l'utilizzo dell'istanza del blocco note Apache Beam, esegui la pulizia delle risorse che hai creato su Google Cloud arrestando l'istanza del blocco note.

Passaggi successivi