Especifica los parámetros de ejecución de la canalización

Después de que tu programa de Apache Beam crea una canalización, deberás ejecutarla. La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam; tu programa de Apache Beam crea la canalización, y el código que escribiste genera una serie de pasos que debe ejecutar un ejecutor de canalizaciones. El corredor de canalizaciones puede ser el servicio administrado de Dataflow en Google Cloud, un servicio de corredor externo o un corredor de canalización local que ejecuta los pasos directamente en el entorno local.

Puedes especificar el ejecutor de canalizaciones y otras opciones de ejecución mediante la clase PipelineOptions del SDK de Apache Beam. Utiliza PipelineOptions para configurar la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.

En la mayoría de los casos, le recomendamos que su canalización se ejecute en recursos administrados por Google Cloud mediante el servicio de corredor de Dataflow. Al ejecutar tu canalización con el servicio de Dataflow, se crea un trabajo de Dataflow, que usa recursos de Compute Engine y Cloud Storage en tu proyecto de Google Cloud.

También puedes ejecutar tu canalización de manera local. Cuando ejecuta su canalización de manera local, las transformaciones de canalización se ejecutan en la misma máquina donde se ejecuta su programa de Dataflow. La ejecución local es útil para realizar pruebas y depurar, en particular, si tu canalización puede usar conjuntos de datos en la memoria más pequeños.

Configura las PipelineOptions

Usted pasa PipelineOptions cuando crea su objeto Pipeline en su programa de Dataflow. Cuando el servicio de Dataflow ejecuta su canalización, envía una copia de PipelineOptions a cada instancia de trabajador.

Java: SDK 2.x

Nota: Puedes acceder a PipelineOptions dentro de cualquier instancia de DoFn de ParDo a través del método ProcessContext.getPipelineOptions.

Python

Esta función aún no se admite en el SDK de Apache Beam para Python.

Java: SDK 1.x

Configura las PipelineOptions desde argumentos de la línea de comandos

Si bien puedes configurar tu canalización mediante la creación de un objeto PipelineOptions y la configuración directa de los campos, los SDK de Apache Beam incluyen un analizador de línea de comandos que puedes usar para configurar campos en PipelineOptions con los argumentos de línea de comandos.

Java: SDK 2.x

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions con el método PipelineOptionsFactory.fromArgs, como en el siguiente código de ejemplo:

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    

Nota: Adjuntar el método .withValidation hace que Dataflow verifique los argumentos de línea de comandos requeridos y valide los valores de argumento.

Usar PipelineOptionsFactory.fromArgs interpreta los argumentos de la línea de comandos que siguen el formato:

    --<option>=<value>
    

Compilar tu objeto PipelineOptions de esta forma te permite especificar cualquiera de las opciones en cualquier subinterfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.

Python

Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions, como en el siguiente código de ejemplo:

    options = PipelineOptions(flags=argv)

    

El argumento (flags=argv) de PipelineOptions interpreta los argumentos de la línea de comandos que siguen el formato:

    --<option>=<value>
    

Compilar tu objeto PipelineOptions de esta manera te permite especificar cualquiera de las opciones mediante la creación de subclases a partir de PipelineOptions.

Java: SDK 1.x

Crea opciones personalizadas

Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions estándar. El analizador de línea de comandos de Dataflow también puede establecer sus opciones personalizadas mediante argumentos de línea de comandos especificados en el mismo formato.

Java: SDK 2.x

Si quieres agregar tus propias opciones, define una interfaz con métodos set y get para cada opción, como en el siguiente ejemplo:

      public interface MyOptions extends PipelineOptions {
        String getMyCustomOption();
        void setMyCustomOption(String myCustomOption);
      }
    

Python

Para agregar tus propias opciones, usa el método add_argument() (que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--input')
        parser.add_argument('--output')

    

Java: SDK 1.x

También puedes especificar una descripción, que aparece cuando un usuario pasa --help como un argumento de la línea de comandos y un valor predeterminado.

Java: SDK 2.x

Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:

      public interface MyOptions extends PipelineOptions {
        @Description("My custom command line argument.")
        @Default.String("DEFAULT")
        String getMyCustomOption();
        void setMyCustomOption(String myCustomOption);
      }
    

Se recomienda que registres tu interfaz con PipelineOptionsFactory y pases la interfaz cuando crees el objeto PipelineOptions. Cuando registras tu interfaz con PipelineOptionsFactory, --help puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help. PipelineOptionsFactory también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.

En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory:

      PipelineOptionsFactory.register(MyOptions.class);
      MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                                .withValidation()
                                                .as(MyOptions.class);
    

Ahora tu canalización puede aceptar --myCustomOption=value como un argumento de la línea de comandos.

Python

Establece la descripción y el valor predeterminado de la siguiente manera:

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            help='Input for the pipeline',
            default='gs://my-bucket/input')
        parser.add_argument(
            '--output',
            help='Output for the pipeline',
            default='gs://my-bucket/output')

    

