Transmite mensajes de Pub/Sub Lite con Dataflow

Como alternativa a escribir y ejecutar tus propios programas de procesamiento de datos, puedes usar Dataflow con el conector de E/S de Pub/Sub Lite para Apache Beam. Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes con la misma confiabilidad y expresividad. Ejecuta de manera confiable programas desarrollados con el SDK de Apache Beam, que tiene un conjunto extensible de abstracciones de procesamiento con estado potentes y conectores de E/S con otros sistemas de transmisión y por lotes.

En esta guía de inicio rápido, se muestra cómo escribir una canalización de Apache Beam que realizará las siguientes acciones:

  • Lee mensajes de Pub/Sub Lite
  • Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo de publicación
  • Escribir mensajes a Cloud Storage

Además, te muestra cómo hacer lo siguiente:

  • Enviar tu canalización para que se ejecute en Dataflow
  • Crear una plantilla flexible de Dataflow desde tu canalización

Para este instructivo, se necesita Maven, pero también es posible convertir el proyecto de ejemplo de Maven a Gradle. Para obtener más información, consulta Opcional: Convierte de Maven a Gradle.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Habilita las APIs de Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Configura la autenticación:

    1. Crea la cuenta de servicio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Reemplaza SERVICE_ACCOUNT_NAME por un nombre para la cuenta de servicio.

    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • ROLE: el rol a otorgar
    3. Otorga a tu Cuenta de Google un rol que te permita usar los roles de la cuenta de servicio y conectar la cuenta de servicio a otros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • USER_EMAIL: La dirección de correo electrónico de tu Cuenta de Google
  8. Instala Google Cloud CLI.
  9. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  10. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  11. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  12. Habilita las APIs de Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Configura la autenticación:

    1. Crea la cuenta de servicio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Reemplaza SERVICE_ACCOUNT_NAME por un nombre para la cuenta de servicio.

    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • ROLE: el rol a otorgar
    3. Otorga a tu Cuenta de Google un rol que te permita usar los roles de la cuenta de servicio y conectar la cuenta de servicio a otros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
      • USER_EMAIL: La dirección de correo electrónico de tu Cuenta de Google
  14. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login

Cómo configurar tu proyecto de Pub/Sub Lite

  1. Crea variables para tu bucket de Cloud Storage, proyecto y región de Dataflow. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global. La región de Dataflow debe ser una región válida donde puedas ejecutar tu trabajo. Para obtener más información sobre las regiones y ubicaciones, consulta Ubicaciones de Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Crea un bucket de Cloud Storage que sea propiedad de este proyecto:

       gsutil mb gs://$BUCKET
    

Crea una suscripción y un tema zonal de Pub/Sub Lite

Crear un tema zonal de Pub/Sub Lite y una suscripción a Lite

Para la ubicación de Lite, elige una ubicación de Pub/Sub Lite compatible. También debes especificar una zona para la región. Por ejemplo, us-central1-a

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Transmite mensajes a Dataflow

Descarga el código de muestra de la guía de inicio rápido

Clona el repositorio de la guía de inicio rápido y navega al directorio del código de muestra.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Código de muestra

En este código de muestra, se usa Dataflow para realizar las siguientes acciones:

  • Leer mensajes de una suscripción a Pub/Sub Lite como una fuente no delimitada.
  • Agrupa los mensajes según sus marcas de tiempo de publicación mediante períodos de tiempo fijo y el activador predeterminado.
  • Escribir los mensajes agrupados en archivos en Cloud Storage.

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Inicia la canalización de Dataflow

Para iniciar la canalización en Dataflow, ejecuta el siguiente comando:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

El comando anterior inicia un trabajo de Dataflow. Sigue el vínculo que se muestra en el resultado de la consola para acceder al trabajo en la consola de supervisión de Dataflow.

Observa el progreso del trabajo

Observa el progreso del trabajo en la consola de Dataflow.

Ir a la consola de Dataflow

Abre la vista de detalles de trabajos para ver lo siguiente:

  • Grafo del trabajo
  • Detalles de la ejecución
  • Métricas del trabajo

Publica algunos mensajes en tu tema de Lite.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Es posible que debas esperar unos minutos para ver los mensajes en tus registros de trabajador.

Usa el siguiente comando para verificar qué archivos se escribieron en Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

El resultado debe tener el siguiente aspecto:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Usa el siguiente comando para ver el contenido de un archivo:

gsutil cat "gs://$BUCKET/samples/your-filename"

Opcional: Crea una plantilla de Dataflow

De manera opcional, puedes crear una plantilla flexible de Dataflow basada en la canalización. Las plantillas de Dataflow te permiten ejecutar trabajos con diferentes parámetros de entrada desde la consola de Google Cloud o la línea de comandos sin la necesidad de configurar un entorno de desarrollo de Java completo.

  1. Crea un JAR grande que incluya todas las dependencias de tu canalización. Deberías ver target/pubsublite-streaming-bundled-1.0.jar después de que se ejecute el comando.

    mvn clean package -DskipTests=true
    
  2. Proporciona nombres y ubicaciones para tu archivo de plantilla y tu imagen de contenedor de plantilla.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Crea una plantilla flexible personalizada. Se proporcionó un archivo metadata.json obligatorio, que contiene las especificaciones necesarias para ejecutar el trabajo con el ejemplo.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Ejecuta un trabajo con la plantilla flexible personalizada.

Console

  1. Crea un trabajo a partir de una plantilla.

  2. Ingresa un nombre de trabajo.

  3. Ingresa tu región de Dataflow.

  4. Elige tu Plantilla personalizada.

  5. Ingresa la ruta de acceso de la plantilla.

  6. Ingresa los parámetros obligatorios.

  7. Haga clic en Ejecutar trabajo.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.

  1. En la consola de Dataflow, detén el trabajo. Cancela la canalización en lugar de desviarla.

  2. Borra el tema y la suscripción.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Borra los archivos que se crearon con la canalización.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Borra la imagen de la plantilla y el archivo de la plantilla, si existen.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Quita el bucket de Cloud Storage.

    gsutil rb gs://$BUCKET
    

  6. Borra la cuenta de servicio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.

    gcloud auth application-default revoke
  8. Opcional: Revoca credenciales desde gcloud CLI.

    gcloud auth revoke

¿Qué sigue?