Trasmetti il flusso di messaggi da Pub/Sub con Dataflow

Dataflow è un servizio completamente gestito per la trasformazione l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con lo stesso affidabilità ed espressività. Fornisce una pipeline semplificata utilizzando l'SDK Apache Beam, che dispone di un set di windowing e primitive di analisi delle sessioni, nonché un ecosistema di fonti e connettori dei sink. Questa guida rapida mostra come utilizzare Dataflow per:

  • Leggere i messaggi pubblicati in un argomento Pub/Sub
  • Finestra (o raggruppamento) dei messaggi in base al timestamp
  • Scrivi i messaggi in Cloud Storage

Questa guida rapida illustra l'utilizzo di Dataflow in Java e come Python. SQL . Questa guida rapida viene offerta anche come Tutorial di Google Cloud Skills Boost che offre credenziali temporanee per aiutarti a iniziare.

Puoi anche iniziare utilizzando Dataflow basato sull'interfaccia utente modelli se non intendi eseguire il trattamento personalizzato dei dati.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler.

    gcloud services enable
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login

configura il progetto Pub/Sub

  1. Crea variabili per bucket, progetto e regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona uno Dataflow region vicino a in cui eseguirai i comandi di questa guida rapida. Il valore dell'attributo REGION deve essere un nome di regione valido. Per ulteriori informazioni su regioni e località, vedi Località di Dataflow.

    PROJECT_ID=$(gcloud config get-value project)
  2. Crea un bucket Cloud Storage di proprietà di questo progetto:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. Crea un argomento Pub/Sub in questo progetto:

    gcloud pubsub topics create $TOPIC_ID
  4. Crea un job Cloud Scheduler in questo progetto. Il job viene pubblicato un messaggio a un argomento Pub/Sub a intervalli di un minuto.

    Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà uno.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Avvia il job.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Usa i comandi seguenti per clonare il repository della guida rapida ed esplorare alla directory codice campione:


    git clone
    cd java-docs-samples/pubsub/streaming-analytics


    git clone
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Trasmetti messaggi da Pub/Sub a Cloud Storage

Esempio di codice

Questo codice campione utilizza Dataflow per:

  • Leggere i messaggi Pub/Sub.
  • Crea finestre (o raggruppa) i messaggi in intervalli di dimensioni fisse in base ai timestamp di pubblicazione.
  • Scrivere i messaggi in ogni finestra nei file in Cloud Storage.


import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    String getOutput();

    void setOutput(String value);

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =


    Pipeline pipeline = Pipeline.create(options);

        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.;


import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        yield (
                "%Y-%m-%d %H:%M:%S.%f"

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True

    with Pipeline(options=pipeline_options) as pipeline:
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
        help="The Cloud Pub/Sub topic to read from."
        help="Output file's window size in minutes.",
        help="Path of the output GCS file including the prefix.",
        help="Number of shards to use when writing windowed elements to GCS.",
    known_args, pipeline_args = parser.parse_known_args()


Avvia la pipeline

Per avviare la pipeline, esegui questo comando:


mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \


python \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \

Il comando precedente viene eseguito in locale e avvia un job Dataflow eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers have started successfully, esci dal programma locale utilizzando Ctrl+C.

Osserva l'avanzamento di job e pipeline

Puoi osservare l'avanzamento del job nella console Dataflow.

Vai alla console Dataflow

Osserva i progressi del job

Apri la visualizzazione dei dettagli del job per vedere:

  • Struttura lavorativa
  • Log job
  • Metriche di fase

Osserva i progressi del job

Potrebbe essere necessario attendere qualche minuto prima di visualizzare i file di output di archiviazione ideale in Cloud Storage.

Osserva i progressi del job

In alternativa, usa la riga di comando qui sotto per verificare quali file sono stati scritti fuori.

gcloud storage ls gs://${BUCKET_NAME}/samples/

L'output dovrebbe essere simile al seguente:





Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, elimina il progetto Google Cloud Google Cloud.

  1. Elimina il job Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. Arresta il job nella console Dataflow. Annulla la pipeline senza svuotarlo.

  3. Elimina l'argomento.

    gcloud pubsub topics delete $TOPIC_ID
  4. Elimina i file creati dalla pipeline.

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. Rimuovi il bucket Cloud Storage.

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. Elimina l'account di servizio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  8. Facoltativo: revoca le credenziali dallgcloud CLI.

    gcloud auth revoke

Passaggi successivi