Creare una pipeline Dataflow utilizzando Python

In questa guida rapida imparerai a utilizzare l'SDK Apache Beam per Python per creare un programma che definisce una pipeline. Quindi, esegui la pipeline utilizzando un runner locale diretto o un come Dataflow. Per un'introduzione alla pipeline WordCount, consulta Video su come utilizzare WordCount in Apache Beam.


Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Concedi ruoli al tuo account di servizio predefinito Compute Engine. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci PROJECT_NUMBER con il numero del tuo progetto. Per trovare il numero del progetto, consulta Identificare i progetti o utilizza il comando gcloud projects describe.
    • Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (standard).
    • Imposta la posizione di archiviazione come segue: US (Stati Uniti).
    • Sostituisci BUCKET_NAME con un nome bucket univoco. Non includere informazioni sensibili nella perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Ti servono questi valori più avanti in questo documento.

Configura l'ambiente

In questa sezione, utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato in cui eseguire il progetto della pipeline mediante venv. Questo processo consente di isolare le dipendenze di un progetto da quelle di altri progetti.

Se non hai un prompt dei comandi subito disponibile, puoi utilizzare Cloud Shell. In Cloud Shell è già installato il gestore di pacchetti per Python 3, quindi puoi passare alla creazione in un ambiente virtuale.

Per installare Python e quindi creare un ambiente virtuale, segui questi passaggi:

  1. Verifica che nel sistema siano in esecuzione Python 3 e pip:
    python --version
    python -m pip --version
    
  2. Se necessario, installa Python 3, quindi configura un ambiente virtuale Python: segui le istruzioni forniti nelle sezioni Installazione di Python e Impostazione di Venv del . Configurazione di una pagina dell'ambiente di sviluppo Python. Se utilizzi Python 3.10 o versioni successive, devi anche abilitare Dataflow Runner v2. Per utilizzare Runner v1, utilizza Python 3.9 o versioni precedenti.
di Gemini Advanced.

Dopo aver completato la guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate.

Ottieni l'SDK Apache Beam

L'SDK Apache Beam è un modello di programmazione open source per pipeline di dati. Tu definisci con un programma Apache Beam, quindi scegli un runner, come Dataflow, per eseguire la pipeline.

Per scaricare e installare l'SDK Apache Beam, segui questi passaggi:

  1. Verifica di trovarti nell'ambiente virtuale Python creato nella sezione precedente. Assicurati che il prompt inizi con <env_name>, dove env_name è il nome dell'ambiente virtuale.
  2. Installa lo standard per la pacchettizzazione della ruota Python:
    pip install wheel
    
  3. Installa la versione più recente dell'SDK Apache Beam per Python:
  4. pip install 'apache-beam[gcp]'

    Su Microsoft Windows, utilizza il seguente comando:

    pip install apache-beam[gcp]

    A seconda della connessione, l'installazione potrebbe richiedere del tempo.

Esegui la pipeline in locale

Per vedere come viene eseguita in locale una pipeline, utilizza un modulo Python pronto all'uso per wordcount incluso nel pacchetto apache_beam.

L'esempio di pipeline wordcount esegue queste operazioni:

  1. Prende un file di testo come input.

    Questo file di testo si trova in un bucket Cloud Storage con il nome risorsa gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analizza ogni riga in parole.
  3. Esegue un conteggio della frequenza sulle parole tokenizzate.

Per organizzare in un'area intermedia la pipeline wordcount in locale, segui questi passaggi:

  1. Dal tuo terminale locale, esegui l'esempio wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Visualizza l'output della pipeline:
    more outputs*
  3. Per uscire, premi q.
di Gemini Advanced. L'esecuzione della pipeline in locale ti consente di testare ed eseguire il debug del tuo programma Apache Beam. Puoi visualizzare il codice sorgente di wordcount.py su Apache Beam GitHub.

Esegui la pipeline sul servizio Dataflow

In questa sezione, esegui la pipeline di esempio wordcount dal apache_beam sul servizio Dataflow. Questo esempio specifica DataflowRunner come parametro per --runner.
  • Esegui la pipeline:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Sostituisci quanto segue:

    • DATAFLOW_REGION: il regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio europe-west1

      Il flag --region sostituisce la regione predefinita che è impostato nel server dei metadati, nel client locale o nell'ambiente come la codifica one-hot delle variabili categoriche.

    • BUCKET_NAME: il Il nome del bucket Cloud Storage che hai copiato precedenti
    • PROJECT_ID: l'ID del progetto Google Cloud che hai copiato in precedenza
di Gemini Advanced.

Visualizza i tuoi risultati

Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando la console Google Cloud o il terminale locale.

Console Google Cloud

Per visualizzare i risultati nella console Google Cloud, segui questi passaggi:

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

    La pagina Job mostra i dettagli del job wordcount, incluso lo stato In esecuzione, poi Riuscito.

  2. Vai alla pagina Bucket di Cloud Storage.

    Vai a Bucket

  3. Nell'elenco dei bucket del progetto, fai clic sul bucket di archiviazione che hai creato in precedenza.

    Nella directory wordcount vengono visualizzati i file di output creati dal job.

Terminale locale

Visualizza i risultati dal terminale o utilizzando Cloud Shell.

  1. Per elencare i file di output, utilizza il comando gcloud storage ls:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage utilizzato nel programma della pipeline.

  3. Per visualizzare i risultati nei file di output, utilizza il comando gcloud storage cat:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Modifica il codice della pipeline

La pipeline wordcount negli esempi precedenti fa distinzione tra parole maiuscole e minuscole. I passaggi seguenti mostrano come modificare la pipeline in modo che la pipeline wordcount non fa distinzione tra maiuscole e minuscole.
  1. Sul computer locale, scarica l'ultima copia Codice wordcount dal repository GitHub di Apache Beam.
  2. Dal terminale locale, esegui la pipeline:
    python wordcount.py --output outputs
  3. Visualizza i risultati:
    more outputs*
  4. Per uscire, premi q.
  5. Apri il file wordcount.py in un editor a tua scelta.
  6. All'interno della funzione run, esamina i passaggi della pipeline:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Dopo split, le linee vengono suddivise in parole come stringhe.

  7. Per scrivere le stringhe in minuscolo, modifica la riga dopo split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Questa modifica mappa la funzione str.lower su ogni parola. Questa linea è equivalente a beam.Map(lambda word: str.lower(word)).
  8. Salva il file ed esegui il job wordcount modificato:
    python wordcount.py --output outputs
  9. Visualizza i risultati della pipeline modificata:
    more outputs*
  10. Per uscire, premi q.
  11. Esegui la pipeline modificata nel servizio Dataflow:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Sostituisci quanto segue:

    • DATAFLOW_REGION: il regione in cui eseguire il deployment del job Dataflow
    • BUCKET_NAME: il tuo Nome bucket Cloud Storage
    • PROJECT_ID: il tuo ID progetto Google Cloud

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, elimina il progetto Google Cloud Google Cloud.

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi