Uso de plantillas Flex

En este instructivo, se muestra cómo crear y ejecutar un trabajo de plantilla flexible de Dataflow con una imagen personalizada de Docker mediante la herramienta de línea de comandos de gcloud. En este instructivo, se explica un ejemplo de canalización de transmisión que lee mensajes codificados en JSON desde Pub/Sub, transforma datos de mensajes con Beam SQL y escribe los resultados en una tabla de BigQuery.

Objetivos

  • Compilar una imagen de contenedor de Docker
  • Crear y ejecutar una plantilla flexible de Dataflow

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto.

Antes de comenzar

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir a la página del selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Habilita las API

  5. Configura la autenticación
    1. En Cloud Console, ve a la página Crea una clave de cuenta de servicio.

      Ir a la página Crea una clave de la cuenta de servicio
    2. En la lista Cuenta de servicio, selecciona Cuenta de servicio nueva.
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio.
    4. En la lista Función, selecciona Proyecto > Propietario.

    5. Haz clic en Crear. Se descargará un archivo JSON que contiene tus claves a tu computadora.
  6. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más detalles, consulta cómo realizar una limpieza.

Crea la fuente y el receptor de ejemplo

En esta sección, se explica cómo crear lo siguiente:

  • Una fuente de transmisión de datos mediante Pub/Sub
  • Un conjunto de datos para cargar los datos en BigQuery

Crea un depósito de Cloud Storage

Usa el comando gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Crea un tema de Pub/Sub y una suscripción a ese tema

Usa la herramienta de línea de comandos gcloud:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Crea un trabajo de Cloud Scheduler

En este paso, usamos la herramienta de línea de comandos gcloud para crear y ejecutar un trabajo de Cloud Scheduler que publica “calificaciones positivas” y “calificaciones negativas”.

  1. Crea un trabajo de Cloud Scheduler para este proyecto de Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Esto crea y ejecuta un publicador de “calificaciones positivas” que publica 1 mensaje por minuto.
  3. Inicia el trabajo de Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Crea y ejecuta otro publicador similar para “calificaciones negativas” que publique 1 mensaje cada 2 minutos.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Crea un conjunto de datos de BigQuery

Usa el comando bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Descarga el código de muestra

  1. Descarga la muestra de código.

    Java

    Clona el repositorio java-docs-samples y navega a la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clona el repositorio python-docs-samples y navega hasta la muestra de código para este instructivo.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exporta el TEMPLATE_IMAGE para este instructivo.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Configura tu entorno de desarrollo

Java

  1. Descarga y, luego, instala la versión 11 del Java Development Kit (JDK). Verifica que la variable de entorno JAVA_HOME esté configurada y orientada a la instalación del JDK.
  2. Descarga y, luego, instala Apache Maven mediante los pasos de la guía de instalación de Maven para tu sistema operativo específico.
  3. Ejecuta la canalización de Apache Beam de forma local para el desarrollo. (Opcional)
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSQL \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Compila el proyecto de Java en un archivo Uber JAR.
      mvn clean package
  5. Ten en cuenta el tamaño del archivo Uber JAR en comparación con el archivo original. (Opcional)
      ls -lh target/*.jar
    Este archivo Uber JAR tiene todas las dependencias incorporadas. Puedes ejecutar este archivo como una aplicación independiente sin dependencias externas en otras bibliotecas.

Python

Usa el SDK de Apache Beam para Python con pip y la versión 2.7, 3.5, 3.6 o 3.7 de Python. Ejecuta este comando para comprobar que tengas Python y una instalación pip en funcionamiento:

    python --version
    python -m pip --version

Si no tienes Python, busca los pasos de instalación para tu sistema operativo en la página Instalación de Python.

Solo para Python: Crea y compila una imagen de contenedor

En esta sección, se incluyen los pasos para usuarios de Python. Si usas Java, omite los siguientes pasos.

  1. Habilita el uso de caché de Kaniko de forma predeterminada. (Opcional)
    gcloud config set builds/use_kaniko True
    
    Kaniko almacena en caché los artefactos de compilación de contenedores, por lo que usar esta opción acelera las compilaciones posteriores.
  2. Crea el Dockerfile. (Opcional) Usa este instructivo para personalizar el Dockerfile. El archivo inicial se ve de la siguiente manera:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Este Dockerfile contiene los comandos FROM, ENV y COPY, que puedes leer en la referencia de Dockerfile.

    Las imágenes que comienzan con gcr.io/PROJECT/ se guardan en Container Registry de tu proyecto, donde otros productos de Google Cloud pueden acceder a ellas.
  3. Compila la imagen de Docker mediante Dockerfile con Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Crea una plantilla flexible

Para ejecutar una plantilla, debes crear un archivo de especificaciones de la plantilla en Cloud Storage que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.

El archivo metadata.json en este ejemplo contiene información adicional para la plantilla, como los campos name, description y parameters de entrada.

  1. Crea un archivo de especificaciones de plantilla que contenga toda la información necesaria para ejecutar el trabajo, como la información y los metadatos del SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Compila la plantilla flexible.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Ahora, la plantilla está disponible a través del archivo de plantilla en la ubicación de Cloud Storage que especificaste.

Ejecuta una canalización de plantilla flexible

Ahora puedes ejecutar la canalización de Apache Beam en Dataflow mediante una consulta al archivo de plantilla y pasar los parámetros de plantilla que requiere la canalización.

  1. Ejecuta la plantilla.

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Como alternativa, ejecuta la plantilla con una solicitud a la API de REST.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. Después de ejecutar el comando para ejecutar la plantilla flexible, Dataflow muestra un ID de tarea con el estado En cola. Es posible que el estado demore varios minutos en Ejecutarse y que pueda acceder al grafo del trabajo.
  3. Ejecuta la siguiente consulta para verificar los resultados en BigQuery:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a la tabla de BigQuery cada minuto.

Limpieza

Una vez completado el instructivo, puedes limpiar los recursos creados en Google Cloud para que no se te facture por ellos en el futuro. En las siguientes secciones, se describe cómo borrarlos o desactivarlos.

Limpia los recursos de la plantilla flexible

  1. Detiene la canalización de Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Borra el archivo de especificaciones de la plantilla de Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Borra la imagen del contenedor de la plantilla flexible de Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Borra los recursos del proyecto de Google Cloud

  1. Borra los trabajos de Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Borra la suscripción y el tema de Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Borra la tabla de BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Borra el conjunto de datos de BigQuery, ya que esto no genera cargos.

    El siguiente comando también borra todas las tablas del conjunto de datos. No se podrán recuperar las tablas y los datos.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Borra el depósito de Cloud Storage, ya que esto no genera cargos.

    El siguiente comando también borra todos los objetos del depósito. Estos objetos no se podrán recuperar.

    gsutil rm -r gs://$BUCKET
    

Limitaciones

Se aplican las siguientes limitaciones a los trabajos de plantillas flexibles:

  • Debes usar una imagen base que proporciona Google para empaquetar tus contenedores con Docker.
  • No se pueden actualizar los trabajos de transmisión.
  • No se admite el uso de FlexRS.
  • No se admiten waitUntilFinish (Java) ni wait_until_finish (Python).

¿Qué sigue?