Especifica los parámetros de ejecución de la canalización

Después de que tu programa de Apache Beam cree una canalización, deberás ejecutarla. La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam; tu programa de Apache Beam crea la canalización, y el código que escribiste genera una serie de pasos que debe ejecutar un ejecutor de canalizaciones. El ejecutor de canalizaciones puede ser el servicio administrado de Cloud Dataflow en Google Cloud Platform (GCP), un servicio de ejecución de terceros o un ejecutor de canalizaciones local que ejecute los pasos directamente en el entorno local.

Puedes especificar el ejecutor de canalización y otras opciones de ejecución mediante la clase PipelineOptions del SDK de Apache Beam. Utiliza PipelineOptions para configurar la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.

La mayor parte del tiempo, desearás que tu canalización se ejecute en los recursos de GCP administrados con el servicio de ejecución de Cloud Dataflow. Si ejecutas tu canalización con el servicio de Cloud Dataflow, se crea un trabajo de Cloud Dataflow, que usa los recursos de Compute Engine y Cloud Storage en tu proyecto de GCP.

También puedes ejecutar tu canalización de manera local. Cuando ejecutas tu canalización de manera local, las transformaciones de la canalización se ejecutan en la misma máquina en la que se ejecuta tu programa de Cloud Dataflow. La ejecución local es útil para realizar pruebas y depurar, en particular, si tu canalización puede usar conjuntos de datos en la memoria más pequeños.

Configura las PipelineOptions

Debes pasar PipelineOptions cuando creas el objeto Pipeline en tu programa de Cloud Dataflow. Cuando el servicio de Cloud Dataflow ejecuta tu canalización, envía una copia de PipelineOptions a cada instancia de trabajador.

Java: SDK 2.x

Nota: Puedes acceder a PipelineOptions dentro de cualquier instancia de DoFn de ParDo a través del método ProcessContext.getPipelineOptions.

Python

Esta función aún no se admite en el SDK de Apache Beam para Python.

Java: SDK 1.x

Nota: Puedes acceder a PipelineOptions dentro de cualquier instancia de DoFn de ParDo a través del método ProcessContext.getPipelineOptions.

Configura las PipelineOptions desde argumentos de la línea de comandos

Si bien puedes configurar tu canalización mediante la creación de un objeto PipelineOptions y la configuración directa de los campos, los SDK de Apache Beam incluyen un analizador de línea de comandos que puedes usar para configurar campos en con los argumentos de línea de comandos.

Java: SDK 2.x

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions con el método PipelineOptionsFactory.fromArgs, como en el siguiente código de ejemplo:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Nota: Agregar el método .withValidation hace que Cloud Dataflow verifique los argumentos de línea de comandos necesarios y valide los valores de los argumentos.

Usar PipelineOptionsFactory.fromArgs interpreta los argumentos de la línea de comandos que siguen el formato:

--<option>=<value>

Compilar tu objeto PipelineOptions de esta forma te permite especificar cualquiera de las opciones en cualquier interfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Python

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions, como en el siguiente código de ejemplo:

options = PipelineOptions(flags=argv)

El argumento (flags=argv) de PipelineOptions interpreta los argumentos de la línea de comandos que siguen el formato:

--<option>=<value>

Compilar tu objeto PipelineOptions de esta manera te permite especificar cualquiera de las opciones mediante la creación de subclases a partir de .

Java: SDK 1.x

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions con el método PipelineOptionsFactory.fromArgs, como en el siguiente código de ejemplo:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Nota: Agregar el método .withValidation hace que Cloud Dataflow verifique los argumentos de línea de comandos necesarios y valide los valores de los argumentos.

Usar PipelineOptionsFactory.fromArgs interpreta los argumentos de la línea de comandos que siguen el formato:

--<option>=<value>

Compilar tu objeto PipelineOptions de esta forma te permite especificar las opciones en cualquier subinterfaz de com.google.cloud.dataflow.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Crea opciones personalizadas

Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions estándar. El analizador de línea de comandos de Cloud Dataflow también puede configurar tus opciones personalizadas con argumentos de la línea de comandos especificados en el mismo formato.

