Sviluppa blocchi note Apache Beam con il runner interattivo

Utilizza il runner interattivo Apache Beam con i notebook JupyterLab per completare le seguenti attività:

  • Sviluppare le pipeline in modo iterativo.
  • Controlla il grafico della pipeline.
  • Analizza i singoli PCollections in un flusso di lavoro Read–Eval–Print Loop (REPL).

Questi blocchi note Apache Beam sono resi disponibili Blocchi note gestiti dall'utente di Vertex AI Workbench, è un servizio contiene macchine virtuali notebook preinstallate con le versioni più recenti di data science e framework di machine learning. Dataflow supporta solo Istanze di blocchi note gestiti dall'utente.

Questa guida si concentra sulle funzionalità introdotte dai blocchi note Apache Beam, ma non mostra come creare un blocco note. Per ulteriori informazioni su Apache Beam, vedi consulta la guida alla programmazione di Apache Beam.

Supporto e limitazioni

  • I notebook Apache Beam supportano solo Python.
  • I segmenti della pipeline Apache Beam in esecuzione in questi blocchi note vengono eseguiti in un ambiente di test e non con un runner Apache Beam di produzione. Per avviare i notebook sul servizio Dataflow, esporta le pipeline create nel notebook Apache Beam. Per ulteriori dettagli, vedi Avvia i job Dataflow da una pipeline creata nel blocco note.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

Prima di creare l'istanza di blocco note Apache Beam, abilita API per pipeline che utilizzano altri servizi, come Pub/Sub.

Se non specificato, l'istanza del notebook viene eseguita dall'account di servizio Compute Engine predefinito con il ruolo Editor del progetto IAM. Se il progetto in modo esplicito limita i ruoli dell'account di servizio, assicurati che abbia comunque un numero sufficiente per eseguire i blocchi note. Ad esempio, leggendo da un L'argomento Pub/Sub crea implicitamente una sottoscrizione e il tuo servizio deve avere un ruolo IAM editor Pub/Sub. Al contrario, la lettura da una sottoscrizione Pub/Sub richiede solo un ruolo IAM Pub/Sub Subscriber.

Al termine di questa guida, per evitare interruzioni nella fatturazione, elimina il le risorse che hai creato. Per ulteriori dettagli, consulta la sezione Pulizia.

avvia un'istanza di blocco note Apache Beam

  1. Nella console Google Cloud, vai alla pagina Workbench di Dataflow.

    Vai a Workbench

  2. Assicurati che sia attiva la scheda Blocchi note gestiti dall'utente.

  3. Nella barra degli strumenti, fai clic su Crea nuova.

  4. Nella sezione Ambiente, per Ambiente, seleziona Apache Beam.

  5. (Facoltativo) Se vuoi eseguire blocchi note su una GPU, nella sezione Tipo di macchina seleziona un tipo di macchina che supporti le GPU, quindi Seleziona Installa il driver GPU NVIDIA automaticamente per me. Per ulteriori informazioni, consulta Piattaforme GPU.

  6. Nella sezione Networking, seleziona una subnet per la VM notebook.

  7. (Facoltativo) Se vuoi configurare un'istanza di blocco note personalizzata, consulta Crea un'istanza di blocchi note gestiti dall'utente con proprietà specifiche.

  8. Fai clic su Crea. Dataflow Workbench crea una nuova istanza di blocco note Apache Beam.

  9. Dopo aver creato l'istanza del blocco note, il link Apri JupyterLab diventa attivo. Fai clic su Apri JupyterLab.

(Facoltativo) Installa le dipendenze

I blocchi note Apache Beam includono già Apache Beam Dipendenze del connettore Google Cloud installate. Se la pipeline contiene connettori personalizzati o PTransforms personalizzati che dipendono da librerie di terze parti, dopo aver creato un'istanza del blocco note. Per ulteriori informazioni, vedi Installa le dipendenze consulta la documentazione relativa ai blocchi note gestiti dall'utente.

Blocchi note Apache Beam di esempio

Dopo aver creato un'istanza di blocchi note gestiti dall'utente, aprila in JupyterLab. Nella scheda File della barra laterale JupyterLab, la cartella Esempi contiene blocchi note di esempio. Per ulteriori informazioni sull'utilizzo dei file JupyterLab, consulta Utilizzare i file nella guida utente di JupyterLab.

Sono disponibili i seguenti notebook:

  • Conteggio parole
  • Conteggio parole in modalità flusso
  • Streaming dei dati delle corse in taxi a New York
  • Apache Beam SQL nei notebook con confronti con le pipeline
  • Apache Beam SQL nei notebook con il runner Dataflow
  • Apache Beam SQL nei blocchi note
  • Conteggio parole Dataflow
  • Flink interattivo su larga scala
  • RunInference
  • Utilizza GPU con Apache Beam
  • Visualizzare i dati

