Unisciti alla community Apache Beam dal 18 al 20 luglio per partecipare al Beam Summit 2022, dove troverai ulteriori informazioni su Beam e sulla tua esperienza.
Guida rapida: creazione di una pipeline Dataflow utilizzando Python

Creare una pipeline Dataflow usando Python

In questa guida rapida, imparerai a utilizzare l'SDK Apache Beam per Python per creare un programma che definisce una pipeline. Quindi, esegui la pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow.


Per indicazioni dettagliate su questa attività direttamente in Cloud Console, fai clic su Guida:

Procedura guidata


Nelle sezioni seguenti puoi seguire la stessa procedura utilizzata per fare clic su Procedura guidata.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  4. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.

    Abilita le API

  5. Crea un account di servizio:

    1. In Cloud Console, vai alla pagina Crea account di servizio.

      Vai a Crea account di servizio
    2. Seleziona il progetto.
    3. Inserisci un nome nel campo Nome account di servizio. Cloud Console compila il campo ID account di servizio in base a questo nome.

      Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio, Service account for quickstart.

    4. Fai clic su Crea e continua.
    5. Per fornire l'accesso al tuo progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > proprietario.

      Nell'elenco Seleziona un ruolo, scegli un ruolo.

      Per ulteriori ruoli, fai clic su Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo.

    6. Fai clic su Continua.
    7. Fai clic su Fine per completare la creazione dell'account di servizio.

      Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

  6. Crea una chiave dell'account di servizio:

    1. In Cloud Console, fai clic sull'indirizzo email dell'account di servizio che hai creato.
    2. Fai clic su Chiavi.
    3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
    4. Fai clic su Crea. Il file di una chiave JSON viene scaricato sul computer.
    5. Fai clic su Chiudi.
  7. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo.

  8. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  9. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  10. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.

    Abilita le API

  11. Crea un account di servizio:

    1. In Cloud Console, vai alla pagina Crea account di servizio.

      Vai a Crea account di servizio
    2. Seleziona il progetto.
    3. Inserisci un nome nel campo Nome account di servizio. Cloud Console compila il campo ID account di servizio in base a questo nome.

      Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio, Service account for quickstart.

    4. Fai clic su Crea e continua.
    5. Per fornire l'accesso al tuo progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > proprietario.

      Nell'elenco Seleziona un ruolo, scegli un ruolo.

      Per ulteriori ruoli, fai clic su Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo.

    6. Fai clic su Continua.
    7. Fai clic su Fine per completare la creazione dell'account di servizio.

      Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

  12. Crea una chiave dell'account di servizio:

    1. In Cloud Console, fai clic sull'indirizzo email dell'account di servizio che hai creato.
    2. Fai clic su Chiavi.
    3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
    4. Fai clic su Crea. Il file di una chiave JSON viene scaricato sul computer.
    5. Fai clic su Chiudi.
  13. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo.

  14. Crea un bucket Cloud Storage:
    1. In Cloud Console, vai alla pagina Browser di Cloud Storage.

      Vai al browser

    2. Fai clic su Crea bucket.
    3. Nella pagina Crea un bucket, inserisci le informazioni sul bucket. Per andare al passaggio successivo, fai clic su Continua.
      • In Assegna un nome al bucket, inserisci un nome univoco per il bucket. Non includere informazioni sensibili nel nome del bucket, poiché lo spazio dei nomi del bucket è globale e visibile pubblicamente.
      • Per scegliere dove archiviare i dati, segui questi passaggi:
        • Seleziona un'opzione Tipo di località.
        • Seleziona un'opzione Località.
      • Per Scegli una classe di archiviazione predefinita per i tuoi dati, seleziona quanto segue: Standard.
      • In Scegli come controllare l'accesso agli oggetti, seleziona un'opzione di Controllo dell'accesso.
      • (Facoltativo) Per le Impostazioni avanzate, specifica un metodo di crittografia, un criterio di conservazione o etichette di bucket.
    4. Fai clic su Crea.
  15. Copia l'ID progetto di Google Cloud e il nome del bucket Cloud Storage. Questi valori sono necessari in un secondo momento in questo documento.

Configura l'ambiente

In questa sezione, utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Questo processo consente di isolare le dipendenze di un progetto dalle dipendenze di altri progetti.

Se non hai un prompt dei comandi prontamente disponibile, puoi utilizzare Cloud Shell. Cloud Shell ha già installato il gestore di pacchetti per Python 3, quindi puoi passare alla creazione di un ambiente virtuale.

Per installare Python e creare un ambiente virtuale, segui questi passaggi:

  1. Verifica di avere in esecuzione Python 3 e pip nel tuo sistema:
    python --version
    python -m pip --version
    
  2. Se necessario, installa Python 3 e quindi configura un ambiente virtuale Python: segui le istruzioni fornite nelle sezioni Installazione di Python e Configurazione di venv della pagina Configurazione di un ambiente di sviluppo Python.