Java: SDK 1.x

Configura PipelineOptions para su ejecución en el servicio de Cloud Dataflow

Para ejecutar tu canalización con el servicio administrado de Dataflow, debes configurar los siguientes campos en PipelineOptions:

Java: SDK 2.x

  • project: El ID de tu proyecto de Google Cloud.
  • runner: El ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • gcpTempLocation: Una ruta de acceso de Cloud Storage para que Dataflow muestre los archivos temporales. Debes crear este depósito con anticipación, antes de ejecutar tu canalización. En caso de que no especifiques gcpTempLocation, puedes especificar la opción de canalización tempLocation y, luego, gcpTempLocation se configurará en el valor de tempLocation. Si no se especifica ninguna, se creará una opción gcpTempLocation predeterminada.
  • stagingLocation: Un depósito de Cloud Storage para que Dataflow muestre sus archivos binarios. Si no configuras esta opción, el valor que especificaste para tempLocation se usará también en la ubicación de etapa de pruebas.
  • Se creará una ubicación gcpTempLocation predeterminada si no se especifica esta ni tempLocation. Si se especifica tempLocation, pero no gcpTempLocation, tempLocation será una ruta de acceso de Cloud Storage, que gcpTempLocation usará como la ruta predeterminada. Si no se especifica tempLocation y se especifica gcpTempLocation, tempLocation no se propagará.

Nota: Si usas el SDK de Apache Beam para Java 2.15.0 o posterior, también debes especificar region.

Python

  • job_name: el nombre del trabajo de Dataflow que se está ejecutando.
  • project: El ID de tu proyecto de Google Cloud.
  • runner: El ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe ser DataflowRunner.
  • staging_location: Una ruta de acceso de Cloud Storage para que Dataflow organice los paquetes de código necesarios para los trabajadores que ejecutan el trabajo.
  • temp_location: una ruta de acceso de Cloud Storage para que Dataflow muestre los archivos de trabajo temporales creados durante la ejecución de la canalización.

Nota: Si usa el SDK de Apache Beam para Python 2.15.0 o posterior, también debe especificar region.

Java: SDK 1.x

Puedes configurar estas opciones de manera programática o especificarlas con la línea de comandos. El siguiente código de ejemplo muestra cómo construir una canalización mediante configuración programática del corredor y otras opciones necesarias para ejecutar la canalización mediante el servicio administrado de Dataflow.

Java: SDK 2.x

      // Create and set your PipelineOptions.
      DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

      // For Cloud execution, set the Cloud Platform project, staging location,
      // and specify DataflowRunner.
      options.setProject("my-project-id");
      options.setStagingLocation("gs://my-bucket/binaries");
      options.setRunner(DataflowRunner.class);

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Python

    # Create and set your PipelineOptions.
    options = PipelineOptions(flags=argv)

    # For Cloud execution, set the Cloud Platform project, job_name,
    # staging location, temp_location and specify DataflowRunner.
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'my-project-id'
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = 'gs://my-bucket/binaries'
    google_cloud_options.temp_location = 'gs://my-bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    # Create the Pipeline with the specified options.
    p = Pipeline(options=options)
    

Java: SDK 1.x

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

El siguiente código de ejemplo muestra cómo configurar las opciones obligatorias para la ejecución del servicio de Dataflow con la línea de comandos:

Java: SDK 2.x

      // Create and set your PipelineOptions.
      MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Python

    # Use Python argparse module to parse custom arguments
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--input')
    parser.add_argument('--output')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Create the Pipeline with remaining arguments.
    with beam.Pipeline(argv=pipeline_args) as p:
      lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
      lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
    

Java: SDK 1.x

Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización y ejecútala.

Java: SDK 2.x

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones --project, --runner, --gcpTempLocationy, de forma opcional, --stagingLocation.

Python

Cuando pases las opciones necesarias en la línea de comandos, usa las opciones --project, --runner y --stagingLocation.

Java: SDK 1.x

Ejecución asíncrona

Java: SDK 2.x

Usar DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puede supervisar el progreso del trabajo, ver detalles de ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de Dataflow Monitoring o la interfaz de línea de comandos de Dataflow.

Python