La cartella Tutorial contiene tutorial aggiuntivi che spiegano i fondamenti di Apache Beam. Sono disponibili i seguenti tutorial:

  • Operazioni di base
  • Operazioni Element Wise
  • Aggregazioni
  • Windows
  • Operazioni I/O
  • Streaming
  • Esercizi finali

Questi blocchi note includono testo esplicativo e blocchi di codice commentato per aiutarti comprendere i concetti di Apache Beam e l'utilizzo delle API. I tutorial offrono inoltre ti forniscono degli esercizi per mettere in pratica i concetti.

Le sezioni seguenti utilizzano il codice di esempio del blocco note Conteggio parole in streaming. Gli snippet di codice in questa guida e cosa si trova nel conteggio delle parole dei flussi di dati. il blocco note potrebbe avere lievi discrepanze.

Crea un'istanza di notebook

Vai a File > Nuovo > blocco note e seleziona un kernel Apache Beam 2.22 o versioni successive.

I notebook Apache Beam vengono compilati in base al ramo master dell'SDK Apache Beam. Ciò significa che la versione più recente del kernel mostrata nell'interfaccia utente dei notebook potrebbe essere precedente alla versione dell'SDK rilasciata più di recente.

Apache Beam è installato nell'istanza notebook, quindi includi i moduli interactive_runner e interactive_beam nel notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Se il notebook utilizza altre API di Google, aggiungi le seguenti istruzioni di importazione:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Impostare le opzioni di interattività

La riga seguente imposta la quantità di tempo per cui l'InteractiveRunner registra i dati provenienti da un'origine illimitata. In questo esempio, la durata è impostata su 10 minuti.

ib.options.recording_duration = '10m'

Puoi anche modificare il limite delle dimensioni di registrazione (in byte) per un'origine illimitata utilizzando la proprietà recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Per ulteriori opzioni interattive, consulta la classe interactive_beam.options.

Crea la tua pipeline

Inizializza la pipeline utilizzando un oggetto InteractiveRunner.

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Leggere e visualizzare i dati

L'esempio seguente mostra una pipeline Apache Beam che crea una sottoscrizione all'argomento Pub/Sub specificato e legge da questa sottoscrizione.

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La pipeline conteggia le parole per finestre dall'origine. Crea finestre fisse con una durata di 10 secondi ciascuna.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Quando i dati sono filtrati, le parole vengono conteggiate per finestra.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Il metodo show() visualizza la PCollection risultante nel notebook.

ib.show(windowed_word_counts, include_window_info=True)

Il metodo show che visualizza una PCollection in formato tabulare.

Puoi definire l'ambito del set di risultati arretrato da show() impostando due facoltativi parametri: n e duration.

  • Imposta n per limitare il set di risultati in modo da mostrare al massimo n elementi, ad esempio 20. Se n non è impostato, il comportamento predefinito è elencare gli elementi acquisiti più recenti fino al termine della registrazione della sorgente.
  • Imposta duration per limitare il risultato impostato a un numero specifico di secondi di dati a partire dall'inizio della registrazione di origine. Se duration non è impostato, il comportamento predefinito è elencare tutti gli elementi fino al termine della registrazione.

Se sono impostati entrambi i parametri facoltativi, show() si interrompe quando viene raggiunta una delle due soglie. Nell'esempio seguente, show() restituisce al massimo 20 elementi calcolati in base ai dati dei primi 30 secondi delle sorgenti registrate.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Per mostrare le visualizzazioni dei dati, trasmetti visualize_data=True al Metodo show(). Puoi applicare più filtri alle visualizzazioni. La che ti consente di filtrare per etichetta e asse:

Il metodo show che visualizza una PCollection come un insieme completo di elementi dell'interfaccia utente filtrabili.

Per garantire la riproducibilità durante la prototipazione delle pipeline di streaming, il metodo show() richiama per impostazione predefinita il riutilizzo dei dati acquisiti. Per modificare questo comportamento e fare in modo che il metodo show() recuperi sempre nuovi dati, imposta interactive_beam.options.enable_capture_replay = False. Inoltre, se aggiungi seconda origine illimitata al blocco note, i dati della precedente viene ignorato.

Un'altra visualizzazione utile nei blocchi note Apache Beam è DataFrame Panda. L'esempio seguente converte prima le parole in minuscolo e poi calcola la frequenza di ciascuna parola.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

Il metodo collect() fornisce l'output in un DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Il metodo collect che rappresenta una PCollection in un DataFrame Pandas.

