Crea una pipeline Dataflow utilizzando Go
Questa pagina mostra come utilizzare l'Apache Beam SDK per Go per creare un programma che definisce una pipeline. Poi esegui la pipeline in locale e sul servizio Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
PROJECT_NUMBER
con il numero del tuo progetto. Per trovare il numero del progetto, consulta Identificare i progetti o utilizza il comandogcloud projects describe
. - Sostituisci
SERVICE_ACCOUNT_ROLE
con ogni singolo ruolo.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard). -
Imposta la posizione di archiviazione su:
US
(Stati Uniti). -
Sostituisci
BUCKET_NAME
con un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente. - Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Questi valori ti serviranno più avanti in questa guida rapida.
- Legge un file di testo come input. Per impostazione predefinita, legge un file di testo che si trova in un bucket Cloud Storage con il nome della risorsa
gs://dataflow-samples/shakespeare/kinglear.txt
. - Analizza ogni riga in parole.
- Esegue un conteggio della frequenza delle parole tokenizzate.
Utilizza il comando
git clone
per clonare il repository GitHubapache/beam
:git clone https://github.com/apache/beam.git
Passa alla directory
beam/sdks/go
:cd beam/sdks/go
Utilizza il seguente comando per eseguire la pipeline:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Il flag
input
specifica il file da leggere, mentre il flagoutput
specifica il nome del file per l'output del conteggio della frequenza.Crea una directory per il modulo Go nella posizione che preferisci:
mkdir wordcount
cd wordcount
Crea un modulo Go. Per questo esempio, utilizza
example/dataflow
come percorso del modulo.go mod init example/dataflow
Scarica l'ultima copia del codice
wordcount
dal repository GitHub di Apache Beam. Inserisci questo file nella directorywordcount
che hai creato.Se utilizzi un sistema operativo non Linux, devi scaricare il pacchetto Go
unix
. Questo pacchetto è necessario per eseguire le pipeline sul servizio Dataflow.go get -u golang.org/x/sys/unix
Assicurati che il file
go.mod
corrisponda al codice sorgente del modulo:go mod tidy
Dal terminale, crea ed esegui la pipeline in locale:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Visualizza i risultati dell'output:
more outputs*
Per uscire, premi q.
Apri il file
wordcount.go
in un editor a tua scelta.Esamina il blocco
init
(i commenti sono stati rimossi per maggiore chiarezza):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Aggiungi una nuova riga per registrare la funzione
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Esamina la funzione
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Per convertire le parole in minuscolo, aggiungi un ParDo che applichi
strings.ToLower
a ogni parola:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Salva il file.
Crea ed esegui la pipeline
wordcount
modificata:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Visualizza i risultati dell'output della pipeline modificata. Tutte le parole devono essere in minuscolo.
more outputs*
Per uscire, premi q.
BUCKET_NAME
: il nome del bucket Cloud Storage.PROJECT_ID
: l' Google Cloud ID progetto.DATAFLOW_REGION
: la regione in cui vuoi eseguire il deployment del job Dataflow. Ad esempio:europe-west1
. Per un elenco delle località disponibili, consulta Località di Dataflow. Il flag--region
sostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.Per elencare i file di output, utilizza il comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Sostituisci
BUCKET_NAME
con il nome del bucket Cloud Storage di output specificato.Per visualizzare i risultati nei file di output, utilizza il comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
Se mantieni il progetto, revoca i ruoli che hai concesso al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Configurazione dell'ambiente di sviluppo
L'SDK Apache Beam è un modello di programmazione open source per pipeline di dati. Definisci una pipeline con un programma Apache Beam e poi scegli un runner, ad esempio Dataflow, per eseguire la pipeline.
Ti consigliamo di utilizzare l'ultima versione di Go quando lavori con l'SDK Apache Beam per Go. Se non hai installato l'ultima versione di Go, utilizza la guida al download e all'installazione di Go per scaricare e installare Go per il tuo sistema operativo specifico.
Per verificare la versione di Go installata, esegui questo comando nel terminale locale:
go version
Esegui l'esempio di conteggio parole di Beam
L'SDK Apache Beam per Go include un
wordcount
esempio di pipeline. L'esempiowordcount
esegue le seguenti operazioni:Per eseguire l'ultima versione dell'esempio di Beam
wordcount
sulla tua macchina locale, segui questi passaggi:Al termine della pipeline, visualizza i risultati dell'output:
more outputs*
Per uscire, premi q.
Modifica il codice della pipeline
La pipeline Beam
wordcount
distingue tra parole maiuscole e minuscole. I seguenti passaggi mostrano come creare il tuo modulo Go, modificare la pipelinewordcount
in modo che non faccia distinzione tra maiuscole e minuscole ed eseguirla su Dataflow.Crea un modulo Go
Per apportare modifiche al codice della pipeline:
Esegui la pipeline non modificata
Verifica che la pipeline
wordcount
non modificata venga eseguita localmente.Modificare il codice della pipeline
Per modificare la pipeline in modo che non faccia distinzione tra maiuscole e minuscole, modifica il codice in modo da applicare la funzione
strings.ToLower
a tutte le parole.Esegui la pipeline aggiornata in locale
Esegui la pipeline
wordcount
aggiornata localmente e verifica che l'output sia cambiato.Esegui la pipeline sul servizio Dataflow
Per eseguire l'esempio
wordcount
aggiornato sul servizio Dataflow, utilizza il seguente comando:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner dataflow \ --project PROJECT_ID \ --region DATAFLOW_REGION \ --staging_location gs://BUCKET_NAME/binaries/
Sostituisci quanto segue:
Visualizza i tuoi risultati
Puoi visualizzare un elenco dei tuoi job Dataflow nella consoleGoogle Cloud . Nella console Google Cloud , vai alla pagina Job Dataflow.
La pagina Job mostra i dettagli del job
wordcount
, incluso uno stato inizialmente In esecuzione e poi Riuscito.Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. Visualizza i risultati dell'output utilizzando la consoleGoogle Cloud o il terminale locale.
Console
Per visualizzare i risultati nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
Nell'elenco dei bucket nel progetto, fai clic sul bucket di archiviazione che hai creato in precedenza. I file di output creati dal job vengono visualizzati nella directory
results
.Terminale
Visualizza i risultati dal terminale o utilizzando Cloud Shell.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.
Passaggi successivi
-
Set the storage class to