Cette page a été traduite par l'API Cloud Translation.
Switch to English

Démarrage rapide : traitement par flux avec Dataflow

Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs. Ce guide de démarrage rapide vous explique comment effectuer les opérations suivantes à l'aide de Dataflow :

  • Lire les messages publiés dans un sujet Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
  • Écrire les messages dans Cloud Storage

Ce guide de démarrage rapide vous explique comment utiliser Dataflow en Java et Python. SQL est également compatible.

Vous pouvez également commencer par utiliser des modèles Dataflow basés sur l'interface utilisateur si vous n'avez pas l'intention d'effectuer un traitement de données personnalisé.

Avant de commencer

  1. Suivez les instructions pour installer et initialiser le SDK Cloud.
  2. Activez la facturation pour votre projet.
  3. Pour terminer ce démarrage rapide, vous devez activer les API suivantes : Compute Engine, la suite des opérations de Google Cloud, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager et App Engine.

    Activer les API

    L'affichage des API dans la console peut prendre quelques instants.

  4. Créez une clé de compte de service :

    Créer une clé de compte de service

    1. Dans la liste Compte de service, sélectionnez Nouveau compte de service.
    2. Saisissez un nom dans le champ Nom du compte de service.
    3. Dans la liste Rôle, sélectionnez Projet > Propriétaire.
    4. Cliquez sur Créer.

    La clé est envoyée dans le dossier "Téléchargements" par défaut de votre navigateur.

  5. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers la clé du compte de service.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Créez des variables pour votre bucket, votre projet et votre région. Les noms des buckets Cloud Storage doivent être uniques. Sélectionnez une région Dataflow proche de l'endroit où vous exécutez les commandes dans ce guide de démarrage rapide.

    BUCKET_NAME=your-bucket-name
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=your-topic-id
    REGION=dataflow-region
    
  7. Créez un bucket Cloud Storage appartenant à ce projet :

    gsutil mb gs://$BUCKET_NAME
    
  8. Créez un sujet Pub/Sub dans ce projet :

    gcloud pubsub topics create $TOPIC_ID
    
  9. Créez une tâche Cloud Scheduler dans ce projet. La tâche publie un message sur un sujet Cloud Pub/Sub chaque minute.

    Si une application App Engine n'existe pas pour le projet, cette étape va en créer une.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=$TOPIC_ID --message-body="Hello!"
    

    Démarrez la tâche.

    gcloud scheduler jobs run publisher-job
    
  10. Utilisez la commande suivante pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Diffuser des messages depuis Pub/Sub vers Cloud Storage

Exemple de code

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages en intervalles fixes par données d'horodatage
  • Écrire les messages dans chaque fenêtre dans des fichiers dans Cloud Storage

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Démarrer le pipeline

Pour démarrer le pipeline, exécutez la commande suivante:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp

La commande précédente s'exécute localement et lance une tâche Dataflow qui s'exécute dans le cloud. Lorsque la commande renvoie JOB_MESSAGE_DETAILED: Workers have started successfully, quittez le programme local à l'aide de Ctrl+C.

Observer la progression de la tâche et du pipeline

Vous pouvez observer la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Observer la progression de la tâche

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Structure de la tâche
  • Journaux de la tâche
  • Métriques de l'étape

Observer la progression de la tâche

Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.

Observer la progression de la tâche

Vous pouvez également utiliser la ligne de commande ci-dessous pour afficher les fichiers qui ont été écrits.

gsutil ls gs://${BUCKET_NAME}/samples/

Le résultat doit se présenter sous la forme suivante :

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Nettoyage

  1. Supprimez la tâche Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job
    
  2. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline sans le drainer.

  3. Supprimez le sujet.

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Supprimez les fichiers créés par le pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Supprimez le bucket Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

Étape suivante