La modifica e la riesecuzione di una cella è una pratica comune nello sviluppo di notebook. Quando modifichi e riesegui una cella in un blocco note Apache Beam, la cella non annulla l'azione prevista del codice nella cella originale. Ad esempio, se una cella aggiunge un PTransform a una pipeline, la riesecuzione della cella aggiunge un altro PTransform alla pipeline. Se vuoi cancellare lo stato, riavviare il kernel e quindi eseguire nuovamente le celle.

Visualizza i dati tramite lo strumento di controllo Beam interattivo

Potresti trovare distratto eseguire l'introspezione dei dati di un PCollection chiamando costantemente show() e collect(), soprattutto quando l'output occupa molto spazio sullo schermo e rende difficile la navigazione nel blocco note. Ti consigliamo inoltre di confrontare più PCollections una accanto all'altra per verificare se una trasformazione funziona come previsto. Ad esempio, quando un PCollection undergoes a transform and produces the other. Per questi casi d'uso, L'ispettore interattivo Beam è una soluzione pratica.

L'ispettore Beam interattivo è fornito come estensione JupyterLab apache-beam-jupyterlab-sidepanel preinstallata nel notebook Apache Beam. Con l'estensione, puoi esaminare in modo interattivo lo stato delle pipeline e dei dati associati ogni PCollection senza richiamare esplicitamente show() o collect().

Puoi aprire la finestra di ispezione in tre modi:

  • Fai clic su Interactive Beam sulla barra dei menu in alto di JupyterLab. Nel menu a discesa, Individua Open Inspector e fai clic per aprire la finestra di ispezione.

    Aprire lo strumento di controllo tramite il menu

  • Utilizza la pagina di avvio. Se non è aperta alcuna pagina Avvio app, fai clic su File -> New Launcher per aprirlo. Nella pagina del programma di avvio, individua Interactive Beam e fai clic su Open Inspector per aprire l'ispezionatore.

    Aprire lo strumento di controllo tramite Avvio app

  • Utilizza la tavolozza dei comandi. Nella barra dei menu di JupyterLab, fai clic su View > Activate Command Palette. Nella finestra di dialogo, cerca Interactive Beam per elencare tutti le opzioni dell'estensione. Fai clic su Open Inspector per aprire l'ispezione.

    Aprire lo strumento di controllo tramite la tavolozza dei comandi

Quando la finestra di controllo sta per aprire:

  • Se c'è esattamente un blocco note aperto, la finestra di ispezione si connette automaticamente che le sono assegnati.

  • Se non è aperto alcun notebook, viene visualizzata una finestra di dialogo che ti consente di selezionare un kernel.

  • Se sono aperti più notebook, viene visualizzata una finestra di dialogo che ti consente di selezionare la sessione del notebook.

    Seleziona il blocco note a cui connetterti

Ti consigliamo di aprire almeno un notebook e di selezionarne un kernel prima di aprire l'ispettore. Se apri una finestra di ispezione con un kernel prima l'apertura di qualsiasi blocco note, in un secondo momento, quando aprirai un blocco note per connetterti inspector, devi selezionare Interactive Beam Inspector Session da Use Kernel from Preferred Session. Un ispettore e un notebook sono collegati quando condividono la stessa sessione, non sessioni diverse create dallo stesso kernel. Se selezioni lo stesso kernel da Start Preferred Kernel, viene creata una una nuova sessione indipendente dalle sessioni esistenti di blocchi note aperti ispettori.

Puoi aprire più ispettori per un notebook aperto e disporli trascinandone le schede liberamente nell'area di lavoro.

Apri due ispettori e disponili uno accanto all'altro

La pagina di controllo si aggiorna automaticamente quando esegui celle nella un blocco note personalizzato. La pagina elenca le pipeline e i PCollections definiti nel blocco note collegato. PCollections sono organizzati in base alle pipeline a cui appartengono e puoi comprimirli facendo clic sulla pipeline dell'intestazione.

Per gli elementi delle pipeline e dell'elenco PCollections, al clic l'ispezionatore visualizza le visualizzazioni corrispondenti sul lato destro:

  • Se si tratta di un PCollection, la finestra di controllo esegue il rendering dei propri dati (in modo dinamico se i dati è ancora disponibile per PCollections illimitato) con widget aggiuntivi da ottimizzare la visualizzazione dopo aver fatto clic sul pulsante APPLY.

    Pagina di controllo

    Poiché l'ispettore e il notebook aperto condividono la stessa sessione del kernel, si bloccano a vicenda. Ad esempio, se il notebook è impegnato a eseguire codice, l'ispettore non si aggiorna finché il notebook non completa l'esecuzione. Al contrario, se vuoi il codice immediatamente nel blocco note mentre la finestra di ispezione visualizzare un PCollection in modo dinamico, devi fare clic sul pulsante STOP per interrompere la visualizzazione e rilasciare preventivamente il kernel nel blocco note.

  • Se si tratta di una pipeline, l'ispettore mostra il grafico della pipeline.

    Pagina di ispezione