Java: SDK 2.x

Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Para agregar tus propias opciones, usa el método add_argument() (que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

También puedes especificar una descripción, que aparece cuando un usuario pasa --help como un argumento de la línea de comandos y un valor predeterminado.

Java: SDK 2.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Se recomienda que registres tu interfaz con PipelineOptionsFactory y pases la interfaz cuando crees el objeto PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Python

Establece la descripción y el valor predeterminado de la siguiente manera:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

Java: SDK 1.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Se recomienda que registres tu interfaz con PipelineOptionsFactory y pases la interfaz cuando crees el objeto PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Configura PipelineOptions para su ejecución en el servicio de Cloud Dataflow

Para ejecutar tu canalización con el servicio administrado de Cloud Dataflow, necesitarás establecer los siguientes campos en PipelineOptions:

Java: SDK 2.x

  • project: El ID de tu proyecto de GCP.
  • runner: El ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • gcpTempLocation: Una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas cualquier archivo temporal. Debes crear este depósito con anticipación, antes de ejecutar tu canalización. En caso de que no especifiques gcpTempLocation, puedes especificar la opción de canalización tempLocation y, luego, gcpTempLocation se configurará en el valor de tempLocation. Si no se especifica ninguna, se creará una opción gcpTempLocation predeterminada.
  • stagingLocation: Un depósito de Cloud Storage para Cloud Dataflow que habilita por etapas tus archivos binarios. Si no configuras esta opción, el valor que especificaste para tempLocation se usará también en la ubicación de etapa de pruebas.
  • Se creará una ubicación gcpTempLocation predeterminada si no se especifica esta ni tempLocation. Si se especifica tempLocation, pero no gcpTempLocation, tempLocation será una ruta de Cloud Storage, que gcpTempLocation usará como la ruta predeterminada. Si no se especifica tempLocation y se especifica gcpTempLocation, tempLocation no se propagará.

Python

  • job_name: Es el nombre del trabajo de Cloud Dataflow que se ejecuta.
  • project: El ID de tu proyecto de GCP.
  • runner: El ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • staging_location: Una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas paquetes de código que necesitan los trabajadores que ejecutan el trabajo.
  • temp_location: Una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas archivos de trabajo temporales creados durante la ejecución de la canalización.

Java: SDK 1.x

  • project: El ID de tu proyecto de GCP.
  • runner: El ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowPipelineRunner o BlockingDataflowPipelineRunner.
  • stagingLocation: Un depósito de Cloud Storage para Cloud Dataflow que habilita por etapas tus archivos binarios y temporales. Debes crear este depósito con anticipación, antes de ejecutar tu canalización.

Puedes configurar estas opciones de manera programática o especificarlas con la línea de comandos. En el siguiente código de ejemplo, se muestra cómo crear una canalización mediante la configuración programática del ejecutor y otras opciones necesarias, para ejecutar la canalización con el servicio administrado de Cloud Dataflow.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

En el siguiente código de ejemplo, se muestra cómo configurar las opciones necesarias para la ejecución del servicio de Cloud Dataflow con la línea de comandos:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

Java: SDK 2.x

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones --project, --runner, --gcpTempLocationy, de forma opcional, --stagingLocation.

Python

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones --project, --runner y --stagingLocation.

Java: SDK 1.x

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones --project, --runner y --stagingLocation.

Ejecución asíncrona

Java: SDK 2.x

Usar DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la Interfaz de supervisión de Cloud Dataflow o la Interfaz de línea de comandos de Cloud Dataflow.

Python

Usar DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la Interfaz de supervisión de Cloud Dataflow o la Interfaz de línea de comandos de Cloud Dataflow.

Java: SDK 1.x

Usar DataflowPipelineRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la Interfaz de supervisión de Cloud Dataflow o la Interfaz de línea de comandos de Cloud Dataflow.

Ejecución de bloqueo

Java: SDK 2.x

Especifica DataflowRunner como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish().

Cuando utilizas DataflowRunner y llamas a waitUntilFinish() en el objeto PipelineResult que se muestra en pipeline.run(), la canalización se ejecuta en la nube de la misma manera que , pero espera a que el trabajo iniciado finalice y muestra el objeto final DataflowPipelineJob. Mientras se ejecuta el trabajo, el servicio de Cloud Dataflow imprime las actualizaciones de estado del trabajo y los mensajes de la consola mientras espera.

Si eras usuario del SDK de Java 1.x y usabas --runner BlockingDataflowPipelineRunner en la línea de comandos para hacer que tu programa principal se bloquee de forma interactiva hasta que la canalización finalice, entonces, con Java 2.x, tu programa principal necesita llamar de manera explícita a waitUntilFinish().

Python

Utiliza el método wait_until_finish() del objeto PipelineResult, que se muestra a partir del método run() del ejecutor, para bloquear hasta que se complete la canalización.

Java: SDK 1.x

Especifica BlockingDataflowPipelineRunner como el ejecutor de la canalización.

Cuando utilizas BlockingDataflowPipelineRunner, la canalización se ejecuta en la nube de la misma manera que DataflowPipelineRunner, pero espera a que el trabajo iniciado termine. Mientras se ejecuta el trabajo, el servicio de Cloud Dataflow imprime las actualizaciones de estado del trabajo y los mensajes de la consola mientras espera. Usar BlockingDataflowPipelineRunner muestra el objeto final DataflowPipelineJob.

Nota: Si escribes Ctrl+C desde la línea de comandos, no se cancela tu trabajo. El servicio de Cloud Dataflow aún ejecuta el trabajo en GCP. Para cancelar el trabajo, deberás usar la Interfaz de supervisión de Cloud Dataflow o la Interfaz de línea de comandos de Cloud Dataflow.

Ejecución de la transmisión

Java: SDK 2.x

Si tu canalización lee desde una fuente de datos no delimitada, como Cloud Pub/Sub, la canalización se ejecuta automáticamente en modo de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey.

Python

Si tu canalización usa una fuente de datos o un receptor no delimitados (como Cloud Pub/Sub), debes configurar la opción streaming como verdadera.

Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2 o superior según la configuración predeterminada. No debes anular esto, puesto que n1-standard-2 es el tipo de máquina mínimo necesario para ejecutar trabajos de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey.

Java: SDK 1.x

Si tu canalización usa una fuente de datos o un receptor no delimitados (como Cloud Pub/Sub), debes configurar la opción streaming en PipelineOptions como true. Puedes configurar la opción streaming de manera programática, como se muestra en el siguiente ejemplo:

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

Si tu canalización usa fuentes de datos y receptores no delimitados, debes seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de usar cualquier agregación, como GroupByKey.

Configura otras opciones de canalización de Cloud Dataflow

Para ejecutar tu canalización en la nube, puedes establecer los siguientes campos de manera programática en tu objeto PipelineOptions:

Java: SDK 2.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner que se debe usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectRunner (modo local)
streaming boolean Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. Si tu canalización lee desde una fuente no delimitada, el valor predeterminado es true. De lo contrario, es false.
project String El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud.
gcpTempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://.
stagingLocation String Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en lo que especificaste para tempLocation.
autoscalingAlgorithm String El modo de ajuste de escala automático para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Cloud Dataflow. Se establece de forma predeterminada como THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow que usan el SDK de Cloud Dataflow de la versión de Java 1.6.0 o versiones posteriores; se establece de forma predeterminada como NONE para los trabajos de transmisión o trabajos por lotes que usan versiones anteriores del SDK de Cloud Dataflow de Java.
numWorkers int La cantidad inicial de instancias de Google Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Cloud Dataflow determina una cantidad apropiada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por numWorkers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
region String Especificar un extremo regional te permite definir una región para implementar tus trabajos de Cloud Dataflow. Si no se establece, el valor predeterminado es us-central1.
zone String La zona de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro region, el parámetro zone se establecerá de manera predeterminada en una zona de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflowKmsKey String Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar gcpTempLocation para usar esta función. Si no se especifica, Cloud Dataflow utiliza la encriptación de GCP predeterminada en lugar de una CMEK.
flexRSGoal String Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros numWorkers, autoscalingAlgorithm, zone, region y workerMachineType. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Cloud Dataflow elija cualquier recurso con descuento disponible.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos de Java estándar. Precaución: La especificación de una ruta de acceso del directorio es poco eficaz debido a que Cloud Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, puesto que hacerlo es significativamente más lento que usar las API nativas de Cloud Storage/BigQuery combinadas con la fuente de datos adecuada de Cloud Dataflow. Si el campo filesToStage está vacío, Cloud Dataflow inferirá los archivos para habilitar a etapa según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurada, GCP supone que deseas usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
usePublicIps boolean Especifica si los trabajadores de Cloud Dataflow usan direcciones IP públicas. Si el valor está establecido como false, los trabajadores de Cloud Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que la opción network o subnetwork especificada tenga habilitado el Acceso privado a Google. Si no se configura, el valor predeterminado es true y los trabajadores de Cloud Dataflow usan direcciones IP públicas.
enableStreamingEngine boolean Especifica si Cloud Dataflow Streaming Engine está habilitado o inhabilitado; verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Cloud Dataflow y conservar los recursos de almacenamiento de CPU, memoria y disco persistente. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan completamente en las VM de trabajador.
diskSizeGb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Advertencia: Reducir el tamaño del disco disminuye la redistribución de E/S disponible. Los trabajos vinculados a la redistribución pueden aumentar el entorno de ejecución y el costo del trabajo.

serviceAccount String Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores utilizan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
workerMachineType String El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie la VM de trabajador. Cloud Dataflow admite series de n1 y tipos personalizados de máquinas. Los tipos de máquina de núcleo compartido, como los trabajadores de las series f1 y g1, no se admiten en el Acuerdo de Nivel de Servicio de Cloud Dataflow. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Python

Campo Tipo Descripción Valor predeterminado
runner str El PipelineRunner que se debe usar. Este campo puede ser DirectRunner o DataflowRunner. DirectRunner (modo local)
streaming bool Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. false
project str El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no se configura, muestra un error.
temp_location str Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de staging_location. Debes especificar, al menos, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
staging_location str Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de temp_location. Debes especificar, al menos, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
autoscaling_algorithm str El modo de ajuste de escala automático para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Cloud Dataflow. El valor predeterminado es THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow, y NONE para todos los trabajos de transmisión de Cloud Dataflow.
num_workers int La cantidad de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
max_num_workers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por num_workers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
region str Especificar un extremo regional te permite definir una región para implementar tus trabajos de Cloud Dataflow. Si no se establece, el valor predeterminado es us-central1.
zone str La zona de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro region, el parámetro zone se establecerá de manera predeterminada en una zona de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflow_kms_key str Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar temp_location para usar esta función. Si no se especifica, Cloud Dataflow utiliza la encriptación de GCP en lugar de una CMEK.
flexrs_goal str Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros num_workers, autoscaling_algorithm, zone, region y machine_type. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Cloud Dataflow elija cualquier recurso con descuento disponible.
network str La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurada, GCP supone que deseas usar una red llamada default.
subnetwork str La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
no_use_public_ips bool Especifica que los trabajadores de Cloud Dataflow no deben usar direcciones IP públicas. Por lo tanto, los trabajadores de Cloud Dataflow usarán direcciones IP privadas para todas las comunicaciones. Si se especifica la opción subnetwork, se omite la opción network. Asegúrate de que las opciones network o subnetwork especificadas tengan habilitado el Acceso privado a Google. El parámetro de las IP públicas necesita el SDK de Beam para Python. El SDK de Cloud Dataflow para Python no admite este parámetro. Si no se configura, los trabajadores de Cloud Dataflow usan direcciones IP públicas.
enable_streaming_engine boolean Especifica si Cloud Dataflow Streaming Engine está habilitado o inhabilitado; verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Cloud Dataflow y conservar los recursos de almacenamiento de CPU, memoria y disco persistente. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan completamente en las VM de trabajador.
disk_size_gb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo. Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de GCP.
service_account_email str Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores utilizan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
worker_disk_type str El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
machine_type str El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie la VM de trabajador. Ten en cuenta que worker_machine_type es una marca de alias. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Java: SDK 1.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner que se debe usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectPipelineRunner (modo local)
streaming boolean Especifica si el modo de transmisión (por el momento en versión Beta) está habilitado o inhabilitado; es verdadero si está habilitado. false
project String El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud.
tempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de stagingLocation. Debes especificar, al menos, uno de los valores tempLocation o stagingLocation para ejecutar tu canalización en la nube de Google.
stagingLocation String Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de tempLocation. Debes especificar, al menos, uno de los valores tempLocation o stagingLocation para ejecutar tu canalización en la nube de Google.
autoscalingAlgorithm String El modo de ajuste de escala automático que se usa para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Cloud Dataflow. Se establece de forma predeterminada como THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow que usan el SDK de Cloud Dataflow de la versión de Java 1.6.0 o versiones posteriores; se establece de forma predeterminada como NONE para los trabajos de transmisión o trabajos por lotes que usan versiones anteriores del SDK de Cloud Dataflow de Java.
numWorkers int La cantidad inicial de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Cloud Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por numWorkers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
zone String La zona de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si no se establece, el valor predeterminado es us-central1-f. Puedes especificar una zona diferente para garantizar, por ejemplo, que tus trabajadores se inicien en la UE.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos de Java estándar. Precaución: La especificación de una ruta de acceso del directorio es poco eficaz debido a que Cloud Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, puesto que hacerlo es significativamente más lento que usar las API nativas de Cloud Storage/BigQuery combinadas con la fuente de datos de Cloud Dataflow adecuada. Si el campo filesToStage está vacío, Cloud Dataflow inferirá los archivos para habilitar a etapa según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurada, GCP supone que deseas usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
diskSizeGb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de GCP.

Advertencia: Reducir el tamaño del disco disminuye la redistribución de E/S disponible. Los trabajos vinculados a la redistribución pueden aumentar el entorno de ejecución y el costo del trabajo.

workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
workerMachineType String El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie la VM de trabajador. Cloud Dataflow admite series de n1 y tipos personalizados de máquinas. Los tipos de máquina de núcleo compartido, como los trabajadores de las series f1 y g1, no se admiten en el Acuerdo de Nivel de Servicio de Cloud Dataflow. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Configura PipelineOptions para la ejecución local

En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio remoto de Cloud Dataflow y el proyecto de GCP asociado.

Cuando uses la ejecución local, te recomendamos que ejecutes tu canalización con conjuntos de datos que sean tan pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create o usar una transformación Read para trabajar con archivos locales o remotos pequeños. La ejecución local proporciona una forma rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero estará limitada por la memoria disponible en tu entorno local.

En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado. Sin embargo, es necesario que incluyas de forma explícita DirectRunner como una dependencia o que lo agregues a la ruta de clase.

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado.

Java: SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectPipelineRunner ya es el valor predeterminado.

Después de crear tu canalización, ejecútala.

Configura otras opciones de canalización local

Cuando ejecutas tu canalización de manera local, los valores predeterminados para las propiedades en PipelineOptions suelen ser suficientes.

Java: SDK 2.x

Puedes encontrar los valores predeterminados para PipelineOptions de Java en la referencia de la API de Java. Consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que necesites configurar credenciales explícitamente. Consulta la clase GcpOptions para obtener detalles completos.

Python

Puedes encontrar los valores predeterminados para PipelineOptions de Python en la referencia de la API de Python. Consulta la lista del módulo de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para E/S, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar options.view_as(GoogleCloudOptions).project para configurar tu ID del proyecto de Google Cloud. Es posible que también debas configurar las credenciales de forma explícita. Consulta la clase GoogleCloudOptions para obtener detalles completos.

Java: SDK 1.x

Puedes encontrar los valores predeterminados para PipelineOptions de Java en la referencia de la API de Java. Consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para E/S, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que necesites configurar credenciales explícitamente. Consulta la clase GcpOptions para obtener detalles completos.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.