Especifica los parámetros de ejecución de la canalización

Después de que tu programa de Apache Beam cree una canalización, deberás ejecutarla. La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam; tu programa de Apache Beam crea la canalización, y el código que escribiste genera una serie de pasos que debe ejecutar un ejecutor de canalizaciones. El ejecutor de canalizaciones puede ser el servicio administrado de Cloud Dataflow en Google Cloud Platform (GCP), un servicio de ejecución de terceros o un ejecutor de canalizaciones local que ejecute los pasos directamente en el entorno local.

Puedes especificar el ejecutor de canalizaciones y otras opciones de ejecución con la clase PipelineOptions del SDK de Apache Beam. Usa las PipelineOptions para configurar la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.

La mayor parte del tiempo, desearás que tu canalización se ejecute en los recursos de GCP administrados con el servicio de ejecución de Cloud Dataflow. Si ejecutas tu canalización con el servicio de Cloud Dataflow, se crea un trabajo de Cloud Dataflow, que usa los recursos de Compute Engine y Cloud Storage en tu proyecto de GCP.

También puedes ejecutar tu canalización de manera local. Cuando ejecutas tu canalización de manera local, las transformaciones de la canalización se ejecutan en la misma máquina en la que se ejecuta tu programa de Cloud Dataflow. La ejecución local es útil para realizar pruebas y depurar, en particular, si tu canalización puede usar conjuntos de datos en la memoria más pequeños.

Configura las PipelineOptions

Debes pasar las PipelineOptions cuando creas tu objeto de Pipeline en tu programa de Cloud Dataflow. Cuando el servicio de Cloud Dataflow ejecuta tu canalización, envía una copia de las PipelineOptions a cada instancia de trabajador.

Java: SDK 2.x

Nota: Puedes acceder a las PipelineOptions dentro de cada instancia de ParDo DoFn con el método ProcessContext.getPipelineOptions.

Python

Esta función aún no se admite en el SDK de Apache Beam para Python.

Java: SDK 1.x

Nota: Puedes acceder a las PipelineOptions dentro de cada instancia de ParDo DoFn con el método ProcessContext.getPipelineOptions.

Configura las PipelineOptions desde argumentos de la línea de comandos

Si bien puedes configurar tu canalización mediante la creación de un objeto de PipelineOptions y la configuración directa de los campos, los SDK de Apache Beam incluyen un analizador de línea de comandos que puedes usar para establecer campos en PipelineOptions con los argumentos de la línea de comandos.

Java: SDK 2.x

Para leer las opciones desde la línea de comandos, crea tu objeto de PipelineOptions con el método PipelineOptionsFactory.fromArgs, como en el siguiente código de ejemplo:

PipelineOptions options = PipelineOptions.fromArgs(args).withValidation().create();

Nota: Agregar el método .withValidation hace que Cloud Dataflow verifique los argumentos de línea de comandos necesarios y valide los valores de los argumentos.

El uso de PipelineOptionsFactory.fromArgs interpreta los argumentos de la línea de comandos que siguen el formato que se ve a continuación:

--<option>=<value>

Compilar tus PipelineOptions de esta forma te permite especificar las opciones en cualquier subinterfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Python

Para leer las opciones desde la línea de comandos, crea tu objeto de PipelineOptions, como en el siguiente código de ejemplo:

options = PipelineOptions(flags=argv)

El argumento (flags=argv) para las PipelineOptions interpreta los argumentos de la línea de comandos que sigue el formato que se ve a continuación:

--<option>=<value>

Compilar tus PipelineOptions de esta forma te permite especificar cualquiera de las opciones mediante la subclasificación desde las PipelineOptions.

Java: SDK 1.x

Para leer las opciones desde la línea de comandos, crea tu objeto de PipelineOptions con el método PipelineOptionsFactory.fromArgs, como en el siguiente código de ejemplo:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Nota: Agregar el método .withValidation hace que Cloud Dataflow verifique los argumentos de línea de comandos necesarios y valide los valores de los argumentos.

El uso de PipelineOptionsFactory.fromArgs interpreta los argumentos de la línea de comandos que siguen el formato que se ve a continuación:

--<option>=<value>

Compilar tus PipelineOptions de esta forma te permite especificar las opciones en cualquier subinterfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Crea opciones personalizadas