Potresti notare pipeline anonime. Queste pipeline hanno PCollections a cui puoi accedere, ma a cui non fa più riferimento durante la sessione. Ad esempio:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

L'esempio precedente crea una pipeline vuota p e una pipeline anonima che contiene un PCollection pcoll. Puoi accedere alla pipeline anonima mediante pcoll.pipeline.

Puoi attivare/disattivare la pipeline e l'elenco PCollection per risparmiare spazio per le visualizzazioni di grandi dimensioni. Espandi/comprimi elenco a sinistra

Comprendi lo stato di registrazione di una pipeline

Oltre alle visualizzazioni, puoi anche controllare lo stato della registrazione di una o tutte le pipeline nell'istanza del notebook chiamando describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

Il metodo describe() fornisce i seguenti dettagli:

  • Dimensioni totali (in byte) di tutte le registrazioni per la pipeline su disco
  • Ora di inizio del job di registrazione in background (in secondi dall'epoca Unix)
  • Stato attuale della pipeline del job di registrazione in background
  • Variabile Python per la pipeline

Avviare job Dataflow da una pipeline creata nel notebook

  1. (Facoltativo) Prima di utilizzare il notebook per eseguire i job Dataflow, riavvia il kernel, esegui di nuovo tutte le celle e verifica l'output. Se salti questo passaggio, gli stati nascosti nel blocco note potrebbero influire sul grafico del job nella pipeline .
  2. Attiva l'API Dataflow.
  3. Aggiungi la seguente istruzione di importazione:

    from apache_beam.runners import DataflowRunner
    
  4. Passa le opzioni della pipeline.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puoi modificare i valori parametro. Ad esempio, puoi modificare il valore region da us-central1.

  5. Esegui la pipeline con DataflowRunner. Questo passaggio esegue il job sul servizio Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p è un oggetto pipeline di creazione una pipeline di Cloud Shell.

Per un esempio di come eseguire questa conversione in un notebook interattivo, consulta il notebook Conteggio parole di Dataflow nell'istanza del notebook.

In alternativa, puoi esportare il notebook come script eseguibile, modificare il file .py generato utilizzando i passaggi precedenti e poi eseguire il deployment della pipeline nel servizio Dataflow.

Salva il blocco note

Notebooks che crei vengono salvati localmente nell'istanza del blocco note in esecuzione. Se reset o l'istanza del blocco note durante lo sviluppo, i nuovi blocchi note rimangono valide purché siano create nella directory /home/jupyter. Tuttavia, se viene eliminata un'istanza di blocco note, vengono eliminati anche i relativi blocchi note.

Per conservare i blocchi note per l'uso futuro, scaricali in locale sul tuo workstation, salvale su GitHub, o esportarle in un formato file diverso.

Salva il blocco note su dischi permanenti aggiuntivi

Se vuoi conservare i tuoi lavori, ad esempio notebook e script, in varie istanze di notebook, archiviali in un disco permanente.

  1. Crea o allega un Persistent Disk (Disco permanente). Segui le istruzioni per utilizzare ssh per connetterti alla VM dell'istanza del notebook ed emettere comandi in Cloud Shell.

  2. Annota la directory in cui è montato il Persistent Disk, ad esempio: /mnt/myDisk.

  3. Modifica i dettagli della VM dell'istanza del notebook per aggiungere una voce a Custom metadata: chiave - container-custom-params; valore - -v /mnt/myDisk:/mnt/myDisk. Metadati aggiuntivi necessari per associare il PD montato

  4. Fai clic su Salva.

  5. Per aggiornare queste modifiche, reimposta l'istanza del blocco note. Reimpostare un&#39;istanza di notebook

  6. Dopo il ripristino, fai clic su Apri JupyterLab. Potrebbe essere necessario del tempo prima che l'interfaccia utente di JupyterLab diventi disponibile. Dopo l'interfaccia utente apri un terminale ed esegui questo comando: ls -al /mnt Dovrebbe essere presente la directory /mnt/myDisk. Volume elenco limitato

Ora puoi salvare il tuo lavoro nella directory /mnt/myDisk. Anche se l'istanza del notebook viene eliminata, il disco persistente esiste nel progetto. Puoi quindi collegare questo disco permanente ad altre istanze del notebook.

Esegui la pulizia

Dopo aver finito di utilizzare l'istanza di blocco note Apache Beam, esegui la pulizia create su Google Cloud l'arresto dell'istanza del blocco note.

Passaggi successivi