Transmite mensajes desde Pub/Sub con Dataflow

Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (tiempo real) y por lotes con la misma confiabilidad y expresividad. Proporciona un entorno de desarrollo de canalización simplificado con el SDK de Apache Beam, que tiene un conjunto amplio de primitivas de análisis de sesiones y sistemas de ventanas, además de un ecosistema de conectores fuente y receptores. En esta guía de inicio rápido, se muestra cómo usar Dataflow para realizar las siguientes acciones:

  • Leer mensajes publicados en un tema de Pub/Sub
  • Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo
  • Escribir mensajes a Cloud Storage

En esta guía de inicio rápido, se explica el uso de Dataflow en Java y Python. SQL también es compatible. Esta guía de inicio rápido también se ofrece como un instructivo de Google Cloud Skills Boost, que proporciona credenciales temporales para ayudarte a comenzar.

Si tu intención no es realizar un procesamiento de datos personalizado, puedes comenzar a usar las plantillas de Dataflow basadas en IU.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Configura la autenticación:

    1. Crea la cuenta de servicio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Reemplaza SERVICE_ACCOUNT_NAME por un nombre para la cuenta de servicio.

    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • ROLE: el rol a otorgar
    3. Otorga a tu Cuenta de Google un rol que te permita usar los roles de la cuenta de servicio y conectar la cuenta de servicio a otros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • USER_EMAIL: La dirección de correo electrónico de tu Cuenta de Google
  8. Instala Google Cloud CLI.
  9. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  10. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  11. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  12. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Configura la autenticación:

    1. Crea la cuenta de servicio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Reemplaza SERVICE_ACCOUNT_NAME por un nombre para la cuenta de servicio.

    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • ROLE: el rol a otorgar
    3. Otorga a tu Cuenta de Google un rol que te permita usar los roles de la cuenta de servicio y conectar la cuenta de servicio a otros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • USER_EMAIL: La dirección de correo electrónico de tu Cuenta de Google
  14. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login

Configura tu proyecto de Pub/Sub

  1. Crea variables para tu bucket, tu proyecto y la región. Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global. Selecciona una región de Dataflow cercana a donde ejecutas los comandos de esta guía de inicio rápido. El valor de la variable REGION debe ser un nombre de región válido. Para obtener más información sobre las regiones y ubicaciones, consulta Ubicaciones de Dataflow.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    
  2. Crea un bucket de Cloud Storage que sea propiedad de este proyecto:

    gsutil mb gs://$BUCKET_NAME
  3. Crea un tema de Pub/Sub en este proyecto:

    gcloud pubsub topics create $TOPIC_ID
  4. Crea un trabajo de Cloud Scheduler en este proyecto. El trabajo publica un mensaje en un tema de Pub/Sub con intervalos de un minuto.

    Si una app de App Engine no existe en el proyecto, crea una con el siguiente comando.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Inicia el trabajo.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Usa los siguientes comandos para clonar el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Transmite mensajes desde Pub/Sub a Cloud Storage

Muestra de código

En este código de muestra, se usa Dataflow para realizar las siguientes acciones:

  • Leer mensajes de Pub/Sub
  • Mostrar mensajes en ventanas, o agruparlos, en intervalos de tamaño fijo con marcas de tiempo públicas.
  • Escribir los mensajes en cada ventana en archivos en Cloud Storage

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Comienza la canalización

Para iniciar la canalización, ejecuta el siguiente comando:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

El comando anterior se ejecuta de manera local y, luego, inicia un trabajo de Dataflow que se ejecuta en la nube. Cuando el comando muestre JOB_MESSAGE_DETAILED: Workers have started successfully, sal del programa local con Ctrl+C.

Observa el progreso del trabajo y la canalización

Puedes observar el progreso del trabajo en la consola de Dataflow.

Ir a la consola de Dataflow

Observar el progreso del trabajo

Abre la vista de detalles de trabajos para ver lo siguiente:

  • Estructura del trabajo
  • Registros del trabajo
  • Métricas de etapas

Observar el progreso del trabajo

Puede que debas esperar unos minutos para ver los archivos de salida en Cloud Storage.

Observar el progreso del trabajo

También puedes usar la línea de comandos que se muestra a continuación para verificar qué archivos se escribieron.

gsutil ls gs://${BUCKET_NAME}/samples/

El resultado debe tener el siguiente aspecto:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.

  1. Borra el trabajo de Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
    
  2. En la consola de Dataflow, detén el trabajo. Cancela la canalización sin desviarla.

  3. Borra el tema.

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Borra los archivos que se crearon con la canalización.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Quita el bucket de Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

  6. Borra la cuenta de servicio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.

    gcloud auth application-default revoke
  8. Opcional: Revoca credenciales desde gcloud CLI.

    gcloud auth revoke

¿Qué sigue?