Usa Pub/Sub con Dataflow

Cloud Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes con la misma confiabilidad y expresividad. Proporciona un entorno de desarrollo de canalización simplificado con el SDK de Apache Beam, que tiene un conjunto amplio de primitivas de análisis de sesiones y sistemas de ventanas, además de un ecosistema de conectores fuente y receptores. En esta guía de inicio rápido, se muestra cómo usar Dataflow para realizar las siguientes acciones:

  • Leer mensajes publicados en un tema de Pub/Sub
  • Mostrar mensajes en ventanas, o almacenarlos en búfer, en lotes
  • Escribir mensajes a Cloud Storage

En esta guía de inicio rápido, se explica el uso de Dataflow en Java y Python. SQL también es compatible.

Si tu intención no es realizar un procesamiento de datos personalizado, puedes comenzar a usar las plantillas de Dataflow basadas en IU.

Antes de comenzar

Configura tu entorno de Cloud Storage

  1. Sigue las instrucciones para instalar e inicializar el SDK de Cloud.
  2. Habilita la facturación en tu proyecto.
  3. Para completar esta guía de inicio rápido, debes habilitar las siguientes API: Compute Engine, el paquete de operaciones de Google Cloud, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager y App Engine.

    Habilitar las API

    Pueden pasar unos minutos hasta que las API aparezcan en Console.

  4. Para crear una clave de cuenta de servicio, haz lo siguiente:

    Crear una clave de cuenta de servicio

    1. En la lista Cuenta de servicio, selecciona Cuenta de servicio nueva.
    2. Ingresa un nombre en el campo Nombre de cuenta de servicio.
    3. En la lista Función, selecciona Proyecto > Propietario.
    4. Haz clic en Crear.

    La clave se envía a la carpeta de descargas predeterminada del navegador.

  5. Establece la variable de entorno GOOGLE_APPLICATION_CREDENTIALS para que apunte a la clave de la cuenta de servicio.

        export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
        
  6. Crea variables para tu depósito y tu proyecto. Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global.

        BUCKET_NAME=bucket-name
        PROJECT_NAME=$(gcloud config get-value project)
        
  7. Crea un depósito de Cloud Storage que sea propiedad de este proyecto:

        gsutil mb gs://$BUCKET_NAME
        
  8. Crea un tema de Pub/Sub en este proyecto:

        gcloud pubsub topics create cron-topic
        
  9. Crea un trabajo de Cloud Scheduler en este proyecto. El trabajo publica un mensaje en un tema de Cloud Pub/Sub con intervalos de un minuto.

    Si una app de App Engine no existe en el proyecto, crea una con el siguiente comando.

        gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
         --topic=cron-topic --message-body="Hello!"
        

    Inicia el trabajo.

        gcloud scheduler jobs run publisher-job
        
  10. Usa el siguiente comando para clonar el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:

    java

        git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
        cd java-docs-samples/pubsub/streaming-analytics
        

    python

        git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
        cd python-docs-samples/pubsub/streaming-analytics
        pip install -r requirements.txt  # Install Apache Beam dependencies
        

Transmite mensajes desde Pub/Sub a Cloud Storage

Muestra de código

En este código de muestra, se usa Dataflow para realizar las siguientes acciones:

  • Leer mensajes de Pub/Sub
  • Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo con intervalos de 2 minutos
  • Guardar los mensajes en cada ventana como archivos en Cloud Storage

java

import org.apache.beam.examples.common.WriteOneFilePerWindow;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    import java.io.IOException;

    public class PubSubToGCS {
      /*
      * Define your own configuration options. Add your own arguments to be processed
      * by the command-line parser, and specify default values for them.
      */
      public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        String getInputTopic();
        void setInputTopic(String value);

        @Description("Output file's window size in number of minutes.")
        @Default.Integer(1)
        Integer getWindowSize();
        void setWindowSize(Integer value);

        @Description("Path of the output file including its filename prefix.")
        @Required
        String getOutput();
        void setOutput(String value);
      }

      public static void main(String[] args) throws IOException {
        // The maximum number of shards when writing output.
        int numShards = 1;

        PubSubToGCSOptions options = PipelineOptionsFactory
          .fromArgs(args)
          .withValidation()
          .as(PubSubToGCSOptions.class);

        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
          // 1) Read string messages from a Pub/Sub topic.
          .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
          // 2) Group the messages into fixed-sized minute intervals.
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
          // 3) Write one file to GCS for every window of messages.
          .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

        // Execute the pipeline and wait until it finishes running.
        pipeline.run().waitUntilFinish();
      }
    }

Python

