Uso de plantillas Flex

En este instructivo, se muestra cómo crear y ejecutar un trabajo de plantilla flexible de Dataflow con una imagen personalizada de Docker mediante la herramienta de línea de comandos de gcloud. En este instructivo, se explica un ejemplo de canalización de transmisión que lee mensajes codificados en JSON desde Pub/Sub, transforma datos de mensajes con Beam SQL y escribe los resultados en una tabla de BigQuery.

Para obtener más información sobre las plantillas flexibles, consulta Plantillas de Dataflow.

Objetivos

  • Compilar una imagen de contenedor de Docker
  • Crear y ejecutar una plantilla flexible de Dataflow

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyecto

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Habilita las API de Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Habilita las API

  5. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio con este nombre.

      En el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haga clic en Crear.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básica y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  6. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haz clic en Claves.
    3. Haz clic en Agregar clave y, luego, en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON a tu computadora.
    5. Haga clic en Cerrar.
  7. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más detalles, consulta cómo realizar una limpieza.

Crea la fuente y el receptor de ejemplo

En esta sección, se explica cómo crear lo siguiente:

  • Una fuente de transmisión de datos mediante Pub/Sub
  • Un conjunto de datos para cargar los datos en BigQuery

Cree un bucket de Cloud Storage

Usa el comando gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Crea un tema de Pub/Sub y una suscripción a ese tema

Usa la herramienta de línea de comandos gcloud:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Crea un trabajo de Cloud Scheduler

En este paso, usamos la herramienta de línea de comandos gcloud para crear y ejecutar un trabajo de Cloud Scheduler que publica “calificaciones positivas” y “calificaciones negativas”.

  1. Crea un trabajo de Cloud Scheduler para este proyecto de Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Esto crea y ejecuta un publicador de “calificaciones positivas” que publica 1 mensaje por minuto.
  3. Inicia el trabajo de Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Crea y ejecuta otro publicador similar para “calificaciones negativas” que publique 1 mensaje cada 2 minutos.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Cree un conjunto de datos de BigQuery

Usa el comando bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Descarga el código de muestra

  1. Descarga la muestra de código.

    Java

    Clona el repositorio java-docs-samples y navega a la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clona el repositorio python-docs-samples y navega hasta la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exporta el TEMPLATE_IMAGE para este instructivo.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Configura tu entorno de desarrollo

Java

  1. Descarga y, luego, instala la versión 11 del Java Development Kit (JDK). Verifica que la variable de entorno JAVA_HOME esté configurada y orientada a la instalación del JDK.
  2. Descarga y, luego, instala Apache Maven mediante los pasos de la guía de instalación de Maven para tu sistema operativo específico.
  3. Ejecuta la canalización de Apache Beam de forma local para el desarrollo. (Opcional)
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Compila el proyecto de Java en un archivo Uber JAR.
      mvn clean package
  5. Ten en cuenta el tamaño del archivo Uber JAR en comparación con el archivo original. (Opcional)
      ls -lh target/*.jar
    Este archivo Uber JAR tiene todas las dependencias incorporadas. Puedes ejecutar este archivo como una aplicación independiente sin dependencias externas en otras bibliotecas.

Python

Usa el SDK de Apache Beam para Python con pip y la versión 2.7, 3.5, 3.6 o 3.7 de Python. Ejecuta este comando para comprobar que tengas Python y una instalación pip en funcionamiento:

    python --version
    python -m pip --version

Si no tienes Python, busca los pasos de instalación para tu sistema operativo en la página Instalación de Python.

Solo para Python: Crea y compila una imagen de contenedor

En esta sección, se incluyen los pasos para usuarios de Python. Si usas Java, omite los siguientes pasos.

  1. Habilita el uso de caché de Kaniko de forma predeterminada. (Opcional)
    gcloud config set builds/use_kaniko True
    
    Kaniko almacena en caché los artefactos de compilación de contenedores, por lo que usar esta opción acelera las compilaciones posteriores.
  2. Crea el Dockerfile. (Opcional) Usa este instructivo para personalizar el Dockerfile. El archivo inicial se ve de la siguiente manera:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Este Dockerfile contiene los comandos FROM, ENV y COPY, que puedes leer en la referencia de Dockerfile.

    Las imágenes que comienzan con gcr.io/PROJECT/ se guardan en Container Registry de tu proyecto, donde otros productos de Google Cloud pueden acceder a ellas.
  3. Compila la imagen de Docker mediante Dockerfile con Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Crea una plantilla flexible

Para ejecutar una plantilla, debes crear un archivo de especificaciones de la plantilla en Cloud Storage que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.

El archivo metadata.json en este ejemplo contiene información adicional para la plantilla, como los campos name, description y parameters de entrada.

  1. Crea un archivo de especificaciones de plantilla que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Compila la plantilla flexible.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Ahora, la plantilla está disponible a través del archivo de plantilla en la ubicación de Cloud Storage que especificaste.

Ejecuta una canalización de plantilla flexible

Ahora puedes ejecutar la canalización de Apache Beam en Dataflow mediante una consulta al archivo de plantilla y pasar los parámetros de plantilla que requiere la canalización.

  1. Ejecuta la plantilla.

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Como alternativa, ejecuta la plantilla con una solicitud a la API de REST.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. Después de ejecutar el comando para ejecutar la plantilla flexible, Dataflow muestra un ID de tarea con el estado En cola. Es posible que el estado demore varios minutos en Ejecutarse y que pueda acceder al grafo del trabajo.
  3. Ejecuta la siguiente consulta para verificar los resultados en BigQuery:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a la tabla de BigQuery cada minuto.

Limpieza

Una vez completado el instructivo, puedes limpiar los recursos creados en Google Cloud para que no se te facture por ellos en el futuro. En las siguientes secciones, se describe cómo borrarlos o desactivarlos.

Limpia los recursos de la plantilla flexible

  1. Detiene la canalización de Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Borra el archivo de especificaciones de la plantilla de Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Borra la imagen del contenedor de la plantilla flexible de Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Borra los recursos del proyecto de Google Cloud

  1. Borra los trabajos de Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Borra la suscripción y el tema de Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Borra la tabla de BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Borra el conjunto de datos de BigQuery, ya que esto no genera cargos.

    El siguiente comando también borra todas las tablas del conjunto de datos. No se podrán recuperar las tablas y los datos.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Borra el bucket de Cloud Storage, ya que esto no genera cargos.

    El siguiente comando también borra todos los objetos del bucket. Estos objetos no se podrán recuperar.

    gsutil rm -r gs://$BUCKET
    

Limitaciones

Se aplican las siguientes limitaciones a los trabajos de plantillas flexibles:

  • Debes usar una imagen base que proporciona Google para empaquetar tus contenedores con Docker.
  • No se admiten waitUntilFinish (Java) ni wait_until_finish (Python).

¿Qué sigue?