Trasmetti flussi di messaggi da Pub/Sub utilizzando Dataflow
Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con pari affidabilità ed espressività. Offre un ambiente di sviluppo della pipeline semplificato grazie all'SDK Apache Beam, con un ricco set di primitive di windowing e analisi delle sessioni nonché un ecosistema di connettori di origine e sink. Questa guida rapida mostra come utilizzare Dataflow per:
- Leggere i messaggi pubblicati in un argomento Pub/Sub
- Raggruppa (o raggruppa) i messaggi in base al timestamp
- Scrivi i messaggi su Cloud Storage
Questa guida rapida spiega come utilizzare Dataflow in Java e Python. È supportato anche SQL. Questa guida rapida è disponibile anche come tutorial su Google Cloud Skills Boost che offre credenziali temporanee per iniziare.
Puoi anche iniziare a utilizzare i modelli di Dataflow basati sull'interfaccia utente se non intendi eseguire il trattamento personalizzato dei dati.
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Creare un progetto Cloud:
gcloud projects create PROJECT_ID
-
Seleziona il progetto Cloud che hai creato:
gcloud config set project PROJECT_ID
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler.
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configurare l'autenticazione:
-
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAME
con un nome per l'account di servizio. -
Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: nome dell'account di servizioPROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioROLE
: il ruolo da concedere
-
Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: nome dell'account di servizioPROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioUSER_EMAIL
: l'indirizzo email del tuo Account Google
-
- Installa Google Cloud CLI.
-
Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Creare un progetto Cloud:
gcloud projects create PROJECT_ID
-
Seleziona il progetto Cloud che hai creato:
gcloud config set project PROJECT_ID
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler.
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configurare l'autenticazione:
-
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAME
con un nome per l'account di servizio. -
Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: nome dell'account di servizioPROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioROLE
: il ruolo da concedere
-
Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME
: nome dell'account di servizioPROJECT_ID
: l'ID progetto in cui hai creato l'account di servizioUSER_EMAIL
: l'indirizzo email del tuo Account Google
-
-
Crea le credenziali di autenticazione per il tuo Account Google:
gcloud auth application-default login
Configurare il progetto Pub/Sub
-
Crea variabili per il bucket, il progetto e la regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona una regione di Dataflow vicino alla posizione in cui esegui i comandi in questa guida rapida. Il valore della variabile
REGION
deve essere un nome di una regione valido. Per saperne di più sulle regioni e sulle località, consulta Località di Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket Cloud Storage appartenente a questo progetto:
gsutil mb gs://$BUCKET_NAME
-
Crea un argomento Pub/Sub in questo progetto:
gcloud pubsub topics create $TOPIC_ID
-
Creare un job Cloud Scheduler in questo progetto. Il job pubblica un messaggio in un argomento Pub/Sub a intervalli di 1 minuto.
Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà uno.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Avvia il job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Utilizza i comandi seguenti per clonare il repository della guida rapida e vai alla directory del codice di esempio:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Trasmetti flussi di messaggi da Pub/Sub a Cloud Storage
Esempio di codice
Questo codice campione utilizza Dataflow per:
- Leggere i messaggi Pub/Sub.
- Finestra (o gruppo) di messaggi in intervalli di dimensioni fisse pubblicando timestamp.
Scrivi i messaggi in ogni finestra in file in Cloud Storage.
Java
Python
Avvia la pipeline
Per avviare la pipeline, esegui il comando seguente:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Il comando precedente viene eseguito localmente e avvia un job Dataflow che viene eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers
have started successfully
, esci dal programma locale utilizzando Ctrl+C
.
Osserva l'avanzamento di job e pipeline
Puoi osservare l'avanzamento del job nella console di Dataflow.
Apri la visualizzazione dei dettagli del job per vedere:
- Struttura del job
- Log job
- Metriche della fase
Potresti dover attendere qualche minuto per vedere i file di output in Cloud Storage.
In alternativa, utilizza la riga di comando riportata di seguito per controllare quali file sono stati scritti.
gsutil ls gs://${BUCKET_NAME}/samples/
L'output dovrebbe avere l'aspetto seguente:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Cloud con le risorse.
Elimina il job Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Nella console di Dataflow, arresta il job. Annulla la pipeline senza svuotarla.
Eliminare l'argomento.
gcloud pubsub topics delete $TOPIC_ID
Elimina i file creati dalla pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Rimuovi il bucket Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
-
Elimina l'account di servizio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
(Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
Facoltativo: revoca le credenziali dall'interfaccia a riga di comando gcloud.
gcloud auth revoke
Passaggi successivi
Se vuoi visualizzare i messaggi Pub/Sub in base a un timestamp personalizzato, puoi specificare il timestamp come attributo nel messaggio Pub/Sub e utilizzarlo con
withTimestampAttribute
di PubsubIO.Dai un'occhiata ai modelli Dataflow open source progettati per lo streaming.
Scopri di più sull'integrazione di Dataflow con Pub/Sub.
Dai un'occhiata a questo tutorial, che legge da Pub/Sub e scrive in BigQuery utilizzando i modelli Dataflow.
Per ulteriori informazioni sul windowing, consulta l'esempio di Apache Beam Mobile Gaming Pipeline.