Crea una pipeline Dataflow utilizzando Go

Questa pagina mostra come utilizzare l'SDK Apache Beam per Go per creare un programma che definisca una pipeline. Quindi, esegui la pipeline in locale e sul servizio Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  8. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  9. Installa Google Cloud CLI.
  10. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  11. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  12. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  13. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  15. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  16. Concedi ruoli al tuo account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci PROJECT_NUMBER con il numero del progetto. Per trovare il numero del progetto, consulta Identifica i progetti o utilizza il comando gcloud projects describe.
    • Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.
  17. Crea un bucket Cloud Storage e configuralo nel seguente modo:
    • Imposta la classe di archiviazione su S (standard).
    • Imposta la località di archiviazione come segue: US (Stati Uniti).
    • Sostituisci BUCKET_NAME con un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Questi valori saranno necessari più avanti in questa guida rapida.

Configurazione dell'ambiente di sviluppo

L'SDK Apache Beam è un modello di programmazione open source per le pipeline di dati. Puoi definire una pipeline con un programma Apache Beam, quindi scegliere un runner, ad esempio Dataflow, per eseguire la pipeline.

Ti consigliamo di utilizzare la versione più recente di Go quando lavori con l'SDK Apache Beam per Go. Se non hai installato la versione più recente di Go, utilizza la guida al download e l'installazione di Go per scaricare e installare Go sul tuo sistema operativo specifico.

Per verificare la versione di Go installata, esegui questo comando nel terminale locale:

go version

Esegui l'esempio di conteggio parole Beam

L'SDK Apache Beam per Go include un esempio di pipeline wordcount. Nell'esempio wordcount si verifica quanto segue:

  1. Legge un file di testo come input. Per impostazione predefinita, legge un file di testo che si trova in un bucket Cloud Storage con il nome della risorsa gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analizza ogni riga in parole.
  3. Esegue un conteggio di frequenza sulle parole tokenizzate.

Per eseguire la versione più recente dell'esempio Beam wordcount sulla tua macchina locale, utilizza il comando seguente. Il flag input specifica il file da leggere, mentre il flag output specifica il nome file per l'output del conteggio di frequenza.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Al termine della pipeline, visualizza i risultati di output:

more outputs*

Per uscire, premi Q.

Modifica il codice della pipeline

La pipeline Beam wordcount distingue tra parole maiuscole e minuscole. I passaggi seguenti mostrano come creare il tuo modulo Go, modificare la pipeline wordcount in modo che non sia sensibile alle maiuscole ed eseguirla in Dataflow.

Crea un modulo Go

Per apportare modifiche al codice della pipeline:

  1. Crea una directory per il tuo modulo Go in una posizione a tua scelta:

    mkdir wordcount
    cd wordcount
    
  2. Creare un modulo Go. Per questo esempio, utilizza example/dataflow come percorso del modulo.

    go mod init example/dataflow
    
  3. Scarica la copia più recente del codice wordcount dal repository GitHub di Apache Beam. Inserisci questo file nella directory wordcount che hai creato.

  4. Se utilizzi un sistema operativo non Linux, devi scaricare il pacchetto Go unix. Questo pacchetto è necessario per eseguire le pipeline sul servizio Dataflow.

    go get -u golang.org/x/sys/unix
    
  5. Assicurati che il file go.mod corrisponda al codice sorgente del modulo:

    go mod tidy
    

Esegui la pipeline non modificata

Verifica che la pipeline wordcount non modificata venga eseguita localmente.

  1. Dal terminale, crea ed esegui la pipeline in locale:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Visualizza i risultati di output:

     more outputs*
    
  3. Per uscire, premi Q.

Modifica il codice della pipeline

Per cambiare la pipeline in modo che non sia sensibile alle maiuscole, modifica il codice per applicare la funzione strings.ToLower a tutte le parole.

  1. In un editor a tua scelta, apri il file wordcount.go.

  2. Esamina il blocco init (i commenti sono stati rimossi per maggiore chiarezza):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Aggiungi una nuova riga per registrare la funzione strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Esamina la funzione CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Per far scrivere le parole in minuscolo, aggiungi un ParDo che applichi strings.ToLower a ogni parola:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Salva il file.

Esegui la pipeline aggiornata in locale

Esegui la pipeline wordcount aggiornata in locale e verifica che l'output sia cambiato.

  1. Crea ed esegui la pipeline wordcount modificata:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Visualizza i risultati di output della pipeline modificata. Tutte le parole devono essere in minuscolo.

     more outputs*
    
  3. Per uscire, premi Q.

Esegui la pipeline sul servizio Dataflow

Per eseguire l'esempio wordcount aggiornato sul servizio Dataflow, utilizza questo comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Sostituisci quanto segue:

  • BUCKET_NAME: nome del bucket Cloud Storage.

  • PROJECT_ID: l'ID del progetto Google Cloud.

  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow. Ad esempio: europe-west1. Per un elenco delle località disponibili, consulta Località di Dataflow. Il flag --region sostituisce la regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.

Visualizza i tuoi risultati

Puoi visualizzare un elenco dei tuoi job Dataflow nella console Google Cloud. Nella console Google Cloud, vai alla pagina Job di Dataflow.

Vai a Job

La pagina Job mostra i dettagli del job wordcount, tra cui lo stato In esecuzione prima del job, quindi Riuscito.

Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. Visualizza i risultati di output utilizzando la console Google Cloud o il terminale locale.

Console

Per visualizzare i risultati nella console Google Cloud, vai alla pagina Bucket di Cloud Storage.

Vai a Bucket

Nell'elenco dei bucket del progetto, fai clic sul bucket di archiviazione che hai creato in precedenza. I file di output creati dal job vengono visualizzati nella directory results.

Terminale

Visualizza i risultati dal terminale o utilizzando Cloud Shell.

  1. Per elencare i file di output, utilizza il comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage di output specificato.

  2. Per visualizzare i risultati nei file di output, utilizza il comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.

  1. Nella console Google Cloud, vai alla pagina Bucket di Cloud Storage.

    Vai a Bucket

  2. Fai clic sulla casella di controllo relativa al bucket da eliminare.
  3. Per eliminare il bucket, fai clic su Elimina e segui le istruzioni.
  4. Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  6. Facoltativo: revoca le credenziali dallgcloud CLI.

    gcloud auth revoke

Passaggi successivi