En este documento se describe cómo ejecutar una canalización de Dataflow mediante un contenedor personalizado.
Para obtener información sobre cómo crear la imagen de contenedor, consulta Crear imágenes de contenedor personalizadas para Dataflow.
Cuando ejecutes tu flujo de procesamiento, lánzalo con el SDK de Apache Beam con la misma versión y el mismo idioma que el SDK de tu imagen de contenedor personalizada. De esta forma, se evitan errores inesperados derivados de dependencias o SDKs incompatibles.
Probar localmente
Antes de ejecutar una canalización en Dataflow, es recomendable probar la imagen de contenedor de forma local, lo que permite realizar pruebas y depuraciones más rápidas.
Para obtener más información sobre el uso específico de Apache Beam, consulta la guía de Apache Beam sobre ejecución de canalizaciones con imágenes de contenedor personalizadas.
Pruebas básicas con PortableRunner
Para verificar que se pueden extraer imágenes de contenedor remotas y que se puede ejecutar una canalización sencilla, usa Apache Beam PortableRunner
. Cuando usas PortableRunner
, el envío de trabajos se produce en el entorno local y la ejecución de DoFn
se realiza en el entorno de Docker.
Cuando usas GPUs, es posible que el contenedor Docker no tenga acceso a ellas. Para probar tu contenedor con GPUs, usa el ejecutor directo y sigue los pasos para probar una imagen de contenedor en una VM independiente con GPUs en la sección Depurar con una VM independiente de la página "Usar GPUs".
A continuación, se ejecuta una canalización de ejemplo:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"
Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Go
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Haz los cambios siguientes:
REGION
: la región del servicio de trabajo que se va a usar, con el formato de dirección y puerto. Por ejemplo:localhost:3000
. Usaembed
para ejecutar un servicio de trabajo en proceso.IMAGE_URI
: el URI de la imagen de contenedor personalizada.INPUT_FILE
: un archivo de entrada que se puede leer como un archivo de texto. El arnés del SDK debe poder acceder a este archivo
de imagen de contenedor, ya sea porque esté precargado en la imagen de contenedor o porque sea un archivo remoto.OUTPUT_FILE
: ruta en la que se escribirá la salida. Esta ruta puede ser una ruta remota o una ruta local en el contenedor.
Cuando el flujo de procesamiento se complete correctamente, revisa los registros de la consola para verificar que se ha completado correctamente y que se ha usado la imagen remota especificada por IMAGE_URI
.
Después de ejecutar la canalización, los archivos guardados en el contenedor no estarán en tu sistema de archivos local y el contenedor se detendrá. Puedes copiar archivos del sistema de archivos del contenedor detenido mediante docker cp
.
Alternativa:
- Proporcionar salidas a un sistema de archivos remoto, como Cloud Storage. Es posible que tengas que configurar manualmente el acceso con fines de prueba, incluidos los archivos de credenciales o las credenciales predeterminadas de la aplicación.
- Para depurar rápidamente, añade registros temporales.
Usar Direct Runner
Para hacer pruebas locales más exhaustivas de la imagen de contenedor y de tu canalización, usa Direct Runner de Apache Beam.
Puedes verificar tu canal de forma independiente del contenedor probándolo en un entorno local que coincida con la imagen del contenedor o lanzando el canal en un contenedor en ejecución.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
Sustituye IMAGE_URI
por el URI de la imagen de contenedor personalizada.
En los ejemplos se da por hecho que los archivos de la canalización, incluida la propia canalización, se encuentran en el contenedor personalizado, se han montado desde un sistema de archivos local o son remotos y Apache Beam y el contenedor pueden acceder a ellos. Por ejemplo, para usar Maven (mvn
) para ejecutar el ejemplo de Java anterior, Maven y sus dependencias deben almacenarse en el contenedor. Para obtener más información, consulta Almacenamiento y docker run
en la documentación de Docker.
El objetivo de las pruebas en Direct Runner es probar tu flujo de trabajo en el entorno de contenedor personalizado, no probar la ejecución de tu contenedor con su ENTRYPOINT
predeterminado. Modifica ENTRYPOINT
(por ejemplo, docker run --entrypoint ...
) para ejecutar tu canalización directamente o para permitir que se ejecuten comandos manualmente en el contenedor.
Si dependes de una configuración específica basada en la ejecución del contenedor en Compute Engine, puedes ejecutar el contenedor directamente en una VM de Compute Engine. Para obtener más información, consulta el artículo sobre contenedores en Compute Engine.
Lanzar la tarea de Dataflow
Cuando inicies el flujo de procesamiento de Apache Beam en Dataflow, especifica la ruta a la imagen de contenedor. No uses la etiqueta :latest
con tus imágenes personalizadas. Etiqueta tus compilaciones con una fecha o un identificador único. Si algo va mal, usar este tipo de etiqueta puede permitir revertir la ejecución de la canalización a una configuración que se sepa que funciona y permite inspeccionar los cambios.
Java
Usa --sdkContainerImage
para especificar una imagen de contenedor del SDK para tu tiempo de ejecución de Java.
Usa --experiments=use_runner_v2
para habilitar Runner v2.
Python
Si usas la versión 2.30.0 o una posterior del SDK, utiliza la opción de canalización --sdk_container_image
para especificar una imagen de contenedor del SDK.
En versiones anteriores del SDK, usa la opción de canalización --worker_harness_container_image
para especificar la ubicación de la imagen de contenedor que se va a usar en el arnés de trabajador.
Los contenedores personalizados solo se admiten en Dataflow Runner v2. Si vas a lanzar una canalización de Python por lotes, define la marca --experiments=use_runner_v2
.
Si vas a iniciar un flujo de procesamiento de Python, no es necesario especificar el experimento, ya que los flujos de procesamiento de Python usan Runner v2 de forma predeterminada.
Go
Si usas la versión 2.40.0 o una posterior del SDK, utiliza la opción de canalización --sdk_container_image
para especificar una imagen de contenedor del SDK.
En versiones anteriores del SDK, usa la opción de canalización --worker_harness_container_image
para especificar la ubicación de la imagen de contenedor que se va a usar en el arnés de trabajador.
Los contenedores personalizados se admiten en todas las versiones del SDK de Go porque usan Dataflow Runner v2 de forma predeterminada.
En el siguiente ejemplo se muestra cómo iniciar el ejemplo de lote WordCount
con un contenedor personalizado.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"
Python
Si usas la versión 2.30.0 o una posterior del SDK de Apache Beam para Python:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URI
Go
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI
Haz los cambios siguientes:
INPUT_FILE
: la ruta de entrada de Cloud Storage que lee Dataflow al ejecutar el ejemplo.OUTPUT_FILE
: la ruta de salida de Cloud Storage en la que escribe la canalización de ejemplo. Este archivo contiene el recuento de palabras.PROJECT_ID
: el ID de tu proyecto de Google Cloud.REGION
: la región en la que se desplegará tu trabajo de Dataflow.TEMP_LOCATION
: la ruta de Cloud Storage para que Dataflow organice los archivos de trabajo temporales creados durante la ejecución del flujo de procesamiento.DISK_SIZE_GB
: opcional. Si tu contenedor es grande, plantéate aumentar el tamaño predeterminado del disco de arranque para evitar quedarte sin espacio en disco.IMAGE_URI
: el URI de la imagen del contenedor personalizado del SDK. Utiliza siempre un SHA o una etiqueta de contenedor con versión. No uses la etiqueta:latest
ni una etiqueta mutable.
Streaming de imágenes de contenedor
Puedes mejorar la latencia de inicio y de escalado automático de tu canalización de Dataflow habilitando el streaming de imágenes. Esta función es útil si tu contenedor personalizado incluye contenido superfluo o no usa todo su contenido en cada paso. Por ejemplo, tu contenedor puede incluir contenido incidental, como código de biblioteca basado en la CPU para la inferencia basada en la GPU. Del mismo modo, puede que tengas un contenedor que ejecute pipelines de aprendizaje automático con varios modelos que solo usen un modelo en cada paso, por lo que no es necesario que se carguen todos los contenidos a la vez. Si habilitas la transmisión de imágenes, la latencia mejorará en estos casos.
Java
--dataflowServiceOptions=enable_image_streaming
Python
--dataflow_service_options=enable_image_streaming
Go
--dataflow_service_options=enable_image_streaming
La transmisión de imágenes obtendrá partes de tu contenedor personalizado a medida que las necesite tu código de la canalización, en lugar de descargar todo el contenedor por adelantado. Las partes del contenedor que no se usan no se descargan nunca.
Para beneficiarte del streaming de imágenes, debes tener habilitada la API Container File System.