Usar DataflowRunner hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puede supervisar el progreso del trabajo, ver detalles de ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de Dataflow Monitoring o la interfaz de línea de comandos de Dataflow.

Java: SDK 1.x

Ejecución de bloqueo

Java: SDK 2.x

Especifica DataflowRunner como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish().

Cuando utilizas DataflowRunner y llamas a waitUntilFinish() en el objeto PipelineResult que se muestra en pipeline.run(), la canalización se ejecuta en la nube de la misma manera que DataflowRunner, pero espera a que el trabajo iniciado finalice y muestre el objeto final DataflowPipelineJob. Mientras se ejecuta el trabajo, el servicio de Dataflow imprime las actualizaciones de estado del trabajo y los mensajes de la consola mientras espera.

Si eras usuario del SDK de Java 1.x y usabas --runner BlockingDataflowPipelineRunner en la línea de comandos para hacer que tu programa principal se bloquee de forma interactiva hasta que la canalización finalice, entonces, con Java 2.x, tu programa principal necesita llamar de manera explícita a waitUntilFinish().

Python

Utiliza el método wait_until_finish() del objeto PipelineResult, que se muestra a partir del método run() del ejecutor, para bloquear hasta que se complete la canalización.

Java: SDK 1.x

Nota: Si escribes Ctrl+C desde la línea de comandos, no se cancela tu trabajo. El servicio de Dataflow aún está ejecutando el trabajo en Google Cloud; para cancelar el trabajo, deberá usar la interfaz de Dataflow Monitoring o la interfaz de línea de comandos de Dataflow.

Ejecución de la transmisión

Java: SDK 2.x

Si tu canalización lee desde una fuente de datos no delimitada, como Cloud Pub/Sub, la canalización se ejecuta automáticamente en modo de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey.

Python

Si tu canalización utiliza una fuente de datos o un receptor no delimitados (como Pub/Sub), debes configurar la opción streaming como verdadera.

Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2 o superior según la configuración predeterminada. No debes anular esto, puesto que n1-standard-2 es el tipo de máquina mínimo necesario para ejecutar trabajos de transmisión.

Si tu canalización usa fuentes de datos y receptores no delimitados, debes seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de usar cualquier agregación, como GroupByKey.

Java: SDK 1.x

Configura otras opciones de canalización de Cloud Dataflow

Para ejecutar tu canalización en la nube, puedes establecer los siguientes campos de manera programática en tu objeto PipelineOptions:

Java: SDK 2.x

Campo Tipo Descripción Valor predeterminado
runner Class (NameOfRunner) El PipelineRunner que se debe usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. DirectRunner (modo local)
streaming boolean Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. Si tu canalización lee desde una fuente no delimitada, el valor predeterminado es true. De lo contrario, es false.
project String El ID del proyecto para tu proyecto de Google Cloud Esto es obligatorio si desea ejecutar su canalización con el servicio administrado de Dataflow. Si no está configurado, se establece de manera predeterminada en el proyecto configurado actual en el SDK de Cloud.
gcpTempLocation String Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://.
stagingLocation String Ruta de acceso de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en lo que especificaste para tempLocation.
autoscalingAlgorithm String El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. La configuración predeterminada es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow que usan el SDK de Dataflow de la versión 1.6.0 o posterior para Java. El valor predeterminado es NONE para los trabajos de transmisión o los trabajos por lotes que usan versiones anteriores del SDK de Dataflow para Java.
numWorkers int La cantidad inicial de instancias de Google Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inician el servicio de Dataflow cuando comienza tu trabajo. Si no se especifica, el servicio de Dataflow determina una cantidad adecuada de trabajadores.
maxNumWorkers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por numWorkers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
region String Especificar un extremo regional le permite definir una región para implementar sus trabajos de Dataflow. Si no se establece, el valor predeterminado es us-central1.
zone String La zona de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro region, el parámetro zone se establecerá de manera predeterminada en una zona de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflowKmsKey String Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar gcpTempLocation para usar esta función. Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK.
flexRSGoal String Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros numWorkers, autoscalingAlgorithm, zone, region y workerMachineType. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debe especificar el valor COST_OPTIMIZED para permitir que el servicio de Dataflow elija los recursos con descuento disponibles.
filesToStage List<String> Una lista de archivos locales, directorios de archivos o archivos (.jar o .zip) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos de Java estándar. Precauciones: La especificación de una ruta del directorio es poco eficaz ya que Dataflow comprime los archivos antes de subirlos, lo que implica un costo de inicio más alto. Además, no use esta opción para transferir datos a los trabajadores que la canalización debe procesar, ya que hacerlo es mucho más lento que usar las API Cloud Storage/BigQuery nativas combinadas con la fuente de datos Dataflow adecuada. Si filesToStage está en blanco, Dataflow inferirá los archivos a la etapa según la ruta de clases de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
network String La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default.
subnetwork String La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Dataflow determina el valor predeterminado.
usePublicIps boolean Especifica si los trabajadores de Dataflow usan direcciones IP públicas. Si el valor se establece en false, los trabajadores de Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork, se ignora la opción network. Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. Si no se configura, el valor predeterminado es true y los trabajadores de Dataflow usan direcciones IP públicas.
enableStreamingEngine boolean Especifica si Dataflow Streaming Engine está habilitado o no; verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow, conservando así los recursos de CPU, memoria y almacenamiento de Persistent Disk. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan completamente en las VM de trabajador.
diskSizeGb int

