Creare una pipeline Dataflow utilizzando Go

Questa pagina mostra come utilizzare l'SDK Apache Beam per Go per creare un programma che definisce una pipeline. Poi esegui la pipeline in locale e sul servizio Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Concedi i ruoli all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci PROJECT_NUMBER con il numero del tuo progetto. Per trovare il numero del progetto, consulta Identificazione dei progetti oppure utilizza il comando gcloud projects describe.
    • Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (standard).
    • Imposta la posizione di archiviazione su quanto segue: US (Stati Uniti).
    • Sostituisci BUCKET_NAME con un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket, poiché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Avrai bisogno di questi valori in un secondo momento in questa guida rapida.

Configurazione dell'ambiente di sviluppo

L'SDK Apache Beam è un modello di programmazione open source per pipeline di dati. Definisci una pipeline con un programma Apache Beam e poi scegli un runner, come Dataflow, per eseguirla.

Ti consigliamo di utilizzare la versione più recente di Go quando lavori con l'SDK Apache Beam per Go. Se non hai installato la versione più recente di Go, utilizza la guida per il download e l'installazione di Go per scaricare e installare Go per il tuo sistema operativo specifico.

Per verificare la versione di Go che hai installato, esegui il seguente comando nel terminale locale:

go version

Esegui l'esempio di conteggio parole di Beam

L'SDK Apache Beam per Go include un wordcount esempio di pipeline. L'esempio wordcount esegue le seguenti operazioni:

  1. Legge un file di testo come input. Per impostazione predefinita, legge un file di testo situato in un bucket Cloud Storage con il nome risorsa gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analizza ogni riga in parole.
  3. Esegue un conteggio della frequenza delle parole tokenizzate.

Per eseguire la versione più recente dell'esempio Beam wordcount sulla tua macchina locale, utilizza il seguente comando. Il flag input specifica il file da leggere, mentre il flag output specifica il nome del file per l'output del conteggio della frequenza.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Al termine della pipeline, visualizza i risultati dell'output:

more outputs*

Per uscire, premi q.

Modificare il codice della pipeline

La pipeline wordcount di Beam distingue tra parole maiuscole e minuscole. I passaggi riportati di seguito mostrano come creare il tuo modulo Go, modificare la pipeline wordcount in modo che non sia sensibile alle maiuscole ed eseguirla su Dataflow.

Creare un modulo Go

Per apportare modifiche al codice della pipeline, segui questi passaggi.

  1. Crea una directory per il modulo Go in una posizione a tua scelta:

    mkdir wordcount
    cd wordcount
  2. Crea un modulo Go. Per questo esempio, utilizza example/dataflow come percorso del modulo.

    go mod init example/dataflow
  3. Scarica la copia più recente del codice wordcount dal repository GitHub di Apache Beam. Inserisci questo file nella directory wordcount che hai creato.

  4. Se utilizzi un sistema operativo diverso da Linux, devi scaricare il pacchetto Go unix. Questo pacchetto è necessario per eseguire le pipeline sul servizio Dataflow.

    go get -u golang.org/x/sys/unix
  5. Assicurati che il file go.mod corrisponda al codice sorgente del modulo:

    go mod tidy

Esegui la pipeline non modificata

Verifica che la pipeline wordcount non modificata venga eseguita in locale.

  1. Dal terminale, crea ed esegui la pipeline in locale:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Visualizza i risultati dell'output:

     more outputs*
  3. Per uscire, premi q.

Modificare il codice della pipeline

Per modificare la pipeline in modo che non sia sensibile alle maiuscole, modifica il codice in modo da applicare la funzione strings.ToLower a tutte le parole.

  1. Apri il file wordcount.go in un editor a tua scelta.

  2. Esamina il blocco init (i commenti sono stati rimossi per maggiore chiarezza):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Aggiungi una nuova riga per registrare la funzione strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Esamina la funzione CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Per scrivere le parole in minuscolo, aggiungi un ParDo che applichi strings.ToLower a ogni parola:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Salva il file.

Esegui la pipeline aggiornata localmente

Esegui la pipeline wordcount aggiornata in locale e verifica che l'output sia cambiato.

  1. Crea ed esegui la pipeline wordcount modificata:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Visualizza i risultati dell'output della pipeline modificata. Tutte le parole devono essere in minuscolo.

     more outputs*
  3. Per uscire, premi q.

Esegui la pipeline nel servizio Dataflow

Per eseguire l'esempio wordcount aggiornato sul servizio Dataflow, utilizza il seguente comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Sostituisci quanto segue:

  • BUCKET_NAME: il nome del bucket Cloud Storage.

  • PROJECT_ID: l'ID progetto Google Cloud.

  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow. Ad esempio: europe-west1. Per un elenco delle località disponibili, consulta Località di Dataflow. Il flag --region sostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.

Visualizza i tuoi risultati

Puoi visualizzare un elenco dei tuoi job Dataflow nella console Google Cloud. Nella console Google Cloud, vai alla pagina Job di Dataflow.

Vai a Job

La pagina Job mostra i dettagli del job wordcount, incluso uno stato iniziale di In esecuzione e poi Riuscito.

Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. Visualizza i risultati dell'output utilizzando la console Google Cloud o il terminale locale.

Console

Per visualizzare i risultati nella console Google Cloud, vai alla pagina Bucket in Cloud Storage.

Vai a Bucket

Nell'elenco dei bucket del progetto, fai clic sul bucket di archiviazione creato in precedenza. I file di output creati dal job vengono visualizzati nella directoryresults.

Terminale

Visualizza i risultati dal terminale o utilizzando Cloud Shell.

  1. Per elencare i file di output, utilizza il comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage di output specificato.

  2. Per visualizzare i risultati nei file di output, utilizza il comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate su questa pagina, elimina il progetto Google Cloud con le risorse.

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi