Configura las opciones de canalización de Dataflow

En esta página, se explica cómo configurar las opciones de canalización para los trabajos de Dataflow. Estas opciones de canalización configuran la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.

La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam. El programa de Apache Beam que escribiste construye una canalización para la ejecución posterior. Esto significa que el programa genera una serie de pasos que cualquier ejecutor de Apache Beam compatible puede ejecutar. Los ejecutores compatibles incluyen el ejecutor de Dataflow en Google Cloud y el ejecutor directo que ejecuta la canalización directamente en un entorno local.

Puedes pasar parámetros a un trabajo de Dataflow en el entorno de ejecución. Para obtener más información sobre cómo configurar las opciones de canalización en el entorno de ejecución, consulta Configura las opciones de canalización.

Usa opciones de canalización con los SDK de Apache Beam

Puedes usar los siguientes SDK para configurar opciones de canalización para los trabajos de Dataflow:

  • SDK de Apache Beam para Python
  • SDK de Apache Beam para Java
  • SDK de Apache Beam para Go

Para usar los SDK, configura el ejecutor de canalización y otros parámetros de ejecución mediante la clase PipelineOptions del SDK de Apache Beam.

Existen dos métodos para especificar las opciones de canalización:

  • Establece opciones de canalización de manera programática a través de una lista de opciones de canalización.
  • Configura las opciones de canalización directamente en la línea de comandos cuando ejecutes el código de tu canalización.

Establece opciones de canalización de manera programática

Puedes configurar opciones de canalización de manera programática si creas y modificas un objeto PipelineOptions.

Java

Crea un objeto PipelineOptions mediante el método PipelineOptionsFactory.fromArgs.

Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.

Python

Crea un objeto PipelineOptions.

Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.

Go

El SDK de Apache Beam para Go no admite la configuración de opciones de canalización de manera programática mediante PipelineOptions. Usa los argumentos de la línea de comandos de Go.

Para ver un ejemplo, consulta la sección Muestra de inicio en Dataflow en esta página.

Configura opciones de canalización en la línea de comandos

Puedes establecer opciones de canalización mediante argumentos de la línea de comandos.

Java

La siguiente sintaxis de ejemplo es de la canalización WordCount en la Guía de inicio rápido de Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • REGION: una región de Dataflow, us-central1

Python

La siguiente sintaxis de ejemplo es de la canalización WordCount en la Guía de inicio rápido de Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Reemplaza lo siguiente:

  • DATAFLOW_REGION: la región en la que deseas implementar el trabajo de Dataflow, por ejemplo, europe-west1

    La marca --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

  • STORAGE_BUCKET: El nombre del bucket de Cloud Storage

  • PROJECT_ID: El ID del proyecto de Google Cloud

Go

La siguiente sintaxis de ejemplo es de la canalización WordCount en la Guía de inicio rápido de Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Reemplaza lo siguiente:

  • BUCKET_NAME: El nombre del bucket de Cloud Storage

  • PROJECT_ID: El ID del proyecto de Google Cloud

  • DATAFLOW_REGION: la región en la que deseas implementar el trabajo de Dataflow. Por ejemplo, europe-west1. La marca --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

Establece opciones de canalización experimentales

En los SDK de Java, Python y Go, la opción de canalización experiments habilita funciones de Dataflow experimentales o de fase previa a la DG.

Configura de manera programática

Para configurar la opción experiments de manera programática, usa la siguiente sintaxis.

Java

En tu objeto PipelineOptions, incluye la opción experiments con la siguiente sintaxis. En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.

options.setExperiments("streaming_boot_disk_size_gb=80")

Para ver un ejemplo que muestra cómo crear el objeto PipelineOptions, consulta la sección Muestra de inicio en Dataflow en esta página.

Python

En tu objeto PipelineOptions, incluye la opción experiments con la siguiente sintaxis. En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Para ver un ejemplo que muestra cómo crear el objeto PipelineOptions, consulta la sección Iniciar en la muestra de Dataflow en esta página.

Go

El SDK de Apache Beam para Go no admite la configuración de opciones de canalización de manera programática mediante PipelineOptions. Usa los argumentos de la línea de comandos de Go.

Configura en la línea de comandos

Para configurar la opción experiments en la línea de comandos, usa la siguiente sintaxis.

Java

En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.

--experiments=streaming_boot_disk_size_gb=80

Python

En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.

--experiments=streaming_boot_disk_size_gb=80

Go

En este ejemplo, se establece el tamaño del disco de arranque en 80 GB con la marca del experimento.

--experiments=streaming_boot_disk_size_gb=80

Configura en una plantilla

Para habilitar una función experimental cuando ejecutas una plantilla de Dataflow, usa la marca --additional-experiments.

Plantilla clásica

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

plantilla de Flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Accede al objeto de opciones de canalización

Cuando crees el objeto Pipeline en tu programa de Apache Beam, pasa PipelineOptions. Cuando el servicio de Dataflow ejecuta tu canalización, envía una copia de PipelineOptions a cada trabajador.

Java

Accede a PipelineOptions dentro de cualquier instancia de DoFn de la transformación ParDo a través del método ProcessContext.getPipelineOptions.

Python

Esta función aún no se admite en el SDK de Apache Beam para Python.

Go

Accede a las opciones de canalización mediante beam.PipelineOptions.

Inicia en Dataflow

Ejecuta tu trabajo en recursos administrados de Google Cloud con el servicio de ejecutor de Dataflow. Cuando ejecutas la canalización con Dataflow, se crea un trabajo de Dataflow que usa recursos de Compute Engine y Cloud Storage en tu proyecto de Google Cloud. Para obtener información sobre los permisos de Dataflow, consulta Seguridad y permisos de Dataflow.

Los trabajos de Dataflow usan Cloud Storage para almacenar archivos temporales durante la ejecución de la canalización. Para evitar que se te facturen costos de almacenamiento innecesarios, desactiva la función de borrar de forma no definitiva en los buckets que usan tus trabajos de Dataflow para el almacenamiento temporal. Para obtener más información, consulta Quita una política de eliminación no definitiva de un bucket.

Establece las opciones obligatorias

Para ejecutar tu canalización con Dataflow, configura las siguientes opciones de canalización:

Java

  • project: El ID de tu proyecto de Google Cloud.
  • runner: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe ser DataflowRunner.
  • gcpTempLocation: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia la mayoría de los archivos temporales. Si deseas especificar un bucket, debes crearlo con anticipación. En caso de que no configures gcpTempLocation, puedes configurar la opción de canalización tempLocation y, luego, gcpTempLocation se establece en el valor de tempLocation. Si no se especifica ninguna, se crea una gcpTempLocation predeterminada.
  • stagingLocation: una ruta de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos binarios. Si usas el SDK de Apache Beam 2.28 o una versión posterior, no configures esta opción. En el SDK de Apache Beam 2.28 o versiones anteriores, si no configuras esta opción, el valor que especificaste para tempLocation se usa como ubicación de etapa de pruebas.

    Se crea una ubicación gcpTempLocation predeterminada si no se especifica esta ni tempLocation. Si se especifica tempLocation, pero no gcpTempLocation, tempLocation será una ruta de acceso de Cloud Storage, y gcpTempLocation la usará como la ruta predeterminada. Si no se especifica tempLocation y se especifica gcpTempLocation, tempLocation no se propaga.

Python

  • project el ID del proyecto de Google Cloud
  • region: la región del trabajo de Dataflow.
  • runner: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe ser DataflowRunner.
  • temp_location es una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización.

Go

  • project el ID del proyecto de Google Cloud
  • region: la región del trabajo de Dataflow.
  • runner: el ejecutor de canalizaciones que ejecuta tu canalización. Para la ejecución en la Google Cloud, debe ser dataflow.
  • staging_location es una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización.

Establece opciones de canalización de manera programática

En el siguiente código de ejemplo, se muestra cómo construir una canalización mediante la configuración programática del ejecutor y otras opciones necesarias para ejecutar la canalización con Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

El SDK de Apache Beam para Go usa argumentos de la línea de comandos de Go. Usa flag.Set() para establecer valores de marca.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.

Usa opciones de canalización desde la línea de comandos

En el siguiente ejemplo, se muestra cómo usar las opciones de canalización que se especifican en la línea de comandos. En este ejemplo, no se establecen las opciones de canalización de manera programática.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Usa el módulo de argumentos de Python para analizar las opciones de la línea de comandos.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Usa el paquete de flag de Go para analizar las opciones de la línea de comandos. Debes analizar las opciones antes de llamar a beam.Init(). En este ejemplo, output es una opción de la línea de comandos.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.

Controla los modos de ejecución

Cuando un programa de Apache Beam ejecuta una canalización en un servicio como Dataflow, este puede ejecutar la canalización de forma asíncrona o puede bloquearse hasta que se complete la canalización. Puedes cambiar este comportamiento mediante la siguiente guía.

Java

Cuando un programa de Java de Apache Beam ejecuta una canalización en un servicio como Dataflow, por lo general, se ejecuta de forma asíncrona. Para ejecutar una canalización y esperar hasta que se complete el trabajo, configura DataflowRunner como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish().

Cuando usas DataflowRunner y llamas a waitUntilFinish() en el objeto PipelineResult que muestra pipeline.run(), la canalización se ejecuta en Google Cloud, pero el código local espera a que finalice el trabajo en la nube y muestre el objeto DataflowPipelineJob final. Mientras se ejecuta el trabajo, el servicio de Dataflow imprime actualizaciones del estado del trabajo y mensajes de la consola mientras espera.

Python

Cuando un programa de Python de Apache Beam ejecuta una canalización en un servicio como Dataflow, por lo general, se ejecuta de forma asíncrona. Usa el método wait_until_finish() del objeto PipelineResult, que se muestra a partir del método run() del ejecutor, para bloquear hasta que se complete la canalización.

Go

Cuando un programa de Go de Apache Beam ejecuta una canalización en Dataflow, es síncrono de forma predeterminada y se bloquea hasta que se completa la canalización. Si no deseas bloquear, hay dos opciones:

  1. Inicia el trabajo en una rutina de Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Usa la marca de línea de comandos --async, que está en el paquete jobopts.

Para ver los detalles de ejecución, supervisar el progreso y verificar el estado de finalización del trabajo, usa la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.

Usa fuentes de transmisión

Java

Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.

Python

Si tu canalización usa una fuente de datos no delimitados, como Pub/Sub, debes configurar la opción streaming como verdadera.

Go

Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.

Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2 o superior según la configuración predeterminada.

Inicia de forma local

En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio de Dataflow remoto y el proyecto de Google Cloud asociado.

Cuando usas la ejecución local, debes ejecutar tu canalización con conjuntos de datos que sean lo bastante pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create o usar una transformación Read para trabajar con archivos locales o remotos pequeños. Por lo general, la ejecución local proporciona una forma más rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero está limitada por la memoria disponible en tu entorno local.

En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Después de crear tu canalización, ejecútala.

Crea opciones de canalización personalizadas

Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions estándar. La línea de comandos de Apache Beam también puede analizar opciones personalizadas mediante los argumentos de línea de comandos especificados en el mismo formato.

Java

Si quieres agregar tus propias opciones, define una interfaz con métodos get y set para cada opción, como en el siguiente ejemplo:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Para agregar tus propias opciones, usa el método add_argument() (que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Para agregar tus propias opciones, usa el paquete de marcas de Go como se muestra en el siguiente ejemplo:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

También puedes especificar una descripción, que aparece cuando un usuario pasa --help como un argumento de la línea de comandos, y un valor predeterminado.

Java

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Se recomienda que registres tu interfaz con PipelineOptionsFactory y pases la interfaz cuando crees el objeto PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory valida que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Python

Establece la descripción y el valor predeterminado de la siguiente manera:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Establece la descripción y el valor predeterminado de la siguiente manera:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)