Sviluppa blocchi note Apache Beam con il runner interattivo

Utilizza il runner interattivo Apache Beam con i blocchi note JupyterLab per completare le seguenti attività:

  • Sviluppa in modo iterativo le pipeline.
  • Esamina il grafico della pipeline.
  • Analizza PCollections in un flusso di lavoro read-eval-print-loop (REPL).

Questi blocchi note Apache Beam sono resi disponibili tramite blocchi note gestiti dall'utente Vertex AI Workbench, un servizio che ospita macchine virtuali di blocchi note preinstallate con i più recenti framework di data science e machine learning.

Questa guida si concentra sulla funzionalità introdotta dai blocchi note Apache Beam, ma non mostra come creare un blocco note. Per ulteriori informazioni su Apache Beam, consulta la guida alla programmazione Apache Beam.

Assistenza e limitazioni

  • I blocchi note Apache Beam supportano solo Python.
  • I segmenti di pipeline Apache Beam in esecuzione in questi blocchi note vengono eseguiti in un ambiente di test e non in un runner Apache Beam di produzione. Per avviare i blocchi note sul servizio Dataflow, esporta le pipeline create nel tuo blocco note Apache Beam. Per maggiori dettagli, consulta Avviare job Dataflow da una pipeline creata nel blocco note.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Compute Engine and Notebooks.

    Abilita le API

  5. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  6. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  7. Abilita le API Compute Engine and Notebooks.

    Abilita le API

Prima di creare l'istanza di blocco note Apache Beam, abilita API aggiuntive per le pipeline che utilizzano altri servizi, ad esempio Pub/Sub.

Se non specificato, l'istanza di blocco note viene eseguita dall'account di servizio Compute Engine predefinito con il ruolo IAM Editor progetto. Se il progetto limita esplicitamente i ruoli dell'account di servizio, assicurati che abbia comunque autorizzazioni sufficienti per eseguire i blocchi note. Ad esempio, la lettura da un argomento Pub/Sub crea implicitamente una sottoscrizione e il tuo account di servizio deve avere un ruolo di editor IAM Pub/Sub. Al contrario, la lettura da una sottoscrizione Pub/Sub richiede solo un ruolo di sottoscrittore IAM Pub/Sub.

Al termine della guida, per evitare la fatturazione continuativa, elimina le risorse che hai creato. Per maggiori dettagli, consulta Pulizia.

Avvia un'istanza di blocco note Apache Beam

  1. Nella console Google Cloud, vai alla pagina Workbench di Dataflow.

    Vai a Workbench

  2. Assicurati che sia attiva la scheda Blocchi note gestiti dall'utente.

  3. Nella barra degli strumenti, fai clic su Crea nuova.

  4. Nella sezione Ambiente, per Ambiente, seleziona Apache Beam.

  5. (Facoltativo) Se vuoi eseguire blocchi note su una GPU, nella sezione Tipo di macchina, seleziona un tipo di macchina che supporti le GPU, quindi seleziona Installa il driver GPU NVIDIA automaticamente. Per maggiori informazioni, consulta la pagina dedicata alle piattaforme GPU.

  6. Nella sezione Networking, seleziona una subnet per la VM del blocco note.

  7. (Facoltativo) Se vuoi configurare un'istanza di blocco note personalizzata, vedi Creare un'istanza di blocchi note gestiti dall'utente con proprietà specifiche.

  8. Fai clic su Crea. Dataflow Workbench crea una nuova istanza di blocco note Apache Beam.

  9. Dopo la creazione dell'istanza di blocco note, il link Apri JupyterLab diventa attivo. Fai clic su Apri JupyterLab.

(Facoltativo) Installa le dipendenze

Nei blocchi note Apache Beam sono già installate le dipendenze di Apache Beam e del connettore Google Cloud. Se la tua pipeline contiene connettori personalizzati o PTransforms personalizzati che dipendono da librerie di terze parti, installali dopo aver creato un'istanza di blocco note. Per ulteriori informazioni, consulta Installare le dipendenze nella documentazione relativa ai blocchi note gestiti dall'utente.

Esempi di blocchi note Apache Beam

Dopo aver creato un'istanza di blocchi note gestiti dall'utente, aprila in JupyterLab. Nella scheda File della barra laterale di JupyterLab, la cartella Esempi contiene blocchi note di esempio. Per ulteriori informazioni sull'utilizzo dei file JupyterLab, consulta Utilizzo dei file nella guida dell'utente di JupyterLab.

Sono disponibili i seguenti blocchi note:

  • Conteggio parole
  • Conteggio parole in streaming
  • Streaming dei dati sulle corse in taxi a NYC
  • SQL Apache Beam nei blocchi note con confronti con le pipeline
  • SQL Apache Beam nei blocchi note con Dataflow Runner
  • SQL Apache Beam nei blocchi note
  • Conteggio parole Dataflow
  • Flink interattivo su larga scala
  • RunInference
  • Utilizzo di GPU con Apache Beam
  • Visualizzare i dati