import argparse
    import datetime
    import json
    import logging

    import apache_beam as beam
    import apache_beam.transforms.window as window
    from apache_beam.options.pipeline_options import PipelineOptions

    class GroupWindowsIntoBatches(beam.PTransform):
        """A composite transform that groups Pub/Sub messages based on publish
        time and outputs a list of dictionaries, where each contains one message
        and its publish timestamp.
        """

        def __init__(self, window_size):
            # Convert minutes into seconds.
            self.window_size = int(window_size * 60)

        def expand(self, pcoll):
            return (
                pcoll
                # Assigns window info to each Pub/Sub message based on its
                # publish timestamp.
                | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
                | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
                # Use a dummy key to group the elements in the same window.
                # Note that all the elements in one window must fit into memory
                # for this. If the windowed elements do not fit into memory,
                # please consider using `beam.util.BatchElements`.
                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
                | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
                | "Groupby" >> beam.GroupByKey()
                | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
            )

    class AddTimestamps(beam.DoFn):
        def process(self, element, publish_time=beam.DoFn.TimestampParam):
            """Processes each incoming windowed element by extracting the Pub/Sub
            message and its publish timestamp into a dictionary. `publish_time`
            defaults to the publish timestamp returned by the Pub/Sub server. It
            is bound to each element by Beam at runtime.
            """

            yield {
                "message_body": element.decode("utf-8"),
                "publish_time": datetime.datetime.utcfromtimestamp(
                    float(publish_time)
                ).strftime("%Y-%m-%d %H:%M:%S.%f"),
            }

    class WriteBatchesToGCS(beam.DoFn):
        def __init__(self, output_path):
            self.output_path = output_path

        def process(self, batch, window=beam.DoFn.WindowParam):
            """Write one batch per file to a Google Cloud Storage bucket. """

            ts_format = "%H:%M"
            window_start = window.start.to_utc_datetime().strftime(ts_format)
            window_end = window.end.to_utc_datetime().strftime(ts_format)
            filename = "-".join([self.output_path, window_start, window_end])

            with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
                for element in batch:
                    f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

    def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
        # `save_main_session` is set to true because some DoFn's rely on
        # globally imported modules.
        pipeline_options = PipelineOptions(
            pipeline_args, streaming=True, save_main_session=True
        )

        with beam.Pipeline(options=pipeline_options) as pipeline:
            (
                pipeline
                | "Read PubSub Messages"
                >> beam.io.ReadFromPubSub(topic=input_topic)
                | "Window into" >> GroupWindowsIntoBatches(window_size)
                | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
            )

    if __name__ == "__main__":  # noqa
        logging.getLogger().setLevel(logging.INFO)

        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--input_topic",
            help="The Cloud Pub/Sub topic to read from.\n"
            '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
        )
        parser.add_argument(
            "--window_size",
            type=float,
            default=1.0,
            help="Output file's window size in number of minutes.",
        )
        parser.add_argument(
            "--output_path",
            help="GCS Path of the output file including filename prefix.",
        )
        known_args, pipeline_args = parser.parse_known_args()

        run(
            known_args.input_topic,
            known_args.output_path,
            known_args.window_size,
            pipeline_args,
        )

Comienza la canalización

Para iniciar la canalización, usa el siguiente comando:

java

    mvn compile exec:java \
      -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGCS \
      -Dexec.cleanupDaemonThreads=false \
      -Dexec.args=" \
        --project=$PROJECT_NAME \
        --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
        --output=gs://$BUCKET_NAME/samples/output \
        --runner=DataflowRunner \
        --windowSize=2"
    

python

    python PubSubToGCS.py \
      --project=$PROJECT_NAME \
      --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
      --output_path=gs://$BUCKET_NAME/samples/output \
      --runner=DataflowRunner \
      --window_size=2 \
      --temp_location=gs://$BUCKET_NAME/temp
    

Observa el progreso del trabajo y la canalización

Puedes observar el progreso del trabajo en la consola de Dataflow.

Ir a la consola de Dataflow

Observar el progreso del trabajo

Abre la vista de detalles de trabajos para ver lo siguiente:

  • Estructura del trabajo
  • Registros del trabajo
  • Métricas de etapas

Observar el progreso del trabajo

Puede que debas esperar unos minutos para ver los archivos de salida en Cloud Storage.

Observar el progreso del trabajo

También puedes usar la línea de comandos que se muestra a continuación para verificar qué archivos se escribieron.

    gsutil ls gs://${BUCKET_NAME}/samples/
    

El resultado debe tener el siguiente aspecto:

java

    gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
    gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
    gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
    gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
    

python

    gs://{$BUCKET_NAME}/samples/output-22:30-22:32
    gs://{$BUCKET_NAME}/samples/output-22:32-22:34
    gs://{$BUCKET_NAME}/samples/output-22:34-22:36
    gs://{$BUCKET_NAME}/samples/output-22:36-22:38
    

Limpieza

  1. Borra el trabajo de Cloud Scheduler.

        gcloud scheduler jobs delete publisher-job
        
  2. Usa Ctrl+C para detener el programa en tu terminal.

  3. En la consola de Dataflow, detén el trabajo. Cancela la canalización sin desviarla.

  4. Borra el tema.

        gcloud pubsub topics delete cron-topic
        
  5. Borra los archivos que se crearon con la canalización.

        gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
        
  6. Quita el depósito de Cloud Storage.

        gsutil rb gs://${BUCKET_NAME}
        

Qué sigue