El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. Si se establece, especifique al menos 30&nbspGB para tener en cuenta la imagen de inicio del trabajador y los registros locales.

En el caso de los trabajos por lotes, esta opción establece el tamaño del disco de arranque de una VM de un trabajador.

Si un trabajo de transmisión utiliza Streaming Engine, esta opción establece el tamaño de los discos de inicio. Para los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional creado por el servicio de Dataflow; el disco de arranque no se ve afectado. Si un trabajo de transmisión no utiliza Streaming Engine, puedes establecer el tamaño de disco de inicio con la marca de experimento streaming_boot_disk_size_gb. Por ejemplo, especifica --experiments=streaming_boot_disk_size_gb=80 para crear discos de arranque de 80 GB.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25& nbspGB; de lo contrario, el valor predeterminado es 250& nbspGB.

Si un trabajo de transmisión utiliza Streaming Engine, el valor predeterminado es 30& nbspGB; de lo contrario, el valor predeterminado es 400& nbspGB.

Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a la redistribución que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo.

serviceAccount String Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores utilizan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
workerDiskType String El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Dataflow determina el valor predeterminado.
workerMachineType String

El tipo de máquina de Compute Engine que Dataflow usa cuando inicia las VM de los trabajadores. Puedes usar cualquiera de las familias de tipos de máquina de Compute Engine disponibles, así como los tipos de máquina personalizados.

Para obtener mejores resultados, usa tipos de máquina n1. Los tipos de máquinas de núcleo compartido, como los trabajadores de la serie f1 y g1, no son compatibles con el Acuerdo de nivel de servicio de Dataflow.

Tenga en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria de los trabajadores. La facturación es independiente de la familia de tipos de máquina.

El servicio de Dataflow elegirá el tipo de máquina según su trabajo si no establece esta opción.

Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.

Python

Campo Tipo Descripción Valor predeterminado
runner str El PipelineRunner que se debe usar. Este campo puede ser DirectRunner o DataflowRunner. DirectRunner (modo local)
streaming bool Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. false
project str El ID del proyecto para tu proyecto de Google Cloud Esto es obligatorio si desea ejecutar su canalización con el servicio administrado de Dataflow. Si no se configura, muestra un error.
temp_location str Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en el valor de staging_location. Debes especificar, al menos, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
staging_location str Ruta de acceso de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs://. Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de temp_location. Debes especificar, al menos, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google.
autoscaling_algorithm str El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED para habilitar el ajuste de escala automático o NONE para inhabilitarlo. Consulta Funciones de configuración automática para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. El valor predeterminado es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow y NONE para todos los trabajos de transmisión de Dataflow.
num_workers int La cantidad de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
max_num_workers int La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (especificada por num_workers para permitir que tu trabajo escale de forma automática o no). Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores.
region str Especificar un extremo regional le permite definir una región para implementar sus trabajos de Dataflow. Si no se establece, el valor predeterminado es us-central1.
zone str La zona de Compute Engine para iniciar instancias de trabajadores y ejecutarlas en tu canalización. Si especificaste el parámetro region, el parámetro zone se establecerá de manera predeterminada en una zona de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente.
dataflow_kms_key str Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS. También debes especificar temp_location para usar esta función. Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK.
flexrs_goal str Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros num_workers, autoscaling_algorithm, zone, region y machine_type. Para obtener más información, consulta la sección de opciones de canalización de FlexRS. Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debe especificar el valor COST_OPTIMIZED para permitir que el servicio de Dataflow elija los recursos con descuento disponibles.
network str La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default.
subnetwork str La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. El servicio de Dataflow determina el valor predeterminado.
no_use_public_ips bool Especifica que los trabajadores de Dataflow no deben usar direcciones IP públicas. Como resultado, los trabajadores de Dataflow usarán direcciones IP privadas para todas las comunicaciones. Si se especifica la opción subnetwork, se omite la opción network. Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. El parámetro de las IP públicas necesita el SDK de Beam para Python. El SDK de Dataflow para Python no admite este parámetro. Si no están configurados, los trabajadores de Dataflow usan direcciones IP públicas.
enable_streaming_engine boolean Especifica si Dataflow Streaming Engine está habilitado o no; verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow, conservando así los recursos de CPU, memoria y almacenamiento de Persistent Disk. El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan completamente en las VM de trabajador.
disk_size_gb int