La cartella Tutorial contiene tutorial aggiuntivi che spiegano le nozioni di base di Apache Beam. Sono disponibili i seguenti tutorial:

  • Operazioni di base
  • Operazioni Wise degli elementi
  • Aggregazioni
  • Windows
  • Operazioni di I/O
  • Buffer dei
  • Allenamenti finali

Questi blocchi note includono testo esplicativo e blocchi di codice commentati per aiutarti a comprendere i concetti di Apache Beam e l'utilizzo delle API. I tutorial forniscono anche esercizi per mettere in pratica i concetti.

Le seguenti sezioni utilizzano il codice di esempio del blocco note per il conteggio di parole in streaming. Gli snippet di codice in questa guida e quelli che si trovano nel blocco note per il conteggio di parole in streaming potrebbero presentare lievi discrepanze.

Crea un'istanza di blocco note

Vai a File > Nuovo > Blocco note e seleziona un kernel Apache Beam 2.22 o versioni successive.

I blocchi note Apache Beam si basano sul ramo master dell'SDK Apache Beam. Questo significa che l'ultima versione del kernel mostrata nell'interfaccia utente dei blocchi note potrebbe essere precedente all'ultima versione rilasciata dell'SDK.

Apache Beam è installato sulla tua istanza di blocco note, quindi includi i moduli interactive_runner e interactive_beam nel blocco note.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Se il blocco note utilizza altre API di Google, aggiungi le seguenti istruzioni di importazione:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Impostare le opzioni di interattività

La riga seguente imposta la quantità di tempo in cui InteractiveRunner registra i dati da un'origine non limitata. In questo esempio, la durata è impostata su 10 minuti.

ib.options.recording_duration = '10m'

Puoi anche modificare il limite delle dimensioni della registrazione (in byte) per un'origine non limitata utilizzando la proprietà recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Per ulteriori opzioni interattive, consulta il corso interattivo_beam.options.

Crea la tua pipeline

Inizializza la pipeline utilizzando un oggetto InteractiveRunner.

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Leggere e visualizzare i dati

L'esempio seguente mostra una pipeline Apache Beam che crea una sottoscrizione all'argomento Pub/Sub specificato e legge dalla sottoscrizione.

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La pipeline conteggia le parole in base alle finestre dall'origine. Crea finestre fisse con una durata di 10 secondi per ogni finestra.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Dopo che i dati vengono visualizzati in finestre, le parole vengono conteggiate in un intervallo di tempo.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Il metodo show() visualizza la PCollection risultante nel blocco note.

ib.show(windowed_word_counts, include_window_info=True)

Il metodo show che visualizza una PCollection in formato tabulare.

Puoi limitare l'ambito del risultato impostato da show() impostando due parametri facoltativi: n e duration.

  • Imposta n per limitare il set di risultati in modo che mostri al massimo n un numero di elementi, ad esempio 20. Se n non è impostato, il comportamento predefinito prevede l'elenco degli elementi acquisiti più di recente fino al termine della registrazione di origine.
  • Imposta duration per limitare il set di risultati a un numero specificato di secondi di dati a partire dall'inizio della registrazione di origine. Se l'impostazione duration non è impostata, il comportamento predefinito prevede l'elenco di tutti gli elementi fino al termine della registrazione.

Se vengono impostati entrambi i parametri facoltativi, show() si interrompe quando viene raggiunta una delle soglie. Nell'esempio seguente, show() restituisce al massimo 20 elementi calcolati in base ai dati delle origini registrate per i primi 30 secondi.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Per mostrare visualizzazioni dei tuoi dati, trasmetti visualize_data=True al metodo show(). Puoi applicare più filtri alle visualizzazioni. La seguente visualizzazione consente di filtrare per etichetta e asse:

Il metodo show che mostra una PCollection come un ricco insieme di elementi UI filtrabili.

Per garantire la riproduzione durante la prototipazione delle pipeline in modalità flusso, le chiamate al metodo show() riutilizzano i dati acquisiti per impostazione predefinita. Per modificare questo comportamento e fare in modo che il metodo show() recuperi sempre nuovi dati, imposta interactive_beam.options.enable_capture_replay = False. Inoltre, se aggiungi una seconda origine non limitata al blocco note, i dati dell'origine non limitata precedente vengono eliminati.

Un'altra visualizzazione utile nei blocchi note Apache Beam è un DataFrame Pandas. L'esempio seguente converte prima le parole in minuscolo, quindi calcola la frequenza di ogni parola.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

Il metodo collect() fornisce l'output in un DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Il metodo collect che rappresenta una PCollection in un DataFrame Pandas.

