Después de que tu programa de Apache Beam cree una canalización, deberás ejecutarla. La ejecución de la canalización es independiente de la ejecución del programa de Apache Beam; tu programa de Apache Beam crea la canalización, y el código que escribiste genera una serie de pasos que debe ejecutar un ejecutor de canalizaciones. El ejecutor de canalizaciones puede ser el servicio administrado de Dataflow en Google Cloud, un servicio de ejecutor de terceros o un ejecutor de canalización local que ejecuta los pasos directamente en el entorno local.
Puedes especificar el ejecutor de canalización y otras opciones de ejecución mediante la clase PipelineOptions
del SDK de Apache Beam. Usa PipelineOptions
para configurar la forma y la ubicación en las que se ejecuta tu canalización y los recursos que usa.
En la mayoría de los casos, te recomendamos que la canalización se ejecute en recursos administrados de Google Cloud mediante el servicio de ejecutor de Dataflow. Cuando ejecutas la canalización con el servicio de Dataflow, se crea un trabajo de Dataflow que usa recursos de Compute Engine y Cloud Storage en tu proyecto de Google Cloud.
También puedes ejecutar tu canalización de manera local. Cuando ejecutas la canalización de manera local, las transformaciones de la canalización se ejecutan en la misma máquina en la que se ejecuta tu programa de Dataflow. La ejecución local es útil para realizar pruebas y depurar, en particular, si tu canalización puede usar conjuntos de datos en la memoria más pequeños.
Configura las PipelineOptions
Pasas PipelineOptions
cuando creas el objeto Pipeline
en tu programa de Dataflow. Cuando el servicio de Dataflow ejecuta tu canalización, envía una copia de PipelineOptions
a cada instancia de trabajador.
Java: SDK 2.x
Nota: Puedes acceder a PipelineOptions
dentro de cualquier instancia de DoFn
de ParDo
a través del método ProcessContext.getPipelineOptions
.
Python
Esta función aún no se admite en el SDK de Apache Beam para Python.
Java: SDK 1.x
Configura las PipelineOptions desde argumentos de la línea de comandos
Si bien puedes configurar tu canalización mediante la creación de un objeto PipelineOptions
y la configuración directa de los campos, los SDK de Apache Beam incluyen un analizador de línea de comandos que puedes usar para configurar campos en PipelineOptions
mediante los argumentos de la línea de comandos.
Java: SDK 2.x
Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions
mediante el método PipelineOptionsFactory.fromArgs
, como se muestra en el siguiente código de ejemplo:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Nota: Adjuntar el método .withValidation
hace que Dataflow verifique los argumentos de la línea de comandos requeridos y valide los valores de argumento.
Usar PipelineOptionsFactory.fromArgs
interpreta los argumentos de la línea de comandos que siguen el formato:
--<option>=<value>
Compilar tu objeto PipelineOptions
de esta forma te permite especificar cualquiera de las opciones en cualquier interfaz de org.apache.beam.sdk.options.PipelineOptions como un argumento de la línea de comandos.
Python
Para leer las opciones desde la línea de comandos, crea tu objeto PipelineOptions
, como en el siguiente código de ejemplo:
from apache_beam.options.pipeline_options import PipelineOptions options = PipelineOptions(flags=argv)
El argumento (flags=argv)
de PipelineOptions
interpreta los argumentos de la línea de comandos que siguen el formato:
--<option>=<value>
Compilar tu objeto PipelineOptions
de esta manera te permite especificar cualquiera de las opciones mediante la creación de subclases a partir de PipelineOptions
.
Java: SDK 1.x
Crea opciones personalizadas
Puedes agregar tus propias opciones personalizadas además del objeto PipelineOptions
estándar.
El analizador de línea de comandos de Dataflow también puede establecer tus opciones personalizadas mediante argumentos de la línea de comandos especificados en el mismo formato.
Java: SDK 2.x
Si quieres agregar tus propias opciones, define una interfaz con métodos de obtención y configuración para cada opción, como en el siguiente ejemplo:
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Python
Para agregar tus propias opciones, usa el método add_argument()
(que se comporta de la misma manera que el módulo argparse estándar de Python), como en el siguiente ejemplo:
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input') parser.add_argument('--output')
Java: SDK 1.x
También puedes especificar una descripción, que aparece cuando un usuario pasa --help
como un argumento de la línea de comandos, y un valor predeterminado.
Java: SDK 2.x
Establece la descripción y el valor predeterminado con anotaciones, de la siguiente manera:
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Se recomienda que registres tu interfaz con PipelineOptionsFactory
y pases la interfaz cuando crees el objeto PipelineOptions
. Cuando registras tu interfaz con PipelineOptionsFactory
, --help
puede encontrar tu interfaz de opciones personalizadas y agregarla al resultado del comando --help
.
PipelineOptionsFactory
también validará que tus opciones personalizadas sean compatibles con todas las otras opciones registradas.
En el siguiente código de ejemplo, se muestra cómo registrar tu interfaz de opciones personalizadas con PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
Ahora tu canalización puede aceptar --myCustomOption=value
como un argumento de la línea de comandos.
Python
Establece la descripción y el valor predeterminado de la siguiente manera:
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', help='Input for the pipeline', default='gs://my-bucket/input') parser.add_argument( '--output', help='Output for the pipeline', default='gs://my-bucket/output')
Java: SDK 1.x
Configura PipelineOptions para la ejecución en el servicio de Cloud Dataflow
Para ejecutar tu canalización mediante el servicio administrado de Dataflow, debes configurar los siguientes campos en PipelineOptions
:
Java: SDK 2.x
project
: el ID del proyecto de Google Cloudrunner
: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe serDataflowRunner
gcpTempLocation
: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos temporales. Debes crear este bucket con anticipación, antes de ejecutar tu canalización. En caso de que no especifiquesgcpTempLocation
, puedes especificar la opción de canalizacióntempLocation
y, luego,gcpTempLocation
se configurará en el valor detempLocation
. Si no se especifica ninguna, se creará una opcióngcpTempLocation
predeterminadastagingLocation
: un bucket de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos binarios. Si no configuras esta opción, el valor que especificaste paratempLocation
se usará también en la ubicación de etapa de pruebas
Se creará una ubicación
gcpTempLocation
predeterminada si no se especifica esta ni tempLocation
. Si se especifica tempLocation
, pero no gcpTempLocation
, tempLocation
será una ruta de Cloud Storage, que gcpTempLocation
usará como la ruta predeterminada. Si no se especifica tempLocation
y se especifica gcpTempLocation
, tempLocation
no se propagará
Nota: Si usas el SDK de Apache Beam para Java 2.15.0 o posterior, también debes especificar region
.
Python
project
: el ID del proyecto de Google Cloudrunner
: el ejecutor de canalizaciones que analizará tu programa y creará tu canalización. Para la ejecución en la nube, debe serDataflowRunner
staging_location
: una ruta de acceso de Cloud Storage a fin de que Dataflow almacene en etapa intermedia los paquetes de código necesarios para los trabajadores que ejecutan el trabajotemp_location
: una ruta de acceso de Cloud Storage para que Dataflow almacene en etapa intermedia los archivos de trabajo temporales creados durante la ejecución de la canalización
Nota: Si usas el SDK de Apache Beam para Python 2.15.0 o posterior, también debes especificar region
.
Java: SDK 1.x
Puedes configurar estas opciones de manera programática o especificarlas mediante la línea de comandos. En el siguiente código de ejemplo, se muestra cómo construir una canalización mediante la configuración programática del ejecutor y otras opciones necesarias para ejecutar la canalización con el servicio administrado de Dataflow.
Java: SDK 2.x
// Create and set your PipelineOptions. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); // For Cloud execution, set the Cloud Platform project, staging location, // and specify DataflowRunner. options.setProject("my-project-id"); options.setStagingLocation("gs://my-bucket/binaries"); options.setRunner(DataflowRunner.class); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # Create and set your PipelineOptions. # For Cloud execution, specify DataflowRunner and set the Cloud Platform # project, job name, temporary files location, and region. # For more information about regions, check: # https://cloud.google.com/dataflow/docs/concepts/regional-endpoints options = PipelineOptions( flags=argv, runner='DataflowRunner', project='my-project-id', job_name='unique-job-name', temp_location='gs://my-bucket/temp', region='us-central1') # Create the Pipeline with the specified options. # with beam.Pipeline(options=options) as pipeline: # pass # build your pipeline here.
Java: SDK 1.x
Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.
En el siguiente código de ejemplo, se muestra cómo configurar las opciones obligatorias para la ejecución del servicio de Dataflow mediante la línea de comandos:
Java: SDK 2.x
// Create and set your PipelineOptions. MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
# Use Python argparse module to parse custom arguments import argparse import apache_beam as beam parser = argparse.ArgumentParser() parser.add_argument('--input') parser.add_argument('--output') args, beam_args = parser.parse_known_args(argv) # Create the Pipeline with remaining arguments. with beam.Pipeline(argv=beam_args) as pipeline: lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input) lines | 'Write files' >> beam.io.WriteToText(args.output)
Java: SDK 1.x
Después de crear tu canalización, especifica todas las transformaciones y las operaciones de lectura y escritura de la canalización, y ejecútala.
Java: SDK 2.x
Cuando pases las opciones obligatorias en la línea de comandos, usa las opciones --project
, --runner
, --gcpTempLocation
y, de forma opcional, --stagingLocation
.
Python
Cuando pases las opciones obligatorias en la línea de comandos, usa las opciones --project
, --runner
y --staging_location
.
Java: SDK 1.x
Ejecución asíncrona
Java: SDK 2.x
Usar DataflowRunner
hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puedes supervisar el progreso del trabajo, ver detalles de la ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.
Python
Usar DataflowRunner
hace que tu canalización se ejecute de forma asíncrona en la nube de Google. Mientras se ejecuta la canalización, puedes supervisar el progreso del trabajo, ver detalles de la ejecución y recibir actualizaciones sobre los resultados de la canalización mediante la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.
Java: SDK 1.x
Ejecución síncrona
Java: SDK 2.x
Especifica DataflowRunner
como el ejecutor de canalizaciones y llama de forma explícita a pipeline.run().waitUntilFinish()
.
Cuando usas DataflowRunner
y llamas a waitUntilFinish()
en el objeto PipelineResult
que muestra pipeline.run()
, la canalización se ejecuta en la nube, pero el código local espera a que finalice el trabajo en la nube y muestra el objeto final DataflowPipelineJob
.
Mientras se ejecuta el trabajo, el servicio de Dataflow imprime actualizaciones del estado del trabajo y mensajes de la consola mientras espera.
Si eras usuario del SDK de Java 1.x y usabas --runner
BlockingDataflowPipelineRunner
en la línea de comandos para hacer que tu programa principal se bloquee de forma interactiva hasta que la canalización finalice, entonces, con Java 2.x, tu programa principal necesita llamar de manera explícita a waitUntilFinish()
.
Python
Usa el método wait_until_finish()
del objeto PipelineResult
, que se muestra a partir del método run()
del ejecutor, para bloquear hasta que se complete la canalización.
Java: SDK 1.x
Nota: Si escribes Ctrl+C
desde la línea de comandos, no se cancela tu trabajo. El servicio de Dataflow aún ejecuta el trabajo en Google Cloud; para cancelar el trabajo, deberás usar la interfaz de supervisión de Dataflow o la interfaz de línea de comandos de Dataflow.
Ejecución de la transmisión
Java: SDK 2.x
Si tu canalización lee desde una fuente de datos no delimitada, como Pub/Sub, la canalización se ejecuta de forma automática en modo de transmisión.
Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey
.
Python
Si tu canalización usa una fuente de datos o un receptor no delimitados (como Pub/Sub), debes configurar la opción streaming
como verdadera.
Los trabajos de transmisión utilizan un tipo de máquina de Compute Engine de n1-standard-2
o superior según la configuración predeterminada. No debes anular esto, puesto que n1-standard-2
es el tipo de máquina mínimo necesario para ejecutar trabajos de transmisión.
Si tu canalización usa fuentes de datos y receptores no delimitados, es necesario seleccionar una estrategia del sistema de ventanas para tus PCollections no delimitadas antes de que se pueda usar cualquier agregación como un GroupByKey
.
Java: SDK 1.x
Configura otras opciones de canalización de Cloud Dataflow
Para ejecutar tu canalización en la nube, puedes establecer los siguientes campos de manera programática en tu objeto PipelineOptions
:
Java: SDK 2.x
Campo | Tipo | Descripción | Valor predeterminado |
---|---|---|---|
runner |
Class (NameOfRunner) |
El PipelineRunner que se debe usar. Este campo te permite determinar el PipelineRunner en el entorno de ejecución. |
DirectRunner (modo local) |
streaming |
boolean |
Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. | Si tu canalización lee desde una fuente no delimitada, el valor predeterminado es true . De lo contrario, es false . |
project |
String |
El ID de tu proyecto Google Cloud. Esto es necesario si deseas ejecutar tu canalización mediante el servicio administrado de Dataflow. | Si no está configurado, se establece de manera predeterminada en el proyecto configurado en la actualidad en el SDK de Cloud. |
jobName |
String |
El nombre del trabajo de Dataflow que se ejecuta como aparece en la lista de trabajos de Dataflow y los detalles del trabajo. También se usa cuando se actualiza una canalización existente. | Dataflow genera un nombre único de forma automática. |
gcpTempLocation |
String |
Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs:// . |
|
stagingLocation |
String |
Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs:// . |
Si no está configurado, se establece de manera predeterminada en lo que especificaste para tempLocation . |
autoscalingAlgorithm |
String
| El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED , para habilitar el ajuste de escala automático, o NONE , para inhabilitarlo. Consulta Características del ajuste automático para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. |
La configuración predeterminada es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow y los trabajos de transmisión que usan Streaming Engine.
La configuración predeterminada es NONE para los trabajos de transmisión que no usan Streaming Engine. |
numWorkers |
int |
La cantidad inicial de instancias de Google Compute Engine que se usarán cuando ejecutes tu canalización. Esta opción determina cuántos trabajadores inicia el servicio de Dataflow cuando comienza tu trabajo. | Si no se especifica, el servicio de Dataflow determina una cantidad apropiada de trabajadores. |
maxNumWorkers |
int |
La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (que especifica numWorkers para permitir que tu trabajo escale verticalmente de forma automática o no). |
Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores. |
numberOfWorkerHarnessThreads |
int |
La cantidad de subprocesos por agente de trabajo. | Si no se especifica, el servicio de Dataflow determina una cantidad adecuada de subprocesos por trabajador. |
region |
String |
Especifica un extremo regional para implementar tus trabajos de Dataflow. | Si no se establece, el valor predeterminado es us-central1 . |
workerRegion |
String |
Especifica una región de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la Nota: Esta opción no se puede combinar con |
Si no está configurada, se establece de manera predeterminada en el valor establecido para region . |
workerZone |
String |
Especifica una zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la Nota: Esta opción no se puede combinar con |
Si especificas region o workerRegion , workerZone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente. |
zone |
String |
(Obsoleto) En el SDK de Apache Beam 2.17.0 o anterior, esto especificaba la zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. | Si especificas region , zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente. |
dataflowKmsKey |
String |
Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS.
También debes especificar gcpTempLocation para usar esta función. |
Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK. |
flexRSGoal |
String |
Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros numWorkers , autoscalingAlgorithm , zone , region y workerMachineType . Para obtener más información, consulta la sección de opciones de canalización de FlexRS. |
Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Dataflow elija cualquier recurso con descuento disponible. |
filesToStage |
List<String> |
Una lista no vacía de archivos locales, directorios de archivos o archivos (como archivos JAR o ZIP) para poner a disposición de cada trabajador. Si configuras esta opción, solo se subirán los archivos que especifiques (se ignorará la ruta de clase de Java). Debes especificar todos tus recursos en el orden correcto de la ruta de clase. Los recursos no se limitan al código, también pueden incluir archivos de configuración y otros recursos que estarán disponibles para todos los trabajadores. Tu código puede acceder a los recursos enumerados mediante los métodos de búsqueda de recursos estándar de Java. Precaución: La especificación de una ruta de acceso del directorio es poco eficaz debido a que Dataflow comprimirá los archivos antes de subirlos, lo que implica un costo de tiempo de inicio mayor. Además, no uses esta opción para transferir datos a los trabajadores que la canalización debe procesar, ya que hacerlo es mucho más lento que usar las API Cloud Storage/BigQuery nativas combinadas con la fuente de datos Dataflow adecuada.
|
Si filesToStage está en blanco, Dataflow inferirá los archivos que se deben almacenar en etapa intermedia según la ruta de clase de Java. Las consideraciones y precauciones mencionadas en la columna de la izquierda también se aplican aquí (tipos de archivos para enumerar y cómo acceder a ellos desde tu código).
|
network |
String |
La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. | Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default . |
subnetwork |
String |
La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. | El servicio de Dataflow determina el valor predeterminado. |
usePublicIps |
boolean |
Especifica si los trabajadores de Dataflow usan direcciones IP públicas.
Si el valor se establece en false , los trabajadores de Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork , se ignora la opción network . Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. |
Si no se configura, el valor predeterminado es true y los trabajadores de Dataflow usan direcciones IP públicas. |
enableStreamingEngine |
boolean |
Especifica si Streaming Engine de Dataflow está habilitado o no; es verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow y conservar así los recursos de CPU, memoria y almacenamiento de Persistent Disk. | El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan por completo en las VM de trabajador. |
createFromSnapshot |
String |
Especifica el ID de la instantánea que se usará cuando se cree un trabajo de transmisión. Las instantáneas guardan el estado de una canalización de transmisión y te permiten iniciar una versión nueva de tu trabajo desde ese estado. Para obtener más información sobre las instantáneas, consulta Usa instantáneas. | Si no se configura, no se usan instantáneas para crear un trabajo. |
hotKeyLoggingEnabled |
boolean |
Especifica que cuando se detecta una clave activa en la canalización, la clave se imprime en el proyecto de Cloud Logging del usuario. | Si no se configura, solo se registra la presencia de una clave activa. |
diskSizeGb |
int |
El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador remota de Compute Engine. Si se configura, especifica al menos 30 GB para tener en cuenta la imagen de arranque del trabajador y los registros locales. Para los trabajos por lotes que usan Dataflow Shuffle, esta opción establece el tamaño del disco de arranque de una VM de trabajador. En los trabajos por lotes que no usan Dataflow Shuffle, esta opción establece el tamaño de los discos que se usan para almacenar datos aleatorios; el tamaño del disco de arranque no se ve afectado.
En los trabajos de transmisión que usan Streaming Engine, esta opción establece el tamaño de los discos de arranque. En los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional que crea el servicio de Dataflow; el disco de arranque no se ve afectado.
Si un trabajo de transmisión no usa Streaming Engine, puedes establecer el tamaño del disco de arranque con la marca del experimento |
Configúralo en Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25 GB; de lo contrario, el valor predeterminado es 250 GB. Si un trabajo de transmisión usa Streaming Engine, el valor predeterminado es 30 GB; de lo contrario, el valor predeterminado es 400 GB. Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a Shuffle que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo. |
serviceAccount |
String |
Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com . Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow.
|
Si no se configura, los trabajadores usan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador. |
workerDiskType |
String |
El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. |
El servicio de Dataflow determina el valor predeterminado. |
workerMachineType |
String |
El tipo de máquina de Compute Engine que usa Dataflow cuando inicia las VM de trabajador. Puedes usar cualquiera de las familias de tipos de máquinas de Compute Engine disponibles, así como los tipos personalizados de máquinas. Para obtener mejores resultados, usa tipos de máquina Ten en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria en los trabajadores. La facturación es independiente de la familia de tipos de máquinas. |
El servicio de Dataflow elegirá el tipo de máquina en función de tu trabajo si no configuras esta opción. |
Consulta la documentación de referencia de la API de Java para la interfaz de PipelineOptions (y sus subinterfaces) a fin de obtener la lista completa de opciones de configuración de la canalización.
Python
Campo | Tipo | Descripción | Valor predeterminado |
---|---|---|---|
runner |
str |
El PipelineRunner que se debe usar. Este campo puede ser DirectRunner o DataflowRunner . |
DirectRunner (modo local) |
streaming |
bool |
Especifica si el modo de transmisión está habilitado o inhabilitado; es verdadero si está habilitado. | false |
project |
str |
El ID de tu proyecto Google Cloud. Esto es necesario si deseas ejecutar tu canalización mediante el servicio administrado de Dataflow. | Si no se configura, muestra un error. |
job_name |
String |
El nombre del trabajo de Dataflow que se ejecuta como aparece en la lista de trabajos de Dataflow y los detalles del trabajo. | Dataflow genera un nombre único de forma automática. |
temp_location |
str |
Ruta de Cloud Storage para archivos temporales. Debe ser una URL válida de Cloud Storage que comience con gs:// . |
Si no está configurado, se establece de manera predeterminada en el valor de staging_location . Debes especificar, como mínimo, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google. |
staging_location |
str |
Ruta de Cloud Storage para archivos locales de etapa de pruebas. Debe ser una URL válida de Cloud Storage que comience con gs:// . |
Si no está configurado, se establece de manera predeterminada en un directorio de etapa de pruebas dentro de temp_location . Debes especificar, al menos, uno de los valores temp_location o staging_location para ejecutar tu canalización en la nube de Google. |
autoscaling_algorithm |
str
| El modo de ajuste de escala automático para tu trabajo de Dataflow. Los valores posibles son THROUGHPUT_BASED , para habilitar el ajuste de escala automático, o NONE , para inhabilitarlo. Consulta Características del ajuste automático para obtener más información sobre el funcionamiento del ajuste de escala automático en el servicio administrado de Dataflow. |
La configuración predeterminada es THROUGHPUT_BASED para todos los trabajos por lotes de Dataflow y los trabajos de transmisión que usan Streaming Engine.
La configuración predeterminada es NONE para los trabajos de transmisión que no usan Streaming Engine. |
num_workers |
int |
La cantidad de instancias de Compute Engine que se usarán cuando ejecutes tu canalización. | Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores. |
max_num_workers |
int |
La cantidad máxima de instancias de Compute Engine que estarán disponibles para tu canalización durante la ejecución. Ten en cuenta que puede ser mayor que la cantidad inicial de trabajadores (que especifica num_workers para permitir que tu trabajo escale verticalmente de forma automática o no). |
Si no se especifica, el servicio de Dataflow determinará una cantidad adecuada de trabajadores. |
number_of_worker_harness_threads |
int |
La cantidad de subprocesos por agente de trabajo. | Si no se especifica, el servicio de Dataflow determina una cantidad adecuada de subprocesos por trabajador. Para usar este parámetro, también debes usar la marca --experiments=use_runner_v2 |
region |
str |
Especifica un extremo regional para implementar tus trabajos de Dataflow. | Si no se establece, el valor predeterminado es us-central1 . |
worker_region |
String |
Especifica una región de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la Nota: Esta opción no se puede combinar con |
Si no está configurada, se establece de manera predeterminada en el valor establecido para region . |
worker_zone |
String |
Especifica una zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. Esta opción se usa para ejecutar trabajadores en una ubicación diferente a la Nota: Esta opción no se puede combinar con |
Si especificas region o worker_region , worker_zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente. |
zone |
str |
(Obsoleto) En el SDK de Apache Beam 2.17.0 o anterior, esto especificaba la zona de Compute Engine para iniciar instancias de trabajador a fin de ejecutar tu canalización. | Si especificas region , zone usa una zona predeterminada de la región correspondiente. Puedes anular este comportamiento mediante la especificación de una zona diferente. |
dataflow_kms_key |
str |
Especifica la clave de encriptación administrada por el cliente (CMEK) que se utiliza para encriptar datos en reposo. Puedes controlar la clave de encriptación a través de Cloud KMS.
También debes especificar temp_location para usar esta función. |
Si no se especifica, Dataflow usa la encriptación de Google Cloud predeterminada en lugar de una CMEK. |
flexrs_goal |
str |
Especifica la Programación flexible de recursos (FlexRS) para trabajos por lotes con ajuste de escala automático. Afecta los parámetros num_workers , autoscaling_algorithm , zone , region y machine_type . Para obtener más información, consulta la sección de opciones de canalización de FlexRS. |
Si no se especifica, el valor predeterminado es SPEED_OPTIMIZED, que es lo mismo que omitir esta marca. Para activar FlexRS, debes especificar el valor COST_OPTIMIZED a fin de permitir que el servicio de Dataflow elija cualquier recurso con descuento disponible. |
network |
str |
La red de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu red. | Si no se establece, Google Cloud supone que tienes la intención de usar una red llamada default . |
subnetwork |
str |
La subred de Compute Engine para iniciar instancias de Compute Engine a fin de ejecutar tu canalización. Consulta cómo especificar tu subred. | El servicio de Dataflow determina el valor predeterminado. |
use_public_ips |
bool |
Especifica que los trabajadores de Dataflow deben usar direcciones IP públicas.
Si el valor se establece en false , los trabajadores de Dataflow usan direcciones IP privadas para todas las comunicaciones. En este caso, si se especifica la opción subnetwork , se ignora la opción network . Asegúrate de que las network o subnetwork especificadas tengan habilitado el Acceso privado a Google. Esta opción requiere el SDK de Beam para Python.
El SDK de Dataflow obsoleto para Python no lo admite. |
Si no se configura, los trabajadores de Dataflow usan direcciones IP públicas. |
enable_streaming_engine |
bool |
Especifica si Streaming Engine de Dataflow está habilitado o no; es verdadero si está habilitado. Habilitar Streaming Engine te permite ejecutar los pasos de tu canalización de transmisión en el backend del servicio de Dataflow y conservar así los recursos de CPU, memoria y almacenamiento de Persistent Disk. | El valor predeterminado es false. Esto significa que los pasos de tu canalización de transmisión se ejecutan por completo en las VM de trabajador. |
disk_size_gb |
int |
El tamaño del disco, en gigabytes, para usar en cada instancia de trabajador remota de Compute Engine. Si se establece, especifica al menos 30 GB para la imagen de arranque del trabajador y los registros locales. Para los trabajos por lotes que usan Dataflow Shuffle, esta opción establece el tamaño del disco de arranque de una VM de trabajador. En los trabajos por lotes que no usan Dataflow Shuffle, esta opción establece el tamaño de los discos que se usan para almacenar datos aleatorios; el tamaño del disco de arranque no se ve afectado.
En los trabajos de transmisión que usan Streaming Engine, esta opción establece el tamaño de los discos de arranque. En los trabajos de transmisión que no usan Streaming Engine, esta opción establece el tamaño de cada disco persistente adicional que crea el servicio de Dataflow; el disco de arranque no se ve afectado.
Si un trabajo de transmisión no usa Streaming Engine, puedes establecer el tamaño del disco de arranque con la marca del experimento |
Configúralo en Si un trabajo por lotes usa Dataflow Shuffle, el valor predeterminado es 25 GB; de lo contrario, el valor predeterminado es 250 GB. Si un trabajo de transmisión usa Streaming Engine, el valor predeterminado es 30 GB; de lo contrario, el valor predeterminado es 400 GB. Advertencia: Reducir el tamaño del disco reduce la E/S aleatoria disponible. Los trabajos vinculados a Shuffle que no usan Dataflow Shuffle o Streaming Engine pueden aumentar el entorno de ejecución y el costo del trabajo. |
service_account_email |
str |
Especifica una cuenta de servicio de controlador administrada por el usuario, mediante el formato my-service-account-name@<project-id>.iam.gserviceaccount.com . Para obtener más información, consulta la sección Cuenta de servicio del controlador de la página de seguridad y permisos de Cloud Dataflow.
|
Si no se configura, los trabajadores usan la cuenta de servicio de Compute Engine de tu proyecto como la cuenta de servicio del controlador. |
worker_disk_type |
str |
El tipo de disco persistente que se usará, especificado por una URL completa del recurso de tipo de disco. Por ejemplo, usa compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar un disco persistente SSD. Si quieres obtener más información, consulta la página de referencia de la API de Compute Engine para diskTypes. |
El servicio de Dataflow determina el valor predeterminado. |
machine_type |
str |
El tipo de máquina de Compute Engine que usa Dataflow cuando inicia las VM de trabajador. Puedes usar cualquiera de las familias de tipos de máquinas de Compute Engine disponibles, así como los tipos personalizados de máquinas. Para obtener mejores resultados, usa tipos de máquina Ten en cuenta que Dataflow factura por la cantidad de CPU virtuales y GB de memoria en los trabajadores. La facturación es independiente de la familia de tipos de máquinas. |
El servicio de Dataflow elegirá el tipo de máquina en función de tu trabajo si no configuras esta opción. |
Java: SDK 1.x
Configura PipelineOptions para la ejecución local
En lugar de ejecutar tu canalización en recursos de la nube administrados, puedes elegir ejecutar tu canalización de forma local. La ejecución local tiene ciertas ventajas para probar, depurar o ejecutar tu canalización en conjuntos de datos pequeños. Por ejemplo, la ejecución local quita la dependencia del servicio de Dataflow remoto y el proyecto de Google Cloud asociado.
Cuando uses la ejecución local, te recomendamos que ejecutes tu canalización con conjuntos de datos que sean lo bastante pequeños como para caber en la memoria local. Puedes crear un conjunto de datos pequeño en la memoria con una transformación Create
o usar una transformación Read
para trabajar con archivos locales o remotos pequeños. La ejecución local proporciona una forma rápida y fácil de realizar pruebas y depurar con menos dependencias externas, pero estará limitada por la memoria disponible en tu entorno local.
En el siguiente código de ejemplo, se muestra cómo crear una canalización que se ejecute en tu entorno local.
Java: SDK 2.x
// Create and set our Pipeline Options. PipelineOptions options = PipelineOptionsFactory.create(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner
ya es el valor predeterminado. Sin embargo, es necesario que incluyas de forma explícita DirectRunner
como una dependencia o que lo agregues a la ruta de clase.
Python
# Create and set your Pipeline Options. options = PipelineOptions(flags=argv) my_options = options.view_as(MyOptions) with Pipeline(options=options) as pipeline: pass # build your pipeline here.
Nota: Para el modo local, no debes configurar el ejecutor, dado que DirectRunner
ya es el valor predeterminado.
Java: SDK 1.x
Después de crear tu canalización, ejecútala.
Configura otras opciones de canalización local
Cuando ejecutas tu canalización de manera local, los valores predeterminados para las propiedades en PipelineOptions
suelen ser suficientes.
Java: SDK 2.x
Puedes encontrar los valores predeterminados para PipelineOptions
de Java en la referencia de la API de Java. Consulta la lista de clase de PipelineOptions para obtener detalles completos.
Si tu canalización usa Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar GcpOptions.setProject
para configurar tu ID del proyecto de Google Cloud. También es posible que necesites configurar credenciales de forma explícita. Consulta la clase GcpOptions para obtener más información.
Python
Puedes encontrar los valores predeterminados para PipelineOptions
de Python en la referencia de la API de Python. Consulta la lista del módulo de PipelineOptions para obtener detalles completos.
Si tu canalización usa servicios de Google Cloud, como BigQuery o Cloud Storage para IO, es posible que debas configurar ciertas opciones de credenciales y proyectos de Google Cloud. En esos casos, debes usar options.view_as(GoogleCloudOptions).project
para configurar tu ID del proyecto de Google Cloud. Es posible que también debas configurar las credenciales de forma explícita. Consulta la clase GoogleCloudOptions para obtener más información.
Java: SDK 1.x