Trasmetti il flusso di messaggi da Pub/Sub con Dataflow
Dataflow è un servizio completamente gestito per la trasformazione l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con lo stesso affidabilità ed espressività. Fornisce una pipeline semplificata utilizzando l'SDK Apache Beam, che dispone di un set di windowing e primitive di analisi delle sessioni, nonché un ecosistema di fonti e connettori dei sink. Questa guida rapida mostra come utilizzare Dataflow per:
- Leggere i messaggi pubblicati in un argomento Pub/Sub
- Finestra (o raggruppamento) dei messaggi in base al timestamp
- Scrivi i messaggi in Cloud Storage
Questa guida rapida illustra l'utilizzo di Dataflow in Java e come Python. SQL . Questa guida rapida viene offerta anche come Tutorial di Google Cloud Skills Boost che offre credenziali temporanee per aiutarti a iniziare.
Puoi anche iniziare utilizzando Dataflow basato sull'interfaccia utente modelli se non intendi eseguire il trattamento personalizzato dei dati.
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
configura il progetto Pub/Sub
-
Crea variabili per bucket, progetto e regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona uno Dataflow region vicino a in cui eseguirai i comandi di questa guida rapida. Il valore dell'attributo
REGION
deve essere un nome di regione valido. Per ulteriori informazioni su regioni e località, vedi Località di Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket Cloud Storage di proprietà di questo progetto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crea un argomento Pub/Sub in questo progetto:
gcloud pubsub topics create $TOPIC_ID
-
Crea un job Cloud Scheduler in questo progetto. Il job viene pubblicato un messaggio a un argomento Pub/Sub a intervalli di un minuto.
Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà uno.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Avvia il job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Usa i comandi seguenti per clonare il repository della guida rapida ed esplorare alla directory codice campione:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Trasmetti i messaggi da Pub/Sub a Cloud Storage
Esempio di codice
Questo codice campione utilizza Dataflow per:
- Leggere i messaggi Pub/Sub.
- Crea finestre (o raggruppa) i messaggi in intervalli di dimensioni fisse in base ai timestamp di pubblicazione.
Scrivi i messaggi in ogni finestra in file in Cloud Storage.
Java
Python
Avvia la pipeline
Per avviare la pipeline, esegui questo comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Il comando precedente viene eseguito in locale e avvia un job Dataflow
eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers
have started successfully
, esci dal programma locale utilizzando Ctrl+C
.
Osserva l'avanzamento di job e pipeline
Puoi osservare l'avanzamento del job nella console Dataflow.
Apri la visualizzazione dei dettagli del job per vedere:
- Struttura lavorativa
- Log job
- Metriche di fase
Potrebbe essere necessario attendere qualche minuto prima di visualizzare i file di output di archiviazione ideale in Cloud Storage.
In alternativa, usa la riga di comando qui sotto per verificare quali file sono stati scritti fuori.
gcloud storage ls gs://${BUCKET_NAME}/samples/
L'output dovrebbe essere simile al seguente:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, elimina il progetto Google Cloud Google Cloud.
Elimina il job Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Arresta il job nella console Dataflow. Annulla la pipeline senza svuotarlo.
Elimina l'argomento.
gcloud pubsub topics delete $TOPIC_ID
Elimina i file creati dalla pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Rimuovi il bucket Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Elimina l'account di servizio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Passaggi successivi
Se vuoi visualizzare i messaggi Pub/Sub in una finestra timestamp, puoi specificare il timestamp come attributo nella messaggio Pub/Sub, quindi utilizzare il timestamp personalizzato
withTimestampAttribute
di PubsubIO.Dai un'occhiata ai modelli Dataflow open source di Google progettato per lo streaming.
Scopri di più su come Dataflow si integra con in Pub/Sub.
Guarda questo tutorial che legge da Pub/Sub e scrive BigQuery utilizzando i modelli Dataflow Flex.
Per ulteriori informazioni sul windowing, vedi l'esempio della pipeline di gioco mobile di Apache Beam.