Uso de plantillas Flex

En este instructivo, se muestra cómo crear y ejecutar un trabajo de plantilla flexible de Dataflow con una imagen personalizada de Docker mediante la herramienta de línea de comandos de gcloud. En este instructivo, se explica un ejemplo de canalización de transmisión que lee mensajes codificados en JSON desde Pub/Sub, transforma datos de mensajes con Beam SQL y escribe los resultados en una tabla de BigQuery.

Para obtener más información sobre las plantillas flexibles, consulta Plantillas de Dataflow.

Objetivos

  • Compilar una imagen de contenedor de Docker
  • Crear y ejecutar una plantilla flexible de Dataflow

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Habilita las API de Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Habilita las API

  5. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio según este nombre.

      Opcional: en el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haz clic en Crear y continuar.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básico y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  6. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haga clic en Claves.
    3. Haz clic en Agregar clave, luego haz clic en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON en tu computadora.
    5. Haga clic en Cerrar.
  7. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  8. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  9. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  10. Habilita las API de Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Habilita las API

  11. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio según este nombre.

      Opcional: en el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haz clic en Crear y continuar.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básico y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  12. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haga clic en Claves.
    3. Haz clic en Agregar clave, luego haz clic en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON en tu computadora.
    5. Haga clic en Cerrar.
  13. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más detalles, consulta cómo realizar una limpieza.

Crea la fuente y el receptor de ejemplo

En esta sección, se explica cómo crear lo siguiente:

  • Una fuente de transmisión de datos mediante Pub/Sub
  • Un conjunto de datos para cargar los datos en BigQuery

Cree un bucket de Cloud Storage

Usa el comando gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Crea un tema de Pub/Sub y una suscripción a ese tema

Usa la herramienta de línea de comandos gcloud:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Crea un trabajo de Cloud Scheduler

En este paso, usamos la herramienta de línea de comandos gcloud para crear y ejecutar un trabajo de Cloud Scheduler que publica “calificaciones positivas” y “calificaciones negativas”.

  1. Crea un trabajo de Cloud Scheduler para este proyecto de Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Esto crea y ejecuta un publicador de “calificaciones positivas” que publica 1 mensaje por minuto.
  3. Inicia el trabajo de Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Crea y ejecuta otro publicador similar para “calificaciones negativas” que publique 1 mensaje cada 2 minutos.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Cree un conjunto de datos de BigQuery

Usa el comando bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Descarga el código de muestra

  1. Descarga la muestra de código.

    Java

    Clona el repositorio java-docs-samples y navega a la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clona el repositorio python-docs-samples y navega hasta la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exporta el TEMPLATE_IMAGE para este instructivo.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Configura tu entorno de desarrollo