La modifica e la nuova esecuzione di una cella è una pratica comune nello sviluppo di blocchi note. Quando modifichi e riesegui una cella in un blocco note Apache Beam, la cella non annulla l'azione prevista del codice nella cella originale. Ad esempio, se una cella aggiunge un elemento PTransform a una pipeline, la sua esecuzione aggiunge un altro PTransform alla pipeline. Se vuoi cancellare lo stato, riavvia il kernel, quindi esegui nuovamente le celle.

Visualizzare i dati tramite l'ispettore Interactive Beam

Potrebbe essere distraente introspezione dei dati di PCollection richiamando costantemente show() e collect(), soprattutto quando l'output occupa molto spazio sullo schermo e rende difficile la navigazione nel blocco note. Potresti anche voler confrontare più PCollections uno accanto all'altro per verificare se una trasformazione funziona come previsto. Ad esempio, quando un elemento PCollection passa attraverso una trasformazione e produce l'altro. Per questi casi d'uso, lo strumento di controllo Interactive Beam è una soluzione comoda.

L'ispettore Beam interattivo è fornito come estensione JupyterLab apache-beam-jupyterlab-sidepanel preinstallata nel blocco note Apache Beam. Con l'estensione puoi esaminare in modo interattivo lo stato delle pipeline e dei dati associati a ogni PCollection senza chiamare esplicitamente show() o collect().

Esistono tre modi per aprire lo strumento di controllo:

  • Fai clic su Interactive Beam nella barra dei menu in alto di JupyterLab. Nel menu a discesa, trova Open Inspector e fai clic per aprire la finestra di ispezione.

    Apri Controllo tramite il menu

  • Utilizza la pagina Avvio app. Se non è presente alcuna pagina Avvio app aperta, fai clic su File -> New Launcher per aprirla. Nella pagina Avvio app, individua Interactive Beam e fai clic su Open Inspector per aprire la finestra di ispezione.

    Apri strumento di ispezione tramite Avvio app

  • Utilizza la tavolozza dei comandi. Nella barra dei menu di JupyterLab, fai clic su View > Activate Command Palette. Nella finestra di dialogo, cerca Interactive Beam per visualizzare tutte le opzioni dell'estensione. Fai clic su Open Inspector per aprire lo strumento di controllo.

    Apri ispettore tramite la tavolozza dei comandi

Quando lo strumento di controllo sta per aprire:

  • Se c'è esattamente un blocco note aperto, lo strumento di ispezione si connette automaticamente.

  • Se non è aperto alcun blocco note, viene visualizzata una finestra di dialogo che consente di selezionare un kernel.

  • Se sono aperti più blocchi note, viene visualizzata una finestra di dialogo che ti consente di selezionare la sessione del blocco note.

    Seleziona il blocco note a cui connetterti

Ti consigliamo di aprire almeno un blocco note e selezionare un kernel prima di aprire la finestra di ispezione. Se apri un ispettore con un kernel prima di aprire un blocco note, in un secondo momento aprirai un blocco note per la connessione all'inspector, dovrai selezionare Interactive Beam Inspector Session da Use Kernel from Preferred Session. Un ispettore e un blocco note sono connessi quando condividono la stessa sessione, non diverse sessioni create dallo stesso kernel. Se selezioni lo stesso kernel da Start Preferred Kernel, viene creata una nuova sessione indipendente dalle sessioni esistenti dei blocchi note o degli ispettori aperti.

Puoi aprire più finestre di controllo per un blocco note aperto e riordinarli trascinando liberamente le schede nell'area di lavoro.

Apri 2 ispettori e disponili uno accanto all'altro

La pagina di controllo si aggiorna automaticamente quando esegui celle nel blocco note. La pagina elenca le pipeline e i valori PCollections definiti nel blocco note connesso. Gli asset PCollections sono organizzati in base alle pipeline a cui appartengono e puoi comprimerli facendo clic sulla pipeline di intestazione.

Per gli elementi nelle pipeline e nell'elenco PCollections, al clic, lo strumento di controllo mostra le visualizzazioni corrispondenti sul lato destro:

  • Se si tratta di un PCollection, lo strumento di controllo esegue il rendering dei suoi dati (in modo dinamico se i dati vengono ancora ricevuti per PCollections illimitate) con widget aggiuntivi per ottimizzare la visualizzazione dopo aver fatto clic sul pulsante APPLY.

    Pagina di ispezione

    Poiché l'ispezione e il blocco note aperto condividono la stessa sessione kernel, si bloccano l'esecuzione a vicenda. Ad esempio, se il blocco note è impegnato nell'esecuzione del codice, la funzionalità di ispezione non si aggiorna finché l'esecuzione non viene completata dal blocco note. Al contrario, se vuoi eseguire il codice immediatamente nel blocco note mentre lo strumento di ispezione visualizza un PCollection in modo dinamico, devi fare clic sul pulsante STOP per interrompere la visualizzazione e rilasciare preventivamente il kernel nel blocco note.

  • Se si tratta di una pipeline, lo strumento di controllo mostra il relativo grafico.

    Pagina di ispezione

