En esta página se explica cómo definir las opciones de la canalización de las tareas de Dataflow. Estas opciones de la canalización configuran cómo y dónde se ejecuta la canalización y qué recursos utiliza.
La ejecución de la canalización es independiente de la ejecución del programa Apache Beam. El programa Apache Beam que has escrito crea una canalización para la ejecución diferida. Esto significa que el programa genera una serie de pasos que puede ejecutar cualquier runner de Apache Beam compatible. Entre los ejecutores compatibles se incluyen el ejecutor de Dataflow enGoogle Cloud y el ejecutor directo, que ejecuta el flujo de procesamiento directamente en un entorno local.
Puedes transferir parámetros a una tarea de Dataflow en el tiempo de ejecución. Para obtener más información sobre cómo definir opciones de la canalización en tiempo de ejecución, consulta el artículo Configurar opciones de la canalización.
Usar opciones de canalización con los SDKs de Apache Beam
Puede usar los siguientes SDKs para definir las opciones de las canalizaciones de los trabajos de Dataflow:
- SDK de Apache Beam para Python
- SDK de Apache Beam para Java
- SDK de Apache Beam para Go
Para usar los SDKs, debes definir el ejecutor de la canalización y otros parámetros de ejecución mediante la clase PipelineOptions
del SDK de Apache Beam.
Hay dos métodos para especificar las opciones de la canalización:
- Define las opciones del flujo de procesamiento de forma programática proporcionando una lista de opciones.
- Define las opciones del flujo de procesamiento directamente en la línea de comandos cuando ejecutes el código del flujo.
Definir las opciones de la canalización mediante programación
Puedes definir opciones de la canalización de forma programática creando y modificando un objeto PipelineOptions
.
Java
Construye un objeto PipelineOptions
con el método PipelineOptionsFactory.fromArgs
.
Para ver un ejemplo, consulta la sección Ejemplo de lanzamiento en Dataflow de esta página.
Python
Crea un objeto PipelineOptions
.
Para ver un ejemplo, consulta la sección Ejemplo de lanzamiento en Dataflow de esta página.
Go
No se admite la configuración de opciones de la canalización de forma programática mediante PipelineOptions
en el SDK de Apache Beam para Go. Usa argumentos de línea de comandos de Go.
Para ver un ejemplo, consulta la sección Ejemplo de lanzamiento en Dataflow de esta página.
Definir opciones de la canalización en la línea de comandos
Puedes definir las opciones de la canalización mediante argumentos de línea de comandos.
Java
La siguiente sintaxis de ejemplo procede de la WordCount
de la guía de Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Haz los cambios siguientes:
PROJECT_ID
: tu ID de proyecto Google CloudBUCKET_NAME
: el nombre de tu segmento de Cloud StorageREGION
: a Región de flujo de datos,us-central1
Python
La siguiente sintaxis de ejemplo procede de la canalización WordCount
del tutorial de Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Haz los cambios siguientes:
DATAFLOW_REGION
: la región en la que quieres implementar el trabajo de Dataflow. Por ejemplo,europe-west1
La marca
--region
anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.STORAGE_BUCKET
: el nombre del segmento de Cloud StoragePROJECT_ID
: el ID del proyecto Google Cloud
Go
La siguiente sintaxis de ejemplo procede de la canalización WordCount
del tutorial de Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Haz los cambios siguientes:
BUCKET_NAME
: el nombre del segmento de Cloud StoragePROJECT_ID
: el ID del proyecto Google CloudDATAFLOW_REGION
: la región en la que quieres desplegar la tarea de Dataflow. Por ejemplo,europe-west1
. La marca--region
anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.
Definir opciones de canalización experimentales
En los SDKs de Java, Python y Go, la experiments
opción de canalización
permite usar funciones experimentales o en fase pre-GA de Dataflow.
Definir de forma programática
Para definir la opción experiments
de forma programática, usa la siguiente sintaxis.
Java
En el objeto PipelineOptions
, incluye la opción experiments
con la siguiente sintaxis.
En este ejemplo se define el tamaño del disco de arranque en 80 GB con la marca de experimento.
options.setExperiments("streaming_boot_disk_size_gb=80")
Para ver un ejemplo de cómo crear el objeto PipelineOptions
, consulta la sección Ejemplo de lanzamiento en Dataflow de esta página.
Python
En el objeto PipelineOptions
, incluye la opción experiments
con la siguiente sintaxis.
En este ejemplo se define el tamaño del disco de arranque en 80 GB con la marca de experimento.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Para ver un ejemplo de cómo crear el objeto PipelineOptions
, consulta la sección Ejemplo de lanzamiento en Dataflow de esta página.
Go
No se admite la configuración de opciones de la canalización de forma programática mediante PipelineOptions
en el SDK de Apache Beam para Go. Usa argumentos de línea de comandos de Go.
Definir en la línea de comandos
Para definir la opción experiments
en la línea de comandos, usa la siguiente sintaxis.
Java
En este ejemplo se define el tamaño del disco de arranque en 80 GB con la marca de experimento.
--experiments=streaming_boot_disk_size_gb=80
Python
En este ejemplo se define el tamaño del disco de arranque en 80 GB con la marca de experimento.
--experiments=streaming_boot_disk_size_gb=80
Go
En este ejemplo se define el tamaño del disco de arranque en 80 GB con la marca de experimento.
--experiments=streaming_boot_disk_size_gb=80
Definir en una plantilla
Para habilitar una función experimental al ejecutar una plantilla de Dataflow, usa la marca --additional-experiments
.
Plantilla clásica
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
plantilla Flex
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Acceder al objeto de opciones de la canalización
Cuando cree el objeto Pipeline
en su programa Apache Beam, transmita
PipelineOptions
. Cuando el servicio Dataflow ejecuta tu flujo de procesamiento, envía una copia del PipelineOptions
a cada trabajador.
Java
Accede a PipelineOptions
dentro de cualquier instancia de DoFn
de la transformación ParDo
mediante el método ProcessContext.getPipelineOptions
.
Python
Esta función no está disponible en el SDK de Apache Beam para Python.
Go
Accede a las opciones de los flujos de procesamiento mediante beam.PipelineOptions
.
Lanzar en Dataflow
Ejecuta tu trabajo en recursos gestionados Google Cloud con el servicio de ejecutor de Dataflow. Al ejecutar tu flujo de procesamiento con Dataflow, se crea un trabajo de Dataflow que usa recursos de Compute Engine y Cloud Storage en tu Google Cloudproyecto. Para obtener información sobre los permisos de Dataflow, consulta el artículo Seguridad y permisos de Dataflow.
Los trabajos de Dataflow usan Cloud Storage para almacenar archivos temporales durante la ejecución de la canalización. Para evitar que se te cobren costes de almacenamiento innecesarios, desactiva la función de eliminación lógica en los segmentos que usen tus trabajos de Dataflow para el almacenamiento temporal. Para obtener más información, consulta Eliminar una política de eliminación lógica de un bucket.
Configurar las opciones obligatorias
Para ejecutar tu flujo de procesamiento con Dataflow, define las siguientes opciones de flujo de procesamiento:
Java
project
: el ID de tu proyecto de Google Cloud .runner
: el ejecutor de flujos de procesamiento que ejecuta tu flujo de procesamiento. Para la ejecución deGoogle Cloud , debe serDataflowRunner
.gcpTempLocation
: ruta de Cloud Storage para que Dataflow almacene la mayoría de los archivos temporales. El segmento especificado ya debe existir.Si no especifica
gcpTempLocation
, Dataflow usará el valor de la opcióntempLocation
. Si no especifica ninguna de estas opciones, Dataflow creará un segmento de Cloud Storage.
Python
project
: tu ID de proyecto Google Cloud .region
: la región de tu tarea de Dataflow.runner
: el ejecutor de flujos de procesamiento que ejecuta tu flujo de procesamiento. Para la ejecución deGoogle Cloud , debe serDataflowRunner
.temp_location
: una ruta de Cloud Storage para que Dataflow almacene los archivos de trabajo temporales creados durante la ejecución de la canalización.
Go
project
: tu ID de proyecto Google Cloud .region
: la región de tu tarea de Dataflow.runner
: el ejecutor de flujos de procesamiento que ejecuta tu flujo de procesamiento. Para la ejecución deGoogle Cloud , debe serdataflow
.staging_location
: una ruta de Cloud Storage para que Dataflow almacene los archivos de trabajo temporales creados durante la ejecución de la canalización.
Definir las opciones de la canalización mediante programación
En el siguiente ejemplo de código se muestra cómo crear una canalización configurando de forma programática el ejecutor y otras opciones necesarias para ejecutar la canalización con Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
El SDK de Apache Beam para Go usa argumentos de línea de comandos de Go. Usa flag.Set()
para definir los valores de las marcas.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Una vez que hayas creado el flujo de procesamiento, especifica todas las lecturas, transformaciones y escrituras del flujo, y ejecuta el flujo.
Usar opciones de canalización desde la línea de comandos
En el siguiente ejemplo se muestra cómo usar las opciones de la canalización que se especifican en la línea de comandos. En este ejemplo no se definen las opciones de la canalización mediante programación.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Usa el módulo argparse de Python para analizar las opciones de la línea de comandos.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Usa el paquete Go flag
para analizar las opciones de la línea de comandos. Debes analizar las opciones antes de llamar
beam.Init()
. En este ejemplo, output
es una opción de línea de comandos.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Una vez que hayas creado el flujo de procesamiento, especifica todas las lecturas, transformaciones y escrituras del flujo y, a continuación, ejecútalo.
Modos de ejecución de controles
Cuando un programa de Apache Beam ejecuta un flujo de procesamiento en un servicio como Dataflow, puede ejecutarlo de forma asíncrona o bloquearse hasta que se complete. Puedes cambiar este comportamiento siguiendo las instrucciones que se indican a continuación.
Java
Cuando un programa Java de Apache Beam ejecuta una canalización en un servicio como Dataflow, suele ejecutarse de forma asíncrona. Para ejecutar un flujo de procesamiento y esperar a que se complete la tarea, define DataflowRunner
como el ejecutor del flujo de procesamiento y llama explícitamente a pipeline.run().waitUntilFinish()
.
Cuando usas DataflowRunner
y llamas a waitUntilFinish()
en el objeto PipelineResult
devuelto por pipeline.run()
, la canalización se ejecuta en Google Cloud , pero el código local espera a que finalice el trabajo en la nube y devuelva el objeto DataflowPipelineJob
final. Mientras se ejecuta el trabajo, el servicio Dataflow imprime actualizaciones del estado del trabajo y mensajes de la consola mientras espera.
Python
Cuando un programa de Python de Apache Beam ejecuta un flujo de procesamiento en un servicio como Dataflow, suele ejecutarse de forma asíncrona. Para bloquear hasta que se complete la canalización, usa el método wait_until_finish()
del objeto PipelineResult
, devuelto por el método run()
del ejecutor.
Go
Cuando un programa de Apache Beam Go ejecuta un flujo de procesamiento en Dataflow, se realiza de forma síncrona de forma predeterminada y se bloquea hasta que se completa el flujo. Si no quieres bloquearlo, tienes dos opciones:
Inicia el trabajo en una gorutina.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Usa la marca de línea de comandos
--async
, que se encuentra en el paquetejobopts
.
Para ver los detalles de la ejecución, monitorizar el progreso y verificar el estado de finalización de un trabajo, usa la interfaz de monitorización de Dataflow o la interfaz de línea de comandos de Dataflow.
Usar fuentes de streaming
Java
Si tu flujo de procesamiento lee datos de una fuente de datos ilimitada, como Pub/Sub, se ejecutará automáticamente en modo de streaming.
Python
Si tu canalización usa una fuente de datos ilimitada, como Pub/Sub, debes definir la opción streaming
como true.
Go
Si tu flujo de procesamiento lee datos de una fuente de datos ilimitada, como Pub/Sub, se ejecutará automáticamente en modo de streaming.
Las tareas de streaming usan un tipo de máquina de Compute Engine n1-standard-2
o superior de forma predeterminada.
Lanzar localmente
En lugar de ejecutar tu canalización en recursos de nube gestionados, puedes ejecutarla de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu flujo de trabajo en conjuntos de datos pequeños. Por ejemplo, la ejecución local elimina la dependencia del servicio Dataflow remoto y del Google Cloud proyecto asociado.
Cuando usas la ejecución local, debes ejecutar tu flujo de procesamiento con conjuntos de datos lo suficientemente pequeños como para que quepan en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create
o usar una transformación Read
para trabajar con archivos locales o remotos pequeños. La ejecución local suele ser una forma más rápida y sencilla de realizar pruebas y depuraciones con menos dependencias externas, pero está limitada por la memoria disponible en tu entorno local.
El siguiente código de ejemplo muestra cómo crear una canalización que se ejecute en tu entorno local.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Una vez que hayas creado el flujo de procesamiento, ejecútalo.
Crear opciones de flujo de trabajo personalizadas
Puedes añadir tus propias opciones personalizadas además de las PipelineOptions
estándar. La línea de comandos de Apache Beam también puede analizar opciones personalizadas mediante argumentos de línea de comandos especificados en el mismo formato.
Java
Para añadir tus propias opciones, define una interfaz con métodos getter y setter para cada opción, como en el ejemplo siguiente:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Para añadir tus propias opciones, usa el método add_argument()
(que se comporta exactamente igual que el módulo argparse estándar de Python), como en el ejemplo siguiente:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
Para añadir tus propias opciones, usa el paquete de marcas Go, como se muestra en el siguiente ejemplo:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
También puedes especificar una descripción, que aparece cuando un usuario introduce --help
como argumento de línea de comandos, y un valor predeterminado.
Java
Para definir la descripción y el valor predeterminado, usa anotaciones, como se indica a continuación:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Te recomendamos que registres tu interfaz con PipelineOptionsFactory
y, a continuación, pases la interfaz al crear el objeto PipelineOptions
. Cuando registras tu interfaz con PipelineOptionsFactory
, --help
puede encontrar tu interfaz de opciones personalizadas y añadirla al resultado del comando --help
. PipelineOptionsFactory
valida que tus opciones personalizadas sean compatibles con el resto de las opciones registradas.
En el siguiente código de ejemplo se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Ahora, tu canal puede aceptar --myCustomOption=value
como argumento de línea de comandos.
Python
Define la descripción y el valor predeterminado de la siguiente manera:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
Define la descripción y el valor predeterminado de la siguiente manera:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)