Trasmetti un flusso di messaggi da Pub/Sub utilizzando Dataflow
Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con pari affidabilità ed espressività. Fornisce un ambiente di sviluppo semplificato delle pipeline utilizzando l'SDK Apache Beam, che offre un ricco set di primitive per windowing e analisi delle sessioni, nonché un ecosistema di connettori di origine e sink. Questa guida rapida illustra come utilizzare Dataflow per:
- Leggere i messaggi pubblicati in un argomento Pub/Sub
- Finestra (o raggruppa) i messaggi in base al timestamp
- Scrivi i messaggi in Cloud Storage
Questa guida rapida illustra l'utilizzo di Dataflow in Java e Python. È supportato anche SQL. Questa guida rapida è offerta anche come tutorial di Google Cloud Skills Boost che offre credenziali temporanee per iniziare.
Se non intendi eseguire l'elaborazione personalizzata dei dati, puoi anche iniziare utilizzando i modelli Dataflow basati sull'interfaccia utente.
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler.
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura l'autenticazione:
-
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAME
con un nome per l'account di servizio. -
Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: il nome dell'account di servizio.PROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioROLE
: il ruolo da concedere
-
Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: il nome dell'account di servizio.PROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioUSER_EMAIL
: l'indirizzo email del tuo Account Google
-
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler.
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura l'autenticazione:
-
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAME
con un nome per l'account di servizio. -
Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: il nome dell'account di servizio.PROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioROLE
: il ruolo da concedere
-
Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: il nome dell'account di servizio.PROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioUSER_EMAIL
: l'indirizzo email del tuo Account Google
-
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
Configura il progetto Pub/Sub
-
Crea variabili per bucket, progetto e regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona una regione Dataflow vicina alla località in cui esegui i comandi di questa guida rapida. Il valore della variabile
REGION
deve essere un nome di regione valido. Per maggiori informazioni su regioni e località, consulta Località Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket Cloud Storage di proprietà di questo progetto:
gsutil mb gs://$BUCKET_NAME
-
Crea un argomento Pub/Sub in questo progetto:
gcloud pubsub topics create $TOPIC_ID
-
Crea un job Cloud Scheduler in questo progetto. Il job pubblica un messaggio in un argomento Pub/Sub a intervalli di un minuto.
Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà una.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Avvia il job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Utilizza i comandi seguenti per clonare il repository della guida rapida e passare alla directory del codice campione:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Trasmetti il flusso di messaggi da Pub/Sub a Cloud Storage
Esempio di codice
Questo codice campione utilizza Dataflow per:
- Leggere i messaggi Pub/Sub.
- Raggruppa i messaggi in intervalli di dimensioni fisse in base ai timestamp di pubblicazione.
Scrivi i messaggi in ogni finestra nei file in Cloud Storage.
Java
Python
Avvia la pipeline
Per avviare la pipeline, esegui questo comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Il comando precedente viene eseguito localmente e avvia un job Dataflow
che viene eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers
have started successfully
, esci dal programma locale utilizzando Ctrl+C
.
Osserva l'avanzamento di job e pipeline
Puoi osservare l'avanzamento del job nella console Dataflow.
Apri la visualizzazione dei dettagli del job per vedere:
- Struttura del job
- Log job
- Metriche della fase
Potresti dover attendere qualche minuto per visualizzare i file di output in Cloud Storage.
In alternativa, utilizza la riga di comando riportata di seguito per verificare quali file sono stati scritti.
gsutil ls gs://${BUCKET_NAME}/samples/
L'output dovrebbe essere simile al seguente:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.
Elimina il job Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Nella console Dataflow, interrompi il job. Annulla la pipeline senza svuotarla.
Elimina l'argomento.
gcloud pubsub topics delete $TOPIC_ID
Eliminare i file creati dalla pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Rimuovi il bucket Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
-
Elimina l'account di servizio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
Facoltativo: revoca le credenziali dallgcloud CLI.
gcloud auth revoke
Passaggi successivi
Se vuoi visualizzare i messaggi Pub/Sub in base a un timestamp personalizzato, puoi specificare il timestamp come attributo nel messaggio Pub/Sub, quindi utilizzare il timestamp personalizzato con
withTimestampAttribute
di Pub/Sub.Dai un'occhiata ai modelli Dataflow open source progettati per l'inserimento di flussi.
Scopri di più su come Dataflow si integra con Pub/Sub.
Guarda questo tutorial che legge da Pub/Sub e scrive in BigQuery utilizzando i modelli Dataflow Flex.
Per scoprire di più sulla creazione di finestre, consulta l'esempio Apache Beam Mobile Gaming Pipeline.