Diffuser des messages en continu depuis Pub/Sub à l'aide de Dataflow

Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs. Ce guide de démarrage rapide vous explique comment effectuer les opérations suivantes à l'aide de Dataflow :

  • Lire les messages publiés dans un sujet Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
  • Écrire les messages dans Cloud Storage

Ce guide de démarrage rapide vous explique comment utiliser Dataflow en Java et Python. SQL est également compatible. Ce guide de démarrage rapide est également proposé en tant que tutoriel Google Cloud Skills, qui fournit des identifiants temporaires pour vous aider à démarrer.

Vous pouvez également commencer par utiliser des modèles Dataflow basés sur l'interface utilisateur si vous n'avez pas l'intention d'effectuer un traitement de données personnalisé.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler :

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Configurez l'authentification :

    1. Créez le compte de service :

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Remplacez SERVICE_ACCOUNT_NAME par le nom que vous souhaitez donner au compte de service.

    2. Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • ROLE : rôle à accorder
    3. Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • USER_EMAIL: adresse e-mail de votre compte Google
  8. Installez Google Cloud CLI.
  9. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  10. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  11. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  12. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler :

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Configurez l'authentification :

    1. Créez le compte de service :

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Remplacez SERVICE_ACCOUNT_NAME par le nom que vous souhaitez donner au compte de service.

    2. Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • ROLE : rôle à accorder
    3. Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • USER_EMAIL: adresse e-mail de votre compte Google
  14. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login

Configurer votre projet Pub/Sub

  1. Créez des variables pour votre bucket, votre projet et votre région. Les noms des buckets Cloud Storage doivent être uniques. Sélectionnez une région Dataflow proche de l'emplacement où vous exécutez les commandes dans ce guide de démarrage rapide. La valeur de la variable REGION doit être un nom de région valide. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    
  2. Créez un bucket Cloud Storage appartenant à ce projet :

    gsutil mb gs://$BUCKET_NAME
  3. Créez un sujet Pub/Sub dans ce projet :

    gcloud pubsub topics create $TOPIC_ID
  4. Créez une tâche Cloud Scheduler dans ce projet. La tâche publie un message sur un sujet Cloud Pub/Sub chaque minute.

    Si une application App Engine n'existe pas pour le projet, cette étape va en créer une.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Démarrez la tâche.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Utilisez la commande suivante pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Diffuser des messages depuis Pub/Sub vers Cloud Storage

Exemple de code

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages en intervalles fixes par données d'horodatage
  • Écrire les messages dans chaque fenêtre dans des fichiers dans Cloud Storage

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Démarrer le pipeline

Pour démarrer le pipeline, exécutez la commande suivante :

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

La commande précédente s'exécute localement et lance une tâche Dataflow dans le cloud. Lorsque la commande renvoie JOB_MESSAGE_DETAILED: Workers have started successfully, quittez le programme local à l'aide de Ctrl+C.

Observer la progression de la tâche et du pipeline

Vous pouvez observer la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Observer la progression de la tâche

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Structure de la tâche
  • Journaux de la tâche
  • Métriques de l'étape

Observer la progression de la tâche

Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.

Observer la progression de la tâche

Vous pouvez également utiliser la ligne de commande ci-dessous pour afficher les fichiers qui ont été écrits.

gsutil ls gs://${BUCKET_NAME}/samples/

Le résultat doit se présenter sous la forme suivante :

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.

  1. Supprimez la tâche Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
    
  2. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline sans le drainer.

  3. Supprimez le sujet.

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Supprimez les fichiers créés par le pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Supprimez le bucket Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

  6. Supprimez le compte de service :
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.

    gcloud auth application-default revoke
  8. Facultatif : Révoquez les identifiants de la CLI gcloud.

    gcloud auth revoke

Étapes suivantes