Trasmetti il flusso di messaggi Pub/Sub Lite con Dataflow
In alternativa alla scrittura ed esecuzione dei tuoi programmi di elaborazione dati, puoi utilizzare Dataflow con il connettore I/O Pub/Sub Lite per Apache Beam. Dataflow è un servizio completamente gestito per la trasformazione arricchimento dei dati in modalità flusso (in tempo reale) e batch con pari affidabilità ed espressività. Esegue in modo affidabile i programmi sviluppati utilizzando Apache Beam che ha un set estensibile di potenti astrazioni di elaborazione stateful, e connettori I/O ad altri sistemi di flussi di dati e batch.
Questa guida rapida mostra come scrivere una pipeline Apache Beam che:
- Leggere i messaggi da Pub/Sub Lite
- Finestra (o raggruppamento) dei messaggi in base al timestamp di pubblicazione
- Scrivere i messaggi in Cloud Storage
Inoltre, illustra come:
- Invia la pipeline da eseguire su Dataflow
- Crea un modello flessibile Dataflow dalla tua pipeline
Questo tutorial richiede Maven, ma è anche possibile convertire l'esempio progetto da Maven a Gradle. Per scoprire di più, consulta la sezione Facoltativo: converti da Maven a Gradle.
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configura il progetto Pub/Sub Lite
Creare variabili per bucket Cloud Storage, progetti nella regione Dataflow. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. La regione Dataflow deve essere una regione valida in cui puoi eseguire il job. Per ulteriori informazioni su regioni e località, vedi Località di Dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Crea un bucket Cloud Storage di proprietà di questo progetto:
gcloud storage buckets create gs://$BUCKET
Creare un argomento e una sottoscrizione Pub/Sub Lite a livello di zona
Crea un argomento Lite Pub/Sub Lite e una sottoscrizione Lite a livello di zona.
Per la località Lite, scegli una località Pub/Sub Lite supportata. Devi anche
specificare una zona per la regione. Ad esempio, us-central1-a
.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Trasmetti messaggi a Dataflow
Scarica il codice di esempio della guida di avvio rapido
Clona il repository della guida rapida e vai alla directory del codice campione.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Codice di esempio
Questo codice campione utilizza Dataflow per:
- Leggi i messaggi da una sottoscrizione Pub/Sub Lite come origine illimitata.
- Raggruppa i messaggi in base ai timestamp di pubblicazione utilizzando finestre temporali fisse e ai attivatore predefinito.
Scrive i messaggi raggruppati in file su Cloud Storage.
Java
Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.
avvia la pipeline Dataflow
Per avviare la pipeline in Dataflow, esegui il seguente comando:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
Il comando precedente avvia un job Dataflow. Segui il link nell'output della console per accedere al job in Dataflow nella console di monitoraggio.
Osservare l'avanzamento del lavoro
Osserva l'avanzamento del job nella console Dataflow.
Apri la visualizzazione dei dettagli del job per vedere:
- Grafico job
- Dettagli esecuzione
- Metriche del job
Pubblica alcuni messaggi nell'argomento Lite.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Potrebbe essere necessario attendere qualche minuto prima di visualizzare i messaggi nei log dei worker.
Utilizza il comando seguente per controllare quali file sono stati scritti in Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
L'output dovrebbe essere simile al seguente:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Utilizza il comando seguente per esaminare i contenuti di un file:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
(Facoltativo) Crea un modello Dataflow
Se vuoi, puoi creare un modello flessibile Dataflow personalizzato in base alla tua pipeline. I modelli Dataflow ti consentono di eseguire job con diverse i parametri di input dalla console Google Cloud o dalla riga di comando senza devi configurare un ambiente di sviluppo Java completo.
Crea un JAR grasso che includa tutte le dipendenze della pipeline. Tu dovrebbe vedere
target/pubsublite-streaming-bundled-1.0.jar
dopo il comando è stata eseguita.mvn clean package -DskipTests=true
Specifica i nomi e le posizioni per il file e il contenitore dei modelli dell'immagine.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Crea un modello flessibile personalizzato. Nell'esempio è stato fornito un file
metadata.json
obbligatorio, contenente le specifiche necessarie per eseguire il job.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Esegui un job utilizzando il modello flessibile personalizzato.
Console
Inserisci un nome job.
Inserisci la regione Dataflow.
Scegli il modello personalizzato.
Inserisci il percorso del modello.
Inserisci i parametri richiesti.
Fai clic su Esegui job.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, elimina il progetto Google Cloud Google Cloud.
Nella console Dataflow, arresta il job. Annullare la pipeline instead of draining it.
Elimina l'argomento e la sottoscrizione.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Elimina i file creati dalla pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Elimina l'immagine e il file del modello, se presenti.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Rimuovi il bucket Cloud Storage.
gcloud storage rm gs://$BUCKET --recursive
-
Elimina l'account di servizio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Passaggi successivi
Scopri di più sulla configurazione dei modelli flessibili Dataflow.
Scopri le pipeline in modalità flusso di Dataflow.