Démarrage rapide : traitement par flux avec Dataflow

Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs. Ce guide de démarrage rapide vous explique comment effectuer les opérations suivantes à l'aide de Dataflow :

  • Lire les messages publiés dans un sujet Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
  • Écrire les messages dans Cloud Storage

Ce guide de démarrage rapide vous explique comment utiliser Dataflow en Java et Python. SQL est également compatible.

Vous pouvez également commencer par utiliser des modèles Dataflow basés sur l'interface utilisateur si vous n'avez pas l'intention d'effectuer un traitement de données personnalisé.

Avant de commencer

  1. Suivez les instructions pour installer et initialiser le SDK Cloud.
  2. Activez la facturation pour votre projet.
  3. Pour terminer ce démarrage rapide, vous devez activer les API suivantes : Compute Engine, la suite des opérations de Google Cloud, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager et App Engine.

    Activer les API

    L'affichage des API dans la console peut prendre quelques instants.

  4. Créez une clé de compte de service :

    Créer une clé de compte de service

    1. Dans la liste Compte de service, sélectionnez Nouveau compte de service.
    2. Saisissez un nom dans le champ Nom du compte de service.
    3. Dans la liste Rôle, sélectionnez Projet > Propriétaire.
    4. Cliquez sur Créer.

    La clé est envoyée dans le dossier "Téléchargements" par défaut de votre navigateur.

  5. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers la clé du compte de service.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Créez des variables pour votre bucket et votre projet. Les noms des buckets Cloud Storage doivent être uniques.

    BUCKET_NAME=bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    
  7. Créez un bucket Cloud Storage appartenant à ce projet :

    gsutil mb gs://$BUCKET_NAME
    
  8. Créez un sujet Pub/Sub dans ce projet :

    gcloud pubsub topics create cron-topic
    
  9. Créez une tâche Cloud Scheduler dans ce projet. La tâche publie un message sur un sujet Cloud Pub/Sub chaque minute.

    Si une application App Engine n'existe pas pour le projet, cette étape va en créer une.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"
    

    Démarrez la tâche.

    gcloud scheduler jobs run publisher-job
    
  10. Utilisez la commande suivante pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Diffuser des messages depuis Pub/Sub vers Cloud Storage

Exemple de code

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages en intervalles fixes par données d'horodatage
  • Écrire les messages dans chaque fenêtre dans des fichiers dans Cloud Storage

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }

class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )

if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Démarrer le pipeline

Pour démarrer le pipeline, exécutez la commande suivante :

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp

Observer la progression de la tâche et du pipeline

Vous pouvez observer la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Observer la progression de la tâche

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Structure de la tâche
  • Journaux de la tâche
  • Métriques de l'étape

Observer la progression de la tâche

Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.

Observer la progression de la tâche

Vous pouvez également utiliser la ligne de commande ci-dessous pour afficher les fichiers qui ont été écrits.

gsutil ls gs://${BUCKET_NAME}/samples/

Le résultat doit se présenter sous la forme suivante :

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32
gs://{$BUCKET_NAME}/samples/output-22:32-22:34
gs://{$BUCKET_NAME}/samples/output-22:34-22:36
gs://{$BUCKET_NAME}/samples/output-22:36-22:38

Nettoyage

  1. Supprimez la tâche Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job
    
  2. Utilisez Ctrl+C pour arrêter le programme dans votre terminal.

  3. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline sans le drainer.

  4. Supprimez le sujet.

    gcloud pubsub topics delete cron-topic
    
  5. Supprimez les fichiers créés par le pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  6. Supprimez le bucket Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

Étape suivante