Potresti notare pipeline anonime. Puoi accedere a queste pipeline PCollections, ma la sessione principale non fa più riferimento a queste pipeline. Ad esempio:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

L'esempio precedente crea una pipeline vuota p e una pipeline anonima che contiene un elemento pcoll PCollection. Puoi accedere alla pipeline anonima utilizzando pcoll.pipeline.

Puoi attivare/disattivare la pipeline e l'elenco PCollection per risparmiare spazio per le visualizzazioni di grandi dimensioni. Elenco sinistro di Toggel

Informazioni sullo stato di registrazione di una pipeline

Oltre alle visualizzazioni, puoi anche esaminare lo stato della registrazione per una o tutte le pipeline nell'istanza di blocco note chiamando describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

Il metodo describe() fornisce i seguenti dettagli:

  • Dimensioni totali (in byte) di tutte le registrazioni per la pipeline su disco
  • Ora di inizio dell'avvio del job di registrazione in background (in secondi dall'epoca di Unix)
  • Stato attuale della pipeline del job di registrazione in background
  • Variabile Python per la pipeline

Avvia job Dataflow da una pipeline creata nel tuo blocco note

  1. (Facoltativo) Prima di utilizzare il blocco note per eseguire i job Dataflow, riavvia il kernel, esegui nuovamente tutte le celle e verifica l'output. Se salti questo passaggio, gli stati nascosti nel blocco note potrebbero influire sul grafico del job nell'oggetto della pipeline.
  2. Abilita l'API Dataflow.
  3. Aggiungi la seguente istruzione di importazione:

    from apache_beam.runners import DataflowRunner
    
  4. Inserisci le opzioni della pipeline.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puoi modificare i valori dei parametri. Ad esempio, puoi modificare il valore region da us-central1.

  5. Esegui la pipeline con DataflowRunner. Questo passaggio esegue il job sul servizio Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p è un oggetto pipeline della sezione Creazione della pipeline.

Per un esempio su come eseguire questa conversione su un blocco note interattivo, consulta il blocco note per il conteggio di parole Dataflow nell'istanza di blocco note.

In alternativa, puoi esportare il blocco note come script eseguibile, modificare il file .py generato utilizzando i passaggi precedenti e quindi eseguire il deployment della pipeline nel servizio Dataflow.

Salva il blocco note

Notebooks che crei vengono salvati localmente nell'istanza di blocco note in esecuzione. Se reimposti o arresti l'istanza di blocco note durante lo sviluppo, i nuovi blocchi note rimangono attivi finché vengono creati nella directory /home/jupyter. Tuttavia, se viene eliminata un'istanza di blocco note, vengono eliminati anche questi blocchi note.

Per conservare i blocchi note per un uso futuro, scaricali in locale sulla tua workstation, salvali in GitHub o esportali in un formato file diverso.

Salva il blocco note su dischi permanenti aggiuntivi

Se vuoi conservare il lavoro, ad esempio blocchi note e script, in varie istanze di blocchi note, archiviali in un Persistent Disk.

  1. Crea o collega un disco permanente. Segui le istruzioni per utilizzare ssh per connetterti alla VM dell'istanza di blocco note ed emettere i comandi nella Cloud Shell aperta.

  2. Annota la directory in cui è montato il Persistent Disk, ad esempio /mnt/myDisk.

  3. Modifica i dettagli della VM dell'istanza di blocco note per aggiungere una voce alla chiave Custom metadata: chiave - container-custom-params; valore - -v /mnt/myDisk:/mnt/myDisk. Metadati aggiuntivi necessari per associare il DP montato

  4. Fai clic su Salva.

  5. Per aggiornare queste modifiche, reimposta l'istanza di blocco note. Reimposta un&#39;istanza di blocco note

  6. Al termine del ripristino, fai clic su Apri JupyterLab. Potrebbe essere necessario un po' di tempo prima che l'interfaccia utente JupyterLab diventi disponibile. Quando viene visualizzata l'UI, apri un terminale ed esegui questo comando: ls -al /mnt La directory /mnt/myDisk dovrebbe essere elencata. Volume elenco vincolato

Ora puoi salvare il tuo lavoro nella directory /mnt/myDisk. Anche se l'istanza del blocco note viene eliminata, il Persistent Disk esiste nel progetto. Puoi quindi collegare questo Persistent Disk ad altre istanze di blocco note.

Esegui la pulizia

Dopo aver completato l'utilizzo dell'istanza di blocco note Apache Beam, esegui la pulizia delle risorse create su Google Cloud chiudendo l'istanza del blocco note.

Passaggi successivi