Puedes agregar tus propias opciones personalizadas además de las opciones de PipelineOptions estándar. El analizador de línea de comandos de Cloud Dataflow también puede configurar tus opciones personalizadas con argumentos de la línea de comandos especificados en el mismo formato.

Java: SDK 2.x

Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Para agregar tus propias opciones, usa el método add_argument() (que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

También puedes especificar una descripción, que aparece cuando un usuario pasa la --help como un argumento de la línea de comandos y un valor predeterminado.

Java: SDK 2.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Se recomienda que registres tu interfaz con PipelineOptionsFactory y, luego, pases la interfaz cuando creas el objeto de PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, la --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Python

Establece la descripción y el valor predeterminado de la siguiente manera:

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

Java: SDK 1.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Se recomienda que registres tu interfaz con PipelineOptionsFactory y, luego, pases la interfaz cuando creas el objeto de PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, la --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Configura PipelineOptions para su ejecución en el servicio de Cloud Dataflow

Para ejecutar tu canalización con el servicio administrado de Cloud Dataflow, necesitarás establecer los siguientes campos en PipelineOptions:

Java: SDK 2.x

  • project: el ID de tu proyecto de GCP.
  • runner: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • gcpTempLocation: una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas cualquier archivo temporal. Debes crear este depósito con anticipación, antes de ejecutar tu canalización. En caso de que no especifiques gcpTempLocation, puedes especificar la opción de canalización tempLocation y, luego, la gcpTempLocation se establecerá al valor de tempLocation. Si ninguna se especifica, se creará una gcpTempLocation predeterminada.
  • stagingLocation: un depósito de Cloud Storage para Cloud Dataflow que habilita por etapas tus archivos binarios. Si no estableces esta opción, lo que especificaste para tempLocation se usará también en la ubicación de etapa de pruebas.
  • Si no se especifican gcpTempLocation ni tempLocation, se creará una gcpTempLocation predeterminada. Si se especifica tempLocation, pero no gcpTempLocation, la primera debe ser una ruta de Cloud Storage, que gcpTempLocation usará como la ruta predeterminada. Si no se especifica tempLocation y se especifica gcpTempLocation, tempLocation no se propagará.

Python

  • job_name: el nombre del trabajo de Cloud Dataflow en ejecución.
  • project: el ID de tu proyecto de GCP.
  • runner: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • staging_location: una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas paquetes de códigos que necesitan los trabajadores que ejecutan el trabajo.
  • temp_location: una ruta de Cloud Storage para Cloud Dataflow que habilita por etapas archivos de trabajo temporales durante la ejecución de la canalización.

Java: SDK 1.x

  • project: el ID de tu proyecto de GCP.
  • runner: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowPipelineRunner o BlockingDataflowPipelineRunner.
  • stagingLocation: un depósito de Cloud Storage para Cloud Dataflow que habilita por etapas tus archivos binarios y temporales. Debes crear este depósito con anticipación, antes de ejecutar tu canalización.

Puedes configurar estas opciones de manera programática o especificarlas con la línea de comandos. En el siguiente código de ejemplo, se muestra cómo crear una canalización mediante la configuración programática del ejecutor y otras opciones necesarias, para ejecutar la canalización con el servicio administrado de Cloud Dataflow.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

En el siguiente código de ejemplo, se muestra cómo configurar las opciones necesarias para la ejecución del servicio de Cloud Dataflow con la línea de comandos:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

Java: SDK 2.x

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones de --project, --runner, --tempLocation y, de forma opcional, --stagingLocation.

Python

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones de --project, --runner y --stagingLocation.

Java: SDK 1.x

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones de --project, --runner y --stagingLocation.

Ejecución asíncrona

Java: SDK 2.x

El uso de DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la interfaz de supervisión de Cloud Dataflow o la interfaz de línea de comandos de Cloud Dataflow.

Python

El uso de DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la interfaz de supervisión de Cloud Dataflow o la interfaz de línea de comandos de Cloud Dataflow.

Java: SDK 1.x

El uso de DataflowPipelineRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras tu canalización se ejecuta, puedes supervisar el progreso del trabajo, ver los detalles en ejecución y recibir actualizaciones sobre los resultados de la canalización con la interfaz de supervisión de Cloud Dataflow o la interfaz de línea de comandos de Cloud Dataflow.

Ejecución de bloqueo

Java: SDK 2.x

Especifica DataflowRunner como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish().

Cuando usas el DataflowRunner y llamas a waitUntilFinish() en el objeto de PipelineResult que se muestra desde pipeline.run(), la canalización se ejecuta en la nube de la misma manera que el DataflowRunner, pero espera que el trabajo que se inició finalice y muestre el objeto de DataflowPipelineJob final. Mientras se ejecuta el trabajo, el servicio de Cloud Dataflow imprime las actualizaciones de estado del trabajo y los mensajes de la consola mientras espera.

Si eras un usuario del SDK de Java 1.x y usabas --runner BlockingDataflowPipelineRunner en la línea de comandos para hacer que tu programa principal se bloquee de forma interactiva hasta que la canalización finalice, entonces con Java 2.x tu programa principal necesita llamar de manera explícita waitUntilFinish().

Python

Usa el método wait_until_finish() del objeto de PipelineResult, que se muestra desde el método run() del ejecutor, para bloquear hasta que se complete la canalización.

Java: SDK 1.x

Especifica BlockingDataflowPipelineRunner como el ejecutor de canalizaciones.

Cuando usas BlockingDataflowPipelineRunner, la canalización se ejecuta en la nube de la misma forma que DataflowPipelineRunner, pero espera a que finalice el trabajo iniciado. Mientras se ejecuta el trabajo, el servicio de Cloud Dataflow imprime las actualizaciones de estado del trabajo y los mensajes de la consola mientras espera. El uso de BlockingDataflowPipelineRunner muestra el objeto de DataflowPipelineJob final.

Nota: Escribir Ctrl+C en la línea de comandos no cancela el trabajo. El servicio de Cloud Dataflow aún ejecuta el trabajo en GCP; para cancelar el trabajo, deberás usar la interfaz de supervisión de Cloud Dataflow o la interfaz de línea de comandos de Cloud Dataflow.

Ejecución de transmisión

Java: SDK 2.x

Si tu canalización lee desde una fuente de datos no delimitada (como Cloud Pub/Sub), la canalización se ejecutará de forma automática en modo de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tu PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey.

Python

Si tu canalización usa una fuente de datos o un receptor no delimitados (como Cloud Pub/Sub), debes configurar la opción streaming como verdadera.

Los trabajos de transmisión usan un tipo de máquina de Compute Engine de n1-standard-2 o superior de forma predeterminada. No debes anular esto, ya que n1-standard-2 es el tipo de máquina mínimo necesario para ejecutar trabajos de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, debes seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de usar cualquier agregación, como GroupByKey.

Java: SDK 1.x

Si tu canalización usa una fuente de datos o un receptor no delimitados (como Cloud Pub/Sub), debes configurar la opción streaming en PipelineOptions como true. Puedes configurar la opción streaming de manera programática como se muestra en el siguiente ejemplo:

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

Si tu canalización usa fuentes de datos y receptores no delimitados, debes seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de usar cualquier agregación, como GroupByKey.

Configura otras opciones de canalización en la nube

Para ejecutar tu canalización en la nube, puedes establecer los siguientes campos de manera programática en tu objeto de PipelineOptions:

Java: SDK 2.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner para usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectRunner (modo local)
streaming boolean Si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. Si tu canalización lee desde una fuente no delimitada, el valor predeterminado es true. De lo contrario, es false.
project String El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud.
gcpTempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL de Cloud Storage válida que comience con gs://.
stagingLocation String Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL de Cloud Storage válida que comience con gs://. Si no está configurado, se establece de manera predeterminada en lo que especificaste para tempLocation.
autoscalingAlgorithm String El modo de ajuste de escala automático que se usa para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. Se establece de forma predeterminada como THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow que usan el SDK de Cloud Dataflow de la versión de Java 1.6.0 o versiones posteriores; se establece de forma predeterminada como NONE para los trabajos de transmisión o trabajos por lotes que usan versiones anteriores del SDK de Cloud Dataflow de Java.
numWorkers int La cantidad inicial de instancias de Google Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Cloud Dataflow determina una cantidad apropiada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por numWorkers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
region String Especificar un extremo regional te permite definir una región para implementar tus trabajos de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada como us-central1.
zone String La zona de disponibilidad de Compute Engine para lanzar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro de region, el parámetro de zone se establecerá de manera predeterminada en una zona de disponibilidad de la región correspondiente. Puedes anular este comportamiento si especificas una zona de disponibilidad diferente.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos de Java estándar. Precaución: La especificación de una ruta de acceso del directorio poco eficaz debido a que Cloud Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, ya que hacer eso es mucho más lento que usar las API nativas de Cloud Storage/BigQuery combinadas con la fuente de datos de Cloud Dataflow adecuada. Si filesToStage está vacía, Cloud Dataflow inferirá los archivos a la etapa según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurado, GCP supone que deseas usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
usePublicIps boolean Especifica si los trabajadores de Cloud Dataflow usan direcciones IP públicas. Si el valor está establecido como false, los trabajadores de Cloud Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que la network o subnetwork especificada tenga habilitado el Acceso privado a Google. Si no se configura, el valor predeterminado es true y los trabajadores de Cloud Dataflow usan direcciones IP públicas.
diskSizeGb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Advertencia: Reducir el tamaño del disco disminuye la redistribución de E/S disponible. Los trabajos vinculados a la redistribución pueden aumentar el entorno de ejecución y el costo del trabajo.

workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine de diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
workerMachineType String El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie VM de trabajador. Cloud Dataflow admite series de n1 y tipos personalizados de máquinas. Los tipos de máquina de núcleo compartido como los trabajadores de las series f1 y g1 no se admiten en el Acuerdo de Nivel de Servicio de Cloud Dataflow. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Python

Campo Tipo Descripción Valor predeterminado
runner str El PipelineRunner para usar. Este campo puede ser DirectRunner o DataflowRunner. DirectRunner (modo local)
streaming bool Si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. false
project str El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no se configura, muestra un error.
temp_location str Ruta de Cloud Storage para archivos temporales. Debe ser una URL de Cloud Storage válida que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de staging_location. Debes especificar como mínimo una de las temp_location o staging_location para ejecutar tu canalización en la nube de Google.
staging_location str Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL de Cloud Storage válida que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de temp_location. Debes especificar como mínimo una temp_location o staging_location para ejecutar tu canalización en la nube de Google.
autoscaling_algorithm str El modo de ajuste de escala automático que se usa para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Cloud Dataflow. Se establece de forma predeterminada como THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow.
num_workers int La cantidad de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
max_num_workers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por num_workers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
region str Especificar un extremo regional te permite definir una región para implementar tus trabajos de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada como us-central1.
zone str La zona de disponibilidad de Compute Engine para lanzar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro de region, el parámetro de zone se establecerá de manera predeterminada en una zona de disponibilidad de la región correspondiente. Puedes anular este comportamiento si especificas una zona de disponibilidad diferente.
network str La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurado, GCP supone que deseas usar una red llamada default.
subnetwork str La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
use_public_ips bool Especifica si los trabajadores de Cloud Dataflow usan direcciones IP públicas. Si el valor está establecido como false, los trabajadores de Cloud Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que la network o subnetwork especificada tenga habilitado el Acceso privado a Google. El parámetro de las IP públicas necesita el SDK de Beam para Python. El SDK de Cloud Dataflow para Python no admite este parámetro. Si no se configura, el valor predeterminado es true y los trabajadores de Cloud Dataflow usan direcciones IP públicas.
disk_size_gb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo. Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de GCP.
worker_disk_type str El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine de diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
machine_type str El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie VM de trabajador. Ten en cuenta que worker_machine_type es una marca de alias. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Java: SDK 1.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner para usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectPipelineRunner (modo local)
streaming boolean Si el modo de transmisión (por el momento en versión Beta) está habilitado o inhabilitado; es verdadero si está habilitado. false
project String El ID del proyecto para tu proyecto de GCP. Esto es necesario si deseas ejecutar tu canalización con el servicio administrado de Cloud Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud.
tempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL de Cloud Storage válida que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de stagingLocation. Debes especificar como mínimo una de las tempLocation o stagingLocation para ejecutar tu canalización en la nube de Google.
stagingLocation String Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL de Cloud Storage válida que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de tempLocation. Debes especificar como mínimo una tempLocation o stagingLocation para ejecutar tu canalización en la nube de Google.
autoscalingAlgorithm String El modo de ajuste de escala automático que se usa para tu trabajo de Cloud Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Cloud Dataflow. Se establece de forma predeterminada como THROUGHPUT_BASED para todos los trabajos por lotes de Cloud Dataflow que usan el SDK de Cloud Dataflow de la versión de Java 1.6.0 o versiones posteriores; se establece de forma predeterminada como NONE para los trabajos de transmisión o trabajos por lotes que usan versiones anteriores del SDK de Cloud Dataflow de Java.
numWorkers int La cantidad inicial de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Cloud Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por numWorkers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Cloud Dataflow determinará una cantidad apropiada de trabajadores.
zone String La zona de disponibilidad de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si no está configurado, se establece de manera predeterminada como us-central1-f. Puedes especificar una zona de disponibilidad diferente para garantizar, por ejemplo, que tus trabajadores se inicien en la UE.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos de Java estándar. Precaución: La especificación de una ruta de acceso del directorio poco eficaz debido a que Cloud Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, ya que hacer eso es mucho más lento que usar las API nativas de Cloud Storage/BigQuery combinadas con la fuente de datos de Cloud Dataflow adecuada. Si filesToStage está vacía, Cloud Dataflow inferirá los archivos a la etapa según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no está configurado, GCP supone que deseas usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Cloud Dataflow determina el valor predeterminado.
diskSizeGb int El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. El tamaño mínimo de disco es de 30 GB para almacenar la imagen de arranque del trabajador y los registros locales. Si tu canalización redistribuye los datos, debes asignar más que el mínimo.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de GCP.

Advertencia: Reducir el tamaño del disco disminuye la redistribución de E/S disponible. Los trabajos vinculados a la redistribución pueden aumentar el entorno de ejecución y el costo del trabajo.

workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine de diskTypes. El servicio de Cloud Dataflow determina el valor predeterminado.
workerMachineType String El tipo de máquina de Compute Engine que usará Cloud Dataflow cuando inicie VM de trabajador. Cloud Dataflow admite series de n1 y tipos personalizados de máquinas. Los tipos de máquina de núcleo compartido como los trabajadores de las series f1 y g1 no se admiten en el Acuerdo de Nivel de Servicio de Cloud Dataflow. El servicio de Cloud Dataflow elegirá el tipo de máquina en función de tu trabajo, si no configuras esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Configura PipelineOptions para la ejecución local

En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio remoto de Cloud Dataflow y el proyecto de GCP asociado.

Cuando usas la ejecución local, se recomienda que ejecutes tu canalización con conjuntos de datos que sean tan pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación de Create o puedes usar una transformación de Read para trabajar con archivos locales o remotos pequeños. La ejecución local proporciona una forma rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero estará limitada por la memoria disponible en tu entorno local.

En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado. Sin embargo, es necesario que incluyas de forma explícita DirectRunner como una dependencia o que lo agregues a la ruta de clase.

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado.

Java: SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectPipelineRunner ya es el valor predeterminado.

Después de crear tu canalización, ejecútala.

Configura otras opciones de canalización local

Cuando ejecutas tu canalización de manera local, los valores predeterminados para las propiedades en PipelineOptions suelen ser suficientes.

Java: SDK 2.x

Puedes encontrar los valores predeterminados para las PipelineOptions de Java en la referencia de la API de Java; consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que debas establecer las credenciales de forma explícita. Consulta la clase GcpOptions, para obtener detalles completos.

Python

Puedes encontrar los valores predeterminados para las PipelineOptions de Python en el módulo de options.py.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para E/S, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar options.view_as(GoogleCloudOptions).project para configurar tu ID del proyecto de Google Cloud. También es posible que debas establecer las credenciales de forma explícita. Consulta la clase GoogleCloudOptions, para obtener detalles completos.

Java: SDK 1.x

Puedes encontrar los valores predeterminados para las PipelineOptions de Java en la referencia de la API de Java; consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización usa servicios de GCP, como BigQuery o Cloud Storage para E/S, es posible que debas configurar ciertas opciones de credenciales y proyectos de GCP. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que debas establecer las credenciales de forma explícita. Consulta la clase GcpOptions, para obtener detalles completos.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.