En esta página, se explica cómo configurar las opciones de canalización para los trabajos de Dataflow. Estas opciones de canalización configuran la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.
La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam. El programa de Apache Beam que escribiste construye una canalización para la ejecución posterior. Esto significa que el programa genera una serie de pasos que cualquier ejecutor de Apache Beam compatible puede ejecutar. Los ejecutores compatibles incluyen el ejecutor de Dataflow en Google Cloud y el ejecutor directo que ejecuta la canalización directamente en un entorno local.
Puedes pasar parámetros a un trabajo de Dataflow en el entorno de ejecución. Para obtener más información sobre cómo configurar las opciones de canalización en el entorno de ejecución, consulta Configura las opciones de canalización.
Usa opciones de canalización con los SDK de Apache Beam
Puedes usar los siguientes SDK para configurar opciones de canalización para los trabajos de Dataflow:
- SDK de Apache Beam para Python
- SDK de Apache Beam para Java
- SDK de Apache Beam para Go
Para usar los SDK, configura el ejecutor de canalización y otros parámetros de ejecución mediante la clase PipelineOptions
del SDK de Apache Beam.
Existen dos métodos para especificar las opciones de canalización:
- Establece opciones de canalización de manera programática a través de una lista de opciones de canalización.
- Configura las opciones de canalización directamente en la línea de comandos cuando ejecutes el código de tu canalización.
Establece opciones de canalización de manera programática
Puedes configurar opciones de canalización de manera programática si creas y modificas un objeto PipelineOptions
.
Java
Crea un objeto PipelineOptions
mediante el método PipelineOptionsFactory.fromArgs
.
Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.
Python
Crea un objeto PipelineOptions
.
Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.
Go
El SDK de Apache Beam para Go no admite la configuración de opciones de canalización de manera programática mediante PipelineOptions
. Usa los argumentos de la línea de comandos de Go.
Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.
Configura opciones de canalización en la línea de comandos
Puedes establecer opciones de canalización mediante argumentos de la línea de comandos.
Java
La siguiente sintaxis de ejemplo es de la canalización WordCount
en la Guía de inicio rápido de Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud.BUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.REGION
: una región de Dataflow,us-central1
Python
La siguiente sintaxis de ejemplo es de la canalización WordCount
en la Guía de inicio rápido de Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Reemplaza lo siguiente:
DATAFLOW_REGION
: la región en la que deseas implementar el trabajo de Dataflow, por ejemplo,europe-west1
La marca
--region
anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.STORAGE_BUCKET
: El nombre del bucket de Cloud StoragePROJECT_ID
: El ID del proyecto de Google Cloud
Go
La siguiente sintaxis de ejemplo es de la canalización WordCount
en la Guía de inicio rápido de Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Reemplaza lo siguiente:
BUCKET_NAME
: El nombre del bucket de Cloud StoragePROJECT_ID
: El ID del proyecto de Google CloudDATAFLOW_REGION
: la región en la que deseas implementar el trabajo de Dataflow. Por ejemplo,europe-west1
. La marca--region
anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.
Establece opciones de canalización experimentales
En los SDK de Java, Python y Go, la opción de canalización experiments
habilita funciones de Dataflow experimentales o de fase previa a la DG.
Configura de manera programática
Para configurar la opción experiments
de manera programática, usa la siguiente sintaxis.
Java
En tu objeto PipelineOptions
, incluye la opción experiments
con la siguiente sintaxis.
En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.
options.setExperiments("streaming_boot_disk_size_gb=80")
Para ver un ejemplo que muestra cómo crear el objeto PipelineOptions
, consulta la sección Muestra de inicio en Dataflow en esta página.
Python
En tu objeto PipelineOptions
, incluye la opción experiments
con la siguiente sintaxis.
En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Para ver un ejemplo que muestra cómo crear el objeto PipelineOptions
, consulta la sección Iniciar en la muestra de Dataflow en esta página.
Go
El SDK de Apache Beam para Go no admite la configuración de opciones de canalización de manera programática mediante PipelineOptions
. Usa los argumentos de la línea de comandos de Go.
Configura en la línea de comandos
Para configurar la opción experiments
en la línea de comandos, usa la siguiente sintaxis.
Java
En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.
--experiments=streaming_boot_disk_size_gb=80
Python
En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.
--experiments=streaming_boot_disk_size_gb=80
Go
En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.
--experiments=streaming_boot_disk_size_gb=80
Configura en una plantilla
Para habilitar una función experimental cuando ejecutas una plantilla de Dataflow, usa la marca --additional-experiments
.
Plantilla clásica
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
plantilla de Flex
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Accede al objeto de opciones de canalización
Cuando crees el objeto Pipeline
en tu programa de Apache Beam, pasa PipelineOptions
. Cuando el servicio de Dataflow ejecuta tu canalización, envía una copia de PipelineOptions
a cada trabajador.
Java
Accede a PipelineOptions
dentro de cualquier instancia de DoFn
de la transformación ParDo
a través del método ProcessContext.getPipelineOptions
.
Python
Esta función aún no se admite en el SDK de Apache Beam para Python.
Go
Accede a las opciones de canalización mediante beam.PipelineOptions
.
Inicia en Dataflow
Ejecuta tu trabajo en recursos administrados de Google Cloud con el servicio de ejecutor de Dataflow. Cuando ejecutas la canalización con Dataflow, se crea un trabajo de Dataflow que usa recursos de Compute Engine y Cloud Storage en tu proyecto de Google Cloud. Para obtener información sobre los permisos de Dataflow, consulta Seguridad y permisos de Dataflow.
Los trabajos de Dataflow usan Cloud Storage para almacenar archivos temporales durante la ejecución de la canalización. Para evitar que se te facturen costos de almacenamiento innecesarios, desactiva la función de borrar de forma no definitiva en los buckets que usan tus trabajos de Dataflow para el almacenamiento temporal. Para obtener más información, consulta Quita una política de eliminación no definitiva de un bucket.
Establece las opciones obligatorias
Para ejecutar tu canalización con Dataflow, configura las siguientes opciones de canalización:
Java
project
: El ID de tu proyecto de Google Cloud.runner
: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe serDataflowRunner
.gcpTempLocation
: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia la mayoría de los archivos temporales. Si deseas especificar un bucket, debes crearlo con anticipación. En caso de que no configuresgcpTempLocation
, puedes configurar la opción de canalizacióntempLocation
y, luego,gcpTempLocation
se establece en el valor detempLocation
. Si no se especifica ninguna, se crea unagcpTempLocation
predeterminada.stagingLocation
: una ruta de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos binarios. Si usas el SDK de Apache Beam 2.28 o una versión posterior, no configures esta opción. En el SDK de Apache Beam 2.28 o versiones anteriores, si no configuras esta opción, el valor que especificaste paratempLocation
se usa como ubicación de etapa de pruebas.Se crea una ubicación
gcpTempLocation
predeterminada si no se especifica esta nitempLocation
. Si se especificatempLocation
, pero nogcpTempLocation
,tempLocation
será una ruta de acceso de Cloud Storage, ygcpTempLocation
la usará como la ruta predeterminada. Si no se especificatempLocation
y se especificagcpTempLocation
,tempLocation
no se propaga.
Python
project
el ID del proyecto de Google Cloudregion
: la región del trabajo de Dataflow.runner
: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe serDataflowRunner
.temp_location
es una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización.
Go
project
el ID del proyecto de Google Cloudregion
: la región del trabajo de Dataflow.runner
: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe serdataflow
.staging_location
es una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización.
Establece opciones de canalización de manera programática
En el siguiente código de ejemplo, se muestra cómo construir una canalización mediante la configuración programática del ejecutor y otras opciones necesarias para ejecutar la canalización con Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
El SDK de Apache Beam para Go usa argumentos de la línea de comandos de Go. Usa flag.Set()
para establecer valores de marca.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.
Usa opciones de canalización desde la línea de comandos
En el siguiente ejemplo, se muestra cómo usar las opciones de canalización que se especifican en la línea de comandos. En este ejemplo, no se establecen las opciones de canalización de manera programática.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Usa el módulo de argumentos de Python para analizar las opciones de la línea de comandos.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Usa el paquete de flag
de Go para analizar las opciones de la línea de comandos. Debes analizar las opciones antes de llamar a beam.Init()
. En este ejemplo, output
es una opción de la línea de comandos.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.
Controla los modos de ejecución
Cuando un programa de Apache Beam ejecuta una canalización en un servicio como Dataflow, este puede ejecutar la canalización de forma asíncrona o puede bloquearse hasta que se complete la canalización. Puedes cambiar este comportamiento mediante la siguiente guía.
Java
Cuando un programa de Java de Apache Beam ejecuta una canalización en un servicio como Dataflow, por lo general, se ejecuta de forma asíncrona. Para ejecutar una canalización y esperar hasta que se complete el trabajo, configura DataflowRunner
como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish()
.
Cuando usas DataflowRunner
y llamas a waitUntilFinish()
en el objeto PipelineResult
que muestra pipeline.run()
, la canalización se ejecuta en Google Cloud, pero el código local espera a que finalice el trabajo en la nube y muestre el objeto DataflowPipelineJob
final. Mientras se ejecuta el trabajo, el servicio de Dataflow imprime actualizaciones del estado del trabajo y mensajes de la consola mientras espera.
Python
Cuando un programa de Python de Apache Beam ejecuta una canalización en un servicio como Dataflow, por lo general, se ejecuta de forma asíncrona. Usa el método wait_until_finish()
del objeto PipelineResult
, que se muestra a partir del método run()
del ejecutor, para bloquear hasta que se complete la canalización.
Go
Cuando un programa de Go de Apache Beam ejecuta una canalización en Dataflow, es síncrono de forma predeterminada y se bloquea hasta que se completa la canalización. Si no deseas bloquear, hay dos opciones:
Inicia el trabajo en una rutina de Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Usa la marca de línea de comandos
--async
, que está en el paquetejobopts
.
Para ver los detalles de ejecución, supervisar el progreso y verificar el estado de finalización del trabajo, usa la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.
Usa fuentes de transmisión
Java
Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.
Python
Si tu canalización usa una fuente de datos no delimitados, como Pub/Sub, debes configurar la opción streaming
como verdadera.
Go
Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.
Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2
o superior según la configuración predeterminada.
Inicia de forma local
En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio de Dataflow remoto y el proyecto de Google Cloud asociado.
Cuando usas la ejecución local, debes ejecutar tu canalización con conjuntos de datos que sean lo bastante pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create
o usar una transformación Read
para trabajar con archivos locales o remotos pequeños. Por lo general, la ejecución local proporciona una forma más rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero está limitada por la memoria disponible en tu entorno local.
En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Después de crear tu canalización, ejecútala.
Crea opciones de canalización personalizadas
Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions
estándar. La línea de comandos de Apache Beam también puede analizar opciones personalizadas mediante los argumentos de línea de comandos especificados en el mismo formato.
Java
Si quieres agregar tus propias opciones, define una interfaz con métodos get y set para cada opción, como en el siguiente ejemplo:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Para agregar tus propias opciones, usa el método add_argument()
(que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
Para agregar tus propias opciones, usa el paquete de marcas de Go como se muestra en el siguiente ejemplo:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
También puedes especificar una descripción, que aparece cuando un usuario pasa --help
como un argumento de la línea de comandos, y un valor predeterminado.
Java
Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Se recomienda que registres tu interfaz con PipelineOptionsFactory
y pases la interfaz cuando crees el objeto PipelineOptions
. Cuando registras tu interfaz con PipelineOptionsFactory
, --help
puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help
. PipelineOptionsFactory
valida que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.
En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Ahora tu canalización puede aceptar --myCustomOption=value
como un argumento de la línea de comandos.
Python
Establece la descripción y el valor predeterminado de la siguiente manera:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
Establece la descripción y el valor predeterminado de la siguiente manera:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)