Java

  1. Descarga y, luego, instala la versión 11 del Java Development Kit (JDK). Verifica que la variable de entorno JAVA_HOME esté configurada y orientada a la instalación del JDK.
  2. Descarga y, luego, instala Apache Maven mediante los pasos de la guía de instalación de Maven para tu sistema operativo específico.
  3. Ejecuta la canalización de Apache Beam de forma local para el desarrollo. (Opcional)
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Compila el proyecto de Java en un archivo Uber JAR.
      mvn clean package
  5. Ten en cuenta el tamaño del archivo Uber JAR en comparación con el archivo original. (Opcional)
      ls -lh target/*.jar
    Este archivo Uber JAR tiene todas las dependencias incorporadas. Puedes ejecutar este archivo como una aplicación independiente sin dependencias externas en otras bibliotecas.

Python

Usa el SDK de Apache Beam para Python.

Solo para Python: Crea y compila una imagen de contenedor

En esta sección, se incluyen los pasos para usuarios de Python. Si usas Java, omite los siguientes pasos.

Si tu trabajo no se ejecuta y aparece el mensaje de error A Timeout in polling error message, consulta los pasos para solucionar problemas.

  1. Habilita el uso de caché de Kaniko de forma predeterminada. (Opcional)
    gcloud config set builds/use_kaniko True
    
    Kaniko almacena en caché los artefactos de compilación de contenedores, por lo que usar esta opción acelera las compilaciones posteriores.
  2. Crea el Dockerfile. (Opcional) Usa este instructivo para personalizar el Dockerfile. El archivo inicial se ve de la siguiente manera:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    Este Dockerfile contiene los comandos FROM, ENV y COPY, que puedes leer en la referencia de Dockerfile.

    Las imágenes que comienzan con gcr.io/PROJECT/ se guardan en Container Registry de tu proyecto, donde otros productos de Google Cloud pueden acceder a ellas.
  3. Compila la imagen de Docker mediante Dockerfile con Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Metadatos

Puedes ampliar tu plantilla con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla. Si deseas crear metadatos para tu plantilla, sigue estos pasos:

  1. Crea un archivo metadata.json con los parámetros de Parámetros de metadatos.

    Para ver un ejemplo, consulta Archivo de metadatos de ejemplo.

  2. Almacena el archivo de metadatos en Cloud Storage en la misma carpeta que la plantilla.

Parámetros de metadatos

Clave de parámetro Obligatorio Descripción del valor
name El nombre de tu plantilla
description No Un breve párrafo de texto que describe la plantilla
parameters No Un array de parámetros adicionales que usa la plantilla De forma predeterminada, se usa un array vacío.
name El nombre del parámetro que se usa en tu plantilla.
label Una string legible que se usa en Cloud Console para etiquetar el parámetro.
helpText Un breve párrafo de texto que describe el parámetro.
isOptional No false si el parámetro es obligatorio y true si el parámetro es opcional A menos que se configure con un valor, isOptional se establece de forma predeterminada en false. Si no incluyes esta clave de parámetro en tus metadatos, estos se convierten en un parámetro obligatorio.
regexes No Un array de expresiones regulares POSIX-egrep en formato de string que se usará para validar el valor del parámetro. Por ejemplo, ["^[a-zA-Z][a-zA-Z0-9]+"] es una expresión regular individual que valida que el valor comienza con una letra y, luego, tiene uno o más caracteres. De forma predeterminada, se usa un array vacío.

Ejemplo de archivo de metadatos

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Puedes descargar archivos de metadatos para las plantillas proporcionadas por Google desde el directorio de plantillas de Dataflow.

Crea una plantilla flexible

Para ejecutar una plantilla, debes crear un archivo de especificaciones de la plantilla en Cloud Storage que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.

En este instructivo, se usa el archivo de metadatos de ejemplo, que contiene información adicional para la plantilla, como los campos name, description y parameters de entrada.

  1. Crea un archivo de especificaciones de plantilla que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Compila la plantilla flexible.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Ahora, la plantilla está disponible a través del archivo de plantilla en la ubicación de Cloud Storage que especificaste.

Ejecuta una canalización de plantilla flexible

Ahora puedes ejecutar la canalización de Apache Beam en Dataflow mediante una consulta al archivo de plantilla y pasar los parámetros de plantilla que requiere la canalización.

  1. En tu shell o terminal, ejecuta la plantilla:

    Java

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Como alternativa, puedes ejecutar la plantilla con una solicitud a la API de REST:

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. Después de ejecutar el comando para ejecutar la plantilla flexible, Dataflow muestra un ID de tarea con el estado En cola. Es posible que el estado demore varios minutos en Ejecutarse y que pueda acceder al grafo del trabajo.
  3. Ejecuta la siguiente consulta para verificar los resultados en BigQuery:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a la tabla de BigQuery cada minuto.

Limpieza

Una vez completado el instructivo, puedes limpiar los recursos creados en Google Cloud para que no se te facture por ellos en el futuro. En las siguientes secciones, se describe cómo borrarlos o desactivarlos.

Limpia los recursos de la plantilla flexible

  1. Detiene la canalización de Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Borra el archivo de especificaciones de la plantilla de Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Borra la imagen del contenedor de la plantilla flexible de Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Borra los recursos del proyecto de Google Cloud

  1. Borra los trabajos de Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Borra la suscripción y el tema de Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Borra la tabla de BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Borra el conjunto de datos de BigQuery, ya que esto no genera cargos.

    El siguiente comando también borra todas las tablas del conjunto de datos. No se podrán recuperar las tablas y los datos.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Borra el bucket de Cloud Storage, ya que esto no genera cargos.

    El siguiente comando también borra todos los objetos del bucket. Estos objetos no se podrán recuperar.

    gsutil rm -r gs://$BUCKET
    

Limitaciones

Se aplican las siguientes limitaciones a los trabajos de plantillas flexibles:

  • Debes usar una imagen base que proporciona Google para empaquetar tus contenedores con Docker. Para obtener una lista de imágenes aplicables, consulta Imágenes base de la plantilla flexible.
  • A fin de que la canalización se inicie, el programa que construye la canalización debe salir después de que se llame a run.
  • No se admiten waitUntilFinish (Java) ni wait_until_finish (Python).

¿Qué sigue?