Ejecuta un trabajo de Dataflow en un contenedor personalizado

En este documento, se describe cómo ejecutar una canalización de Dataflow con un contenedor personalizado.

Si deseas obtener información para crear la imagen de contenedor, consulta Compila imágenes de contenedor personalizadas para Dataflow.

Cuando ejecutes tu canalización, iníciala con el SDK de Apache Beam con la misma versión y la misma versión de lenguaje que el SDK en tu imagen de contenedor personalizada. Este paso evita errores inesperados de dependencias o SDK incompatibles.

Realiza pruebas locales

Antes de ejecutar la canalización en Dataflow, es una buena idea probar la imagen de contenedor de forma local, lo que permite realizar pruebas y depuraciones más rápidas.

Si deseas obtener más información sobre el uso específico de Apache Beam, consulta la guía de Apache Beam para ejecutar canalizaciones con imágenes de contenedor personalizadas.

Pruebas básicas con PortableRunner

Para verificar que las imágenes de contenedor remotas se puedan extraer y puedan ejecutar una canalización simple, usa PortableRunner de Apache Beam. Cuando usas PortableRunner, el envío de trabajos se realiza en el entorno local y la ejecución DoFn se realiza en el entorno de Docker.

Cuando usas GPU, es posible que el contenedor de Docker no tenga acceso a ellas. Para probar tu contenedor con GPU, usa el ejecutor directo y sigue los pasos para probar una imagen de contenedor en una VM independiente con GPU en la sección Depura con una VM independiente de la página "Usa GPU".

A continuación, se ejecuta una canalización de ejemplo:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Reemplaza lo siguiente:

  • REGION: la región del servicio de trabajo que se usará, en forma de dirección y puerto. Por ejemplo: localhost:3000. Usa embed para ejecutar un servicio de trabajo en proceso.
  • IMAGE_URI: El URI de la imagen del contenedor personalizado.
  • INPUT_FILE: es un archivo de entrada que se puede leer como un archivo de texto. La imagen del contenedor
    del aprovechamiento del SDK debe poder acceder a este archivo, ya sea precargado en la imagen de contenedor o en un archivo remoto.
  • OUTPUT_FILE: una ruta de acceso en la que se escribirá el resultado. Esta ruta es una ruta de acceso remota o local en el contenedor.

Cuando la canalización se complete de forma correcta, revisa los registros de la consola para verificar que la canalización se haya completado correctamente y que se use la imagen remota especificada por IMAGE_URI.

Después de ejecutar la canalización, los archivos guardados en el contenedor no están en tu sistema de archivos local y el contenedor se detiene. Puedes copiar archivos del sistema de archivos del contenedor detenido con docker cp.

O, como alternativa:

  • Proporciona resultados para un sistema de archivos remoto, como Cloud Storage. Es posible que debas configurar el acceso de forma manual con fines de prueba, incluso para los archivos de credenciales o las credenciales predeterminadas de la aplicación.
  • Para una depuración rápida, agrega registro temporal.

Usa el ejecutor directo

Para realizar pruebas locales más detalladas de la imagen de contenedor y tu canalización, usa el ejecutor directo de Apache Beam.

Puedes verificar la canalización por separado del contenedor mediante la prueba en un entorno local que coincida con la imagen de contenedor o si inicias la canalización en un contenedor en ejecución.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Reemplaza IMAGE_URI por el URI de la imagen del contenedor personalizado.

En los ejemplos, se supone que cualquier archivo de canalización (incluida la canalización en sí) se encuentra en el contenedor personalizado, que se activó desde un sistema de archivos local o que es remoto y que Apache Beam puede acceder a él. Por ejemplo, para usar Maven (mvn) a fin de ejecutar el ejemplo de Java anterior, Maven y sus dependencias deben almacenarse en etapa intermedia en el contenedor. Para obtener más información, consulta Almacenamiento y docker run en la documentación de Docker.

El objetivo para realizar pruebas en el ejecutor directo es probar la canalización en el entorno del contenedor personalizado, no probar ejecutar el contenedor con el ENTRYPOINT predeterminado. Modifica ENTRYPOINT (por ejemplo, docker run --entrypoint ...) para ejecutar directamente tu canalización o permitir que se ejecuten comandos en el contenedor de forma manual.

Si dependes de una configuración específica que se basa en la ejecución del contenedor en Compute Engine, puedes ejecutar el contenedor directamente en una VM de Compute Engine. Para obtener más información, consulta Contenedores en Compute Engine.

Inicia el trabajo de Dataflow

Cuando inicies la canalización de Apache Beam en Dataflow, especifica la ruta a la imagen de contenedor. No uses la etiqueta :latest con las imágenes personalizadas. Etiqueta tus compilaciones con una fecha o un identificador único. Si algo sale mal, usar este tipo de etiqueta podría hacer posible revertir la ejecución de la canalización a una configuración de trabajo conocida y permitir una inspección de cambios.

Java

Usa --sdkContainerImage a fin de especificar una imagen de contenedor del SDK para tu entorno de ejecución de Java.

Usa --experiments=use_runner_v2 para habilitar Runner v2.

Python

Si usas la versión del SDK 2.30.0 o posterior, usa la opción de canalización --sdk_container_image para especificar una imagen de contenedor del SDK.

Para versiones anteriores del SDK, usa la opción de canalización --worker_harness_container_image para especificar la ubicación de la imagen de contenedor que se usará para el agente de trabajo.

Los contenedores personalizados solo son compatibles con Dataflow Runner v2. Si inicias una canalización de Python por lotes, configura la marca --experiments=use_runner_v2. Si vas a iniciar una canalización de Python de transmisión, no es necesario especificar el experimento, ya que las canalizaciones de Python de transmisión usan Runner v2 de forma predeterminada.

Go

Si usas la versión del SDK 2.40.0 o posterior, usa la opción de canalización --sdk_container_image para especificar una imagen de contenedor del SDK.

Para versiones anteriores del SDK, usa la opción de canalización --worker_harness_container_image para especificar la ubicación de la imagen de contenedor que se usará para el agente de trabajo.

Los contenedores personalizados son compatibles con todas las versiones del SDK de Go porque usan Dataflow Runner v2 de forma predeterminada.

En el siguiente ejemplo, se demuestra cómo iniciar el ejemplo de WordCount por lotes con un contenedor personalizado.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Usa el SDK de Apache Beam para la versión 2.30.0 o posterior de Python:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Reemplaza lo siguiente:

  • INPUT_FILE: la ruta de acceso del archivo de entrada de Cloud Storage que Dataflow lee cuando ejecuta el ejemplo.
  • OUTPUT_FILE: la ruta de acceso del archivo de salida de Cloud Storage en la que se escribe la canalización de ejemplo. Este archivo contiene los conteos de palabras.
  • PROJECT_ID es el ID de tu proyecto de Google Cloud.
  • REGION: la región en la que se implementará tu trabajo de Dataflow.
  • TEMP_LOCATION es una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización
  • DISK_SIZE_GB: Opcional Si tu contenedor es grande, considera aumentar el tamaño de disco de arranque predeterminado para evitar quedar sin espacio en el disco.
  • IMAGE_URI: Es el URI de la imagen de contenedor personalizada del SDK. Usa siempre una etiqueta o un SHA de contenedor con control de versiones. No uses la etiqueta :latest ni una mutable.