Especifica los parámetros de ejecución de la canalización

Después de que tu programa de Apache Beam cree una canalización, deberás ejecutarla. La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam; tu programa de Apache Beam crea la canalización, y el código que escribiste genera una serie de pasos que debe ejecutar un ejecutor de canalizaciones. El ejecutor de canalizaciones puede ser el servicio administrado de Dataflow en Google Cloud, un servicio de ejecutor de terceros o un ejecutor de canalización local que ejecuta los pasos directamente en el entorno local.

Puedes especificar el ejecutor de canalización y otras opciones de ejecución mediante la clase PipelineOptions del SDK de Apache Beam. Usa PipelineOptions para configurar la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.

En la mayoría de los casos, te recomendamos que la canalización se ejecute en recursos administrados de Google Cloud mediante el servicio de ejecutor de Dataflow. Cuando ejecutas la canalización con el servicio de Dataflow, se crea un trabajo de Dataflow que usa recursos de Compute Engine y Cloud Storage en tu proyecto de Google Cloud.

También puedes ejecutar tu canalización de manera local. Cuando ejecutas la canalización de manera local, las transformaciones de la canalización se ejecutan en la misma máquina en la que se ejecuta tu programa de Dataflow. La ejecución local es útil para realizar pruebas y depurar, en particular, si tu canalización puede usar conjuntos de datos en la memoria más pequeños.

Configura las PipelineOptions

Pasas PipelineOptions cuando creas el objeto Pipeline en tu programa de Dataflow. Cuando el servicio de Dataflow ejecuta tu canalización, envía una copia de PipelineOptions a cada instancia de trabajador.

Java: SDK 2.x

Nota: Puedes acceder a PipelineOptions dentro de cualquier instancia de DoFn de ParDo a través del método ProcessContext.getPipelineOptions.

Python

Esta función aún no se admite en el SDK de Apache Beam para Python.

Java: SDK 1.x

Configura las PipelineOptions desde argumentos de la línea de comandos

Si bien puedes configurar tu canalización mediante la creación de un objeto PipelineOptions y la configuración directa de los campos, los SDK de Apache Beam incluyen un analizador de línea de comandos que puedes usar para configurar campos en PipelineOptions mediante los argumentos de la línea de comandos.

Java: SDK 2.x

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions mediante el método PipelineOptionsFactory.fromArgs, como se muestra en el siguiente código de ejemplo:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Nota: Adjuntar el método .withValidation hace que Dataflow verifique los argumentos de la línea de comandos requeridos y valide los valores de argumento.

Cuando se usa PipelineOptionsFactory.fromArgs, se interpretan los argumentos de la línea de comandos que siguen el siguiente formato:

--<option>=<value>

Compilar tu objeto PipelineOptions de esta forma te permite especificar cualquiera de las opciones en cualquier interfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Python

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions, como en el siguiente código de ejemplo:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

El argumento (flags=argv) de PipelineOptions interpreta los argumentos de la línea de comandos que siguen el formato:

--<option>=<value>

Compilar tu objeto PipelineOptions de esta manera te permite especificar cualquiera de las opciones mediante la creación de subclases a partir de PipelineOptions.

Java: SDK 1.x

Crea opciones personalizadas

Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions estándar. El analizador de línea de comandos de Dataflow también puede establecer tus opciones personalizadas mediante argumentos de la línea de comandos especificados en el mismo formato.