El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador de Compute Engine remota. Si se establece, especifique al menos 30&nbspGB para tener en cuenta la imagen de inicio del trabajador y los registros locales.

En el caso de los trabajos por lotes, esta opción establece el tamaño del disco de arranque de una VM de un trabajador.

Si un trabajo de transmisión utiliza Streaming Engine, esta opción establece el tamaño de los discos de inicio. Para los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional creado por el servicio de Dataflow; el disco de arranque no se ve afectado. Si un trabajo de transmisión no utiliza Streaming Engine, puedes establecer el tamaño de disco de inicio con la marca de experimento streaming_boot_disk_size_gb. Por ejemplo, especifica --experiments=streaming_boot_disk_size_gb=80 para crear discos de arranque de 80 GB.

Configúralo en 0 para usar el tamaño predeterminado definido en tu proyecto de Cloud Platform.

Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25& nbspGB; de lo contrario, el valor predeterminado es 250& nbspGB.

Si un trabajo de transmisión utiliza Streaming Engine, el valor predeterminado es 30& nbspGB; de lo contrario, el valor predeterminado es 400& nbspGB.

Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a la redistribución que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo.

service_account_email str Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow. Si no se configura, los trabajadores utilizan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador.
worker_disk_type str El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. El servicio de Dataflow determina el valor predeterminado.
machine_type str

El tipo de máquina de Compute Engine que Dataflow usa cuando inicia las VM de los trabajadores. Puedes usar cualquiera de las familias de tipos de máquina de Compute Engine disponibles, así como los tipos de máquina personalizados.

Para obtener mejores resultados, usa tipos de máquina n1. Los tipos de máquinas de núcleo compartido, como los trabajadores de la serie f1 y g1, no son compatibles con el Acuerdo de nivel de servicio de Dataflow.

Tenga en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria de los trabajadores. La facturación es independiente de la familia de tipos de máquina.

El servicio de Dataflow elegirá el tipo de máquina según su trabajo si no establece esta opción.

Java: SDK 1.x

Configura PipelineOptions para la ejecución local

En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio remoto de Dataflow y del proyecto de Google Cloud asociado.

Cuando uses la ejecución local, te recomendamos que ejecutes tu canalización con conjuntos de datos que sean tan pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create o puedes usar una transformación Read para trabajar con archivos locales o remotos pequeños. La ejecución local proporciona una forma rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero estará limitada por la memoria disponible en tu entorno local.

En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.

Java: SDK 2.x

      // Create and set our Pipeline Options.
      PipelineOptions options = PipelineOptionsFactory.create();

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado. Sin embargo, es necesario que incluyas de forma explícita a DirectRunner como una dependencia o que lo agregues a la ruta de clase.

Python

    # Create and set your Pipeline Options.
    options = PipelineOptions()
    with Pipeline(options=options) as p:
    

Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner ya es el valor predeterminado.

Java: SDK 1.x

Después de crear tu canalización, ejecútala.

Configura otras opciones de canalización local

Cuando ejecutas tu canalización de manera local, los valores predeterminados para las propiedades en PipelineOptions suelen ser suficientes.

Java: SDK 2.x

Puedes encontrar los valores predeterminados para PipelineOptions de Java en la referencia de la API para Java. Consulta la lista de clase de PipelineOptions para obtener detalles completos.

Si tu canalización utiliza Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar GcpOptions.setProject para configurar tu ID del proyecto de Google Cloud. También es posible que necesites configurar credenciales explícitamente. Consulta la clase GcpOptions para obtener detalles completos.

Python

Puedes encontrar los valores predeterminados para PipelineOptions de Python en la referencia de la API para Python. Consulta la lista del módulo de PipelineOptions para obtener detalles completos.

Si tu canalización utiliza servicios de Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar options.view_as(GoogleCloudOptions).project para configurar tu ID del proyecto de Google Cloud. Es posible que también debas configurar las credenciales de forma explícita. Consulta la clase GoogleCloudOptions para obtener detalles completos.

Java: SDK 1.x