Dopo aver completato la guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate.

Scarica l'SDK Apache Beam

L'SDK Apache Beam è un modello di programmazione open source per le pipeline di dati. Devi definire una pipeline con un programma Apache Beam e scegliere un runner, ad esempio Dataflow, per eseguire la pipeline.

Per scaricare e installare l'SDK Apache Beam, segui questi passaggi:

  1. Verifica di trovarti nell'ambiente virtuale Python creato nella sezione precedente. Assicurati che la richiesta inizi con <env_name>, dove env_name è il nome dell'ambiente virtuale.
  2. Installa lo standard di confezionamento Python wheel:
    pip install wheel
    
  3. Installa la versione più recente dell'SDK Apache Beam per Python:
  4. pip install 'apache-beam[gcp]'

    A seconda della connessione, l'installazione potrebbe richiedere del tempo.

Esegui la pipeline in locale

Per vedere come viene eseguita una pipeline in locale, utilizza un modulo Python già pronto per l'esempio wordcount incluso nel pacchetto apache_beam.

L'esempio di pipeline wordcount esegue quanto segue:

  1. Applica come input un file di testo.

    Questo file di testo si trova in un bucket Cloud Storage con il nome della risorsa gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analizza ogni riga in parole.
  3. Esegue un conteggio della frequenza sulle parole tokenizzate.

Per mettere in locale la pipeline wordcount in locale, segui questi passaggi:

  1. Dal terminale locale, esegui l'esempio wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Visualizza l'output della pipeline:
    more outputs*
  3. Per uscire, premi Q.
L'esecuzione locale della pipeline ti consente di testare ed eseguire il debug del tuo programma Apache Beam. Puoi visualizzare il codice sorgente wordcount.py su Apache Beam GitHub.

Esegui la pipeline sul servizio Dataflow

In questa sezione, esegui la pipeline di esempio wordcount dal pacchetto apache_beam sul servizio Dataflow. In questo esempio specifica DataflowRunner come parametro per --runner.
  • Esegui la pipeline:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://STORAGE_BUCKET/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://STORAGE_BUCKET/tmp/

    Sostituisci quanto segue:

Visualizza i tuoi risultati

Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando Cloud Console o il terminale locale.

Cloud Console

Per visualizzare i risultati in Cloud Console, segui questi passaggi:

  1. In Cloud Console, vai alla pagina Job di Dataflow.

    Vai ai job

    Nella pagina Job vengono visualizzati i dettagli del job wordcount, incluso lo stato In esecuzione all'inizio, quindi Riuscito.

  2. Vai alla pagina del browser Cloud Storage.

    Vai alla pagina del browser

  3. Dall'elenco dei bucket nel progetto, fai clic sul bucket di archiviazione creato in precedenza.

    Nella directory wordcount vengono visualizzati i file di output creati dal job.

Terminale locale

Per visualizzare i risultati dal tuo terminale, utilizza lo strumento gsutil. Puoi anche eseguire i comandi da Cloud Shell.

  1. Elenca i file di output:
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"  
  2. Sostituisci STORAGE_BUCKET con il nome del bucket Cloud Storage utilizzato nel programma della pipeline.

  3. Visualizza i risultati nei file di output:
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

Modifica il codice della pipeline

La pipeline wordcount negli esempi precedenti distingue tra parole maiuscole e minuscole. I seguenti passaggi mostrano come modificare la pipeline in modo che la pipeline wordcount non faccia distinzione tra maiuscole e minuscole.
  1. Sul computer locale, scarica l'ultima copia del codice wordcount dal repository GitHub di Apache Beam.
  2. Dal terminale locale, esegui la pipeline:
    python wordcount.py --output outputs
  3. Visualizza i risultati:
    more outputs*
  4. Per uscire, premi Q.
  5. In un editor a tua scelta, apri il file wordcount.py.
  6. All'interno della funzione run, esamina i passaggi della pipeline:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Dopo il giorno split, le righe sono divise in parole come stringhe.

  7. Per contenere le stringhe in minuscolo, modifica la riga dopo split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Questa modifica mappa la funzione str.lower su ogni parola. Questa riga equivale a beam.Map(lambda word: str.lower(word)).
  8. Salva il file ed esegui il job wordcount modificato:
    python wordcount.py --output outputs
  9. Visualizza i risultati della pipeline modificata:
    more outputs*
  10. Per uscire, premi Q.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

  1. In Cloud Console, vai alla pagina Browser di Cloud Storage.

    Vai al browser

  2. Fai clic sulla casella di controllo del bucket da eliminare.
  3. Per eliminare il bucket, fai clic su Elimina e segui le istruzioni.

Passaggi successivi

Apache Beamecomm è un marchio di Apache Software Foundation o delle sue società consociate negli Stati Uniti e/o in altri paesi.