Java: SDK 2.x

Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Para agregar tus propias opciones, usa el método add_argument() (que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

También puedes especificar una descripción, que aparece cuando un usuario pasa --help como un argumento de la línea de comandos, y un valor predeterminado.

Java: SDK 2.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Se recomienda que registres tu interfaz con PipelineOptionsFactory y pases la interfaz cuando crees el objeto PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Python

Establece la descripción y el valor predeterminado de la siguiente manera:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java: SDK 1.x

Configura PipelineOptions para la ejecución en el servicio de Cloud Dataflow

Para ejecutar tu canalización mediante el servicio administrado de Dataflow, debes configurar los siguientes campos en PipelineOptions:

Java: SDK 2.x

  • project: el ID del proyecto de Google Cloud
  • runner: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner
  • gcpTempLocation: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos temporales. Debes crear este depósito con anticipación, antes de ejecutar tu canalización. En caso de que no especifiques gcpTempLocation, puedes especificar la opción de canalización tempLocation y, luego, gcpTempLocation se configurará en el valor de tempLocation. Si no se especifica ninguna, se creará una opción gcpTempLocation predeterminada
  • stagingLocation: un depósito de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos binarios. Si no configuras esta opción, el valor que especificaste para tempLocation se usará también en la ubicación de etapa de pruebas
  • Se creará una ubicación gcpTempLocation predeterminada si no se especifica esta ni tempLocation. Si se especifica tempLocation, pero no gcpTempLocation, tempLocation será una ruta de Cloud Storage, que gcpTempLocation usará como la ruta predeterminada. Si no se especifica tempLocation y se especifica gcpTempLocation, tempLocation no se propagará

Nota: Si usas el SDK de Apache Beam para Java 2.15.0 o posterior, también debes especificar region.

Python

  • project: el ID del proyecto de Google Cloud
  • runner: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner
  • staging_location: una ruta de acceso de Cloud Storage a fin de que Dataflow almacene en etapa intermedia los paquetes de código necesarios para los trabajadores que ejecutan el trabajo
  • temp_location: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización

Nota: Si usas el SDK de Apache Beam para Python 2.15.0 o posterior, también debes especificar region.

Java: SDK 1.x

Puedes configurar estas opciones de manera programática o especificarlas mediante la línea de comandos. En el siguiente código de ejemplo, se muestra cómo construir una canalización mediante la configuración programática del ejecutor y otras opciones necesarias para ejecutar la canalización con el servicio administrado de Dataflow.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java: SDK 1.x

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.

En el siguiente código de ejemplo, se muestra cómo configurar las opciones obligatorias para la ejecución del servicio de Dataflow mediante la línea de comandos:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java: SDK 1.x

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.

Java: SDK 2.x

Cuando pases las opciones obligatorias en la línea de comandos, usa las opciones --project, --runner, --gcpTempLocation y, de forma opcional, --stagingLocation.

Python

Cuando pases las opciones obligatorias en la línea de comandos, usa las opciones --project, --runner y --staging_location.

Java: SDK 1.x

Ejecución asíncrona

Java: SDK 2.x

Cuando se usa DataflowRunner, tu canalización se ejecuta de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puedes supervisar el progreso del trabajo, ver detalles de la ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.

Python

Cuando se usa DataflowRunner, tu canalización se ejecuta de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puedes supervisar el progreso del trabajo, ver detalles de la ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.

Java: SDK 1.x

Ejecución síncrona

Java: SDK 2.x

Especifica DataflowRunner como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish().

Cuando usas DataflowRunner y llamas a waitUntilFinish() en el objeto PipelineResult que muestra pipeline.run(), la canalización se ejecuta en la nube, pero el código local espera a que finalice el trabajo en la nube y muestra el objeto final DataflowPipelineJob. Mientras se ejecuta el trabajo, el servicio de Dataflow imprime actualizaciones del estado del trabajo y mensajes de la consola mientras espera.

Si eras usuario del SDK de Java 1.x y usabas --runner BlockingDataflowPipelineRunner en la línea de comandos para hacer que tu programa principal se bloquee de forma interactiva hasta que la canalización finalice, entonces, con Java 2.x, tu programa principal necesita llamar de manera explícita a waitUntilFinish().

Python

Usa el método wait_until_finish() del objeto PipelineResult, que se muestra a partir del método run() del ejecutor, para bloquear hasta que se complete la canalización.

Java: SDK 1.x

Nota: Si escribes Ctrl+C desde la línea de comandos, no se cancela tu trabajo. El servicio de Dataflow aún ejecuta el trabajo en Google Cloud; para cancelar el trabajo, deberás usar la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.

Ejecución de la transmisión

Java: SDK 2.x

Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación, como un GroupByKey.

Python

Si tu canalización usa una fuente de datos o un receptor no delimitados (como Pub/Sub), debes configurar la opción streaming como verdadera.

Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2 o superior según la configuración predeterminada. No debes anular esto, puesto que n1-standard-2 es el tipo de máquina mínimo necesario para ejecutar trabajos de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación, como un GroupByKey.

Java: SDK 1.x

Configura otras opciones de canalización de Cloud Dataflow

Para ejecutar tu canalización en la nube, puedes establecer los siguientes campos de manera programática en tu objeto PipelineOptions:

Java: SDK 2.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner que se debe usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectRunner (modo local)
streaming boolean Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. Si tu canalización lee desde una fuente no delimitada, el valor predeterminado es true. De lo contrario, es false.
project String El ID de tu proyecto Google Cloud. Esto es necesario si deseas ejecutar tu canalización mediante el servicio administrado de Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud.
jobName String El nombre del trabajo de Dataflow que se ejecuta como aparece en la lista de trabajos de Dataflow y los detalles del trabajo. También se usa cuando se actualiza una canalización existente. Dataflow genera un nombre único de forma automática.
gcpTempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://.
stagingLocation String Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en lo que especificaste para tempLocation.
autoscalingAlgorithm String El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED, para habilitar el ajuste de escala automático, o NONE, para inhabilitarlo. Consulta Características del ajuste automático para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. La configuración predeterminada es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow y los trabajos de transmisión que usan Streaming Engine. La configuración predeterminada es NONE para los trabajos de transmisión que no usan Streaming Engine.
numWorkers int La cantidad inicial de instancias de Google Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Dataflow determina una cantidad apropiada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (que especifica numWorkers para permitir que tu trabajo escale verticalmente de forma automática o no). Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
numberOfWorkerHarnessThreads int La cantidad de subprocesos por agente de trabajo. Si no se especifica, el servicio de Dataflow determina una cantidad adecuada de subprocesos por trabajador.
region String Especifica un extremo regional para implementar tus trabajos de Dataflow. Si no se establece, el valor predeterminado es us-central1.
workerRegion String

Especifica una región de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la region que se usa con el fin de implementar, administrar y supervisar trabajos. La zona para workerRegion se asigna de forma automática.

Nota: Esta opción no se puede combinar con workerZone o zone.

Si no está configurada, se establece de manera predeterminada en el valor establecido para region.
workerZone String

Especifica una zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la region que se usa con el fin de implementar, administrar y supervisar trabajos.

Nota: Esta opción no se puede combinar con workerRegion o zone.

Si especificas region o workerRegion, workerZone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
zone String (Obsoleto) En el SDK de Apache Beam 2.17.0 o anterior, esto especificaba la zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Si especificas region, zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflowKmsKey String Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar gcpTempLocation para usar esta función. Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK.
flexRSGoal String Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros numWorkers, autoscalingAlgorithm, zone, region y workerMachineType. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Dataflow elija cualquier recurso con descuento disponible.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos estándar de Java. Precaución: La especificación de una ruta de acceso del directorio es poco eficaz debido a que Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, ya que hacerlo es mucho más lento que usar las API Cloud Storage/BigQuery nativas combinadas con la fuente de datos Dataflow adecuada. Si filesToStage está en blanco, Dataflow inferirá los archivos que se deben almacenar en etapa intermedia según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Dataflow determina el valor predeterminado.
usePublicIps boolean Especifica si los trabajadores de Dataflow usan direcciones IP públicas. Si el valor se establece en false, los trabajadores de Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. Si no se configura, el valor predeterminado es true y los trabajadores de Dataflow usan direcciones IP públicas.
enableStreamingEngine boolean Especifica si Streaming Engine de Dataflow está habilitado o no; es verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow y conservar así los recursos de CPU, memoria y almacenamiento de Persistent Disk. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan por completo en las VM de trabajador.
createFromSnapshot String Especifica el ID de la instantánea que se usará cuando se cree un trabajo de transmisión. Las instantáneas guardan el estado de una canalización de transmisión y te permiten iniciar una versión nueva de tu trabajo desde ese estado. Para obtener más información sobre las instantáneas, consulta Usa instantáneas. Si no se configura, no se usan instantáneas para crear un trabajo.
diskSizeGb int

El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador remota de Compute Engine. Si se establece, especifica al menos 30 GB para la imagen de arranque del trabajador y los registros locales.

Para los trabajos por lotes que usan Dataflow Shuffle, esta opción establece el tamaño del disco de arranque de una VM de trabajador. En los trabajos por lotes que no usan Dataflow Shuffle, esta opción establece el tamaño de los discos que se usan para almacenar datos aleatorios; el tamaño del disco de arranque no se ve afectado.

En los trabajos de transmisión que usan Streaming Engine, esta opción establece el tamaño de los discos de arranque. En los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional que crea el servicio de Dataflow; el disco de arranque no se ve afectado. Si un trabajo de transmisión no usa Streaming Engine, puedes establecer el tamaño del disco de arranque con la marca del experimento streaming_boot_disk_size_gb. Por ejemplo, especifica --experiments=streaming_boot_disk_size_gb=80 para crear discos de arranque de 80 GB.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25 GB; de lo contrario, el valor predeterminado es 250 GB.

Si un trabajo de transmisión usa Streaming Engine, el valor predeterminado es 30 GB; de lo contrario, el valor predeterminado es 400 GB.

Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a Shuffle que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo.

serviceAccount String Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores usan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Dataflow determina el valor predeterminado.
workerMachineType String

El tipo de máquina de Compute Engine que usa Dataflow cuando inicia las VM de trabajador. Puedes usar cualquiera de las familias de tipos de máquinas de Compute Engine disponibles, así como los tipos personalizados de máquinas.

Para obtener mejores resultados, usa tipos de máquina n1. Los tipos de máquinas de núcleo compartido, como los trabajadores de la serie f1 y g1, no son compatibles con el Acuerdo de Nivel de Servicio de Dataflow.

Ten en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria en los trabajadores. La facturación es independiente de la familia de tipos de máquinas.

El servicio de Dataflow elegirá el tipo de máquina en función de tu trabajo si no configuras esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Python

Campo Tipo Descripción Valor predeterminado
runner str El PipelineRunner que se debe usar. Este campo puede ser DirectRunner o DataflowRunner. DirectRunner (modo local)
streaming bool Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. false
project str El ID de tu proyecto Google Cloud. Esto es necesario si deseas ejecutar tu canalización mediante el servicio administrado de Dataflow. Si no se configura, muestra un error.
job_name String El nombre del trabajo de Dataflow que se ejecuta como aparece en la lista de trabajos de Dataflow y los detalles del trabajo. Dataflow genera un nombre único de forma automática.
temp_location str Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de staging_location. Debes especificar, como mínimo, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
staging_location str Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de temp_location. Debes especificar, como mínimo, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
autoscaling_algorithm str El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED, para habilitar el ajuste de escala automático, o NONE, para inhabilitarlo. Consulta Características del ajuste automático para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. La configuración predeterminada es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow y los trabajos de transmisión que usan Streaming Engine. La configuración predeterminada es NONE para los trabajos de transmisión que no usan Streaming Engine.
num_workers int La cantidad de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
max_num_workers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (que especifica num_workers para permitir que tu trabajo escale verticalmente de forma automática o no). Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
number_of_worker_harness_threads int La cantidad de subprocesos por agente de trabajo. Si no se especifica, el servicio de Dataflow determina una cantidad adecuada de subprocesos por trabajador. Para usar este parámetro, también debes usar la marca --experiments=use_runner_v2
region str Especifica un extremo regional para implementar tus trabajos de Dataflow. Si no se establece, el valor predeterminado es us-central1.
worker_region String

Especifica una región de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la region que se usa con el fin de implementar, administrar y supervisar trabajos. La zona para worker_region se asigna de forma automática.

Nota: Esta opción no se puede combinar con worker_zone o zone.

Si no está configurada, se establece de manera predeterminada en el valor establecido para region.
worker_zone String

Especifica una zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la region que se usa con el fin de implementar, administrar y supervisar trabajos.

Nota: Esta opción no se puede combinar con worker_region o zone.

Si especificas region o worker_region, worker_zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
zone str (Obsoleto) En el SDK de Apache Beam 2.17.0 o anterior, esto especificaba la zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Si especificas region, zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflow_kms_key str Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar temp_location para usar esta función. Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK.
flexrs_goal str Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros num_workers, autoscaling_algorithm, zone, region y machine_type. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Dataflow elija cualquier recurso con descuento disponible.
network str La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default.
subnetwork str La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Dataflow determina el valor predeterminado.
use_public_ips bool Especifica que los trabajadores de Dataflow deben usar direcciones IP públicas. Si el valor se establece en false, los trabajadores de Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. Esta opción requiere el SDK de Beam para Python. El SDK de Dataflow obsoleto para Python no lo admite. Si no se configura, los trabajadores de Dataflow usan direcciones IP públicas.
enable_streaming_engine boolean Especifica si Streaming Engine de Dataflow está habilitado o no; es verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow y conservar así los recursos de CPU, memoria y almacenamiento de Persistent Disk. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan por completo en las VM de trabajador.
disk_size_gb int

El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador remota de Compute Engine. Si se establece, especifica al menos 30 GB para la imagen de arranque del trabajador y los registros locales.

Para los trabajos por lotes que usan Dataflow Shuffle, esta opción establece el tamaño del disco de arranque de una VM de trabajador. En los trabajos por lotes que no usan Dataflow Shuffle, esta opción establece el tamaño de los discos que se usan para almacenar datos aleatorios; el tamaño del disco de arranque no se ve afectado.

En los trabajos de transmisión que usan Streaming Engine, esta opción establece el tamaño de los discos de arranque. En los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional que crea el servicio de Dataflow; el disco de arranque no se ve afectado. Si un trabajo de transmisión no usa Streaming Engine, puedes establecer el tamaño del disco de arranque con la marca del experimento streaming_boot_disk_size_gb. Por ejemplo, especifica --experiments=streaming_boot_disk_size_gb=80 para crear discos de arranque de 80 GB.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25 GB; de lo contrario, el valor predeterminado es 250 GB.

Si un trabajo de transmisión usa Streaming Engine, el valor predeterminado es 30 GB; de lo contrario, el valor predeterminado es 400 GB.

Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a Shuffle que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo.

service_account_email str Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores usan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
worker_disk_type str El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Dataflow determina el valor predeterminado.
machine_type str

El tipo de máquina de Compute Engine que usa Dataflow cuando inicia las VM de trabajador. Puedes usar cualquiera de las familias de tipos de máquinas de Compute Engine disponibles, así como los tipos personalizados de máquinas.

Para obtener mejores resultados, usa tipos de máquina n1. Los tipos de máquinas de núcleo compartido, como los trabajadores de la serie f1 y g1, no son compatibles con el Acuerdo de Nivel de Servicio de Dataflow.

Ten en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria en los trabajadores. La facturación es independiente de la familia de tipos de máquinas.

El servicio de Dataflow elegirá el tipo de máquina en función de tu trabajo si no configuras esta opción.

Java: SDK 1.x

Configura PipelineOptions para la ejecución local

En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio de Dataflow remoto y el proyecto de Google Cloud asociado.

Cuando uses la ejecución local, te recomendamos que ejecutes tu canalización con conjuntos de datos que sean lo bastante pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create o usar una transformación Read para trabajar con archivos locales o remotos pequeños. La ejecución local proporciona una forma rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero estará limitada por la memoria disponible en tu entorno local.

En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado. Sin embargo, es necesario que incluyas de forma explícita DirectRunner como una dependencia o que lo agregues a la ruta de clase.

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado.

Java: SDK 1.x

Después de crear tu canalización, ejecútala.

Configura otras opciones de canalización local

Cuando ejecutas tu canalización de manera local, los valores predeterminados para las propiedades en PipelineOptions suelen ser suficientes.

Java: SDK 2.x

Puedes encontrar los valores predeterminados para PipelineOptions de Java en la referencia de la API de Java. Consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización usa Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que necesites configurar credenciales de forma explícita. Consulta la clase GcpOptions para obtener más información.

Python

Puedes encontrar los valores predeterminados para PipelineOptions de Python en la referencia de la API de Python. Consulta la lista del módulo de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar options.view_as(GoogleCloudOptions).project para configurar tu ID del proyecto de Google Cloud. Es posible que también debas configurar las credenciales de forma explícita. Consulta la clase GoogleCloudOptions para obtener más información.

Java: SDK 1.x