Ejecuta plantillas clásicas

Después de crear y almacenar en etapa intermedia tu plantilla de Dataflow, ejecútala con la consola de Google Cloud , la API de REST o Google Cloud CLI. Puedes implementar trabajos de plantilla de Dataflow desde muchos entornos, incluidos el entorno estándar de App Engine, funciones de Cloud Run y otros entornos restringidos.

Usa la consola de Google Cloud

Puedes usar la consola de Google Cloud para ejecutar plantillas de Dataflow personalizadas y proporcionadas por Google.

Plantillas proporcionadas por Google

Usa este comando para ejecutar una plantilla que proporciona Google:

  1. Ve a la página de Dataflow en la consola de Google Cloud .
  2. Ir a la página de Dataflow
  3. Haz clic en CREAR UN TRABAJO A PARTIR DE UNA PLANTILLA (Create job from template).
  4. Google Cloud Botón Crear trabajo a partir de una plantilla de la consola
  5. Selecciona la plantilla que proporciona Google que deseas ejecutar en el menú desplegable Dataflow template (Plantilla de Dataflow).
  6. Formulario de ejecución de una plantilla WordCount
  7. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  8. Ingresa los valores de tus parámetros en los campos de parámetros provistos. No necesitas la sección Parámetros adicionales cuando utilizas una plantilla de Google.
  9. Haga clic en Ejecutar trabajo.

Plantillas personalizadas

Usa este comando para ejecutar una plantilla personalizada:

  1. Ve a la página de Dataflow en la consola de Google Cloud .
  2. Ir a la página de Dataflow
  3. Haz clic en CREAR UN TRABAJO A PARTIR DE UNA PLANTILLA.
  4.  Botón Crear trabajo a partir de una plantilla de la consola deGoogle Cloud
  5. Selecciona Custom Template (Plantilla personalizada) en el menú desplegable Dataflow template (Plantilla de Dataflow).
  6. Formulario de ejecución de una plantilla personalizada
  7. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  8. Ingresa la ruta de tu archivo de plantilla de Cloud Storage en el campo de la ruta de la plantilla de Cloud Storage.
  9. Si tu plantilla requiere de parámetros, haz clic en AGREGAR PARÁMETRO en la sección Parámetros adicionales. Ingresa el Nombre (Name) y el Valor (Valor) del parámetro. Repite este paso para cada parámetro que necesites.
  10. Haz clic en Ejecutar trabajo.

Usa la API de REST

Para ejecutar una plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Consulta la referencia de la API de REST de projects.locations.templates.launch para obtener más información sobre los parámetros disponibles.

Crea un trabajo por lotes de una plantilla personalizada

Esta solicitud projects.locations.templates.launch de ejemplo crea un trabajo por lotes a partir de una plantilla que lee un archivo de texto y escribe un archivo de texto de salida. Si la solicitud se ejecuta de manera correcta, el cuerpo de la respuesta contendrá una instancia de LaunchTemplateResponse.

Modifica los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID con el ID del proyecto.
  • Reemplaza LOCATION por la región de Dataflow que prefieras.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección.
  • Reemplaza YOUR_BUCKET_NAME por el nombre de tu depósito de Cloud Storage.
  • Establece gcsPath en la ubicación de Cloud Storage del archivo de plantilla.
  • Establece parameters en la lista de pares clave-valor.
  • Configura tempLocation en una ubicación para la que tengas permiso de escritura. Este valor es obligatorio para ejecutar plantillas de Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Crea un trabajo de transmisión de una plantilla personalizada

Con esta solicitud de ejemplo projects.locations.templates.launch, se crea un trabajo de transmisión a partir de una plantilla clásica que lee desde una suscripción de Pub/Sub y escribe en una tabla de BigQuery. Si quieres iniciar una plantilla de Flex, usa projects.locations.flexTemplates.launch en su lugar. La plantilla de ejemplo es una plantilla que proporciona Google. Puedes modificar la ruta de acceso en la plantilla para que apunte a una plantilla personalizada. La misma lógica se usa para iniciar las plantillas que proporciona Google y las plantillas personalizadas. En este ejemplo, la tabla de BigQuery debe existir con anterioridad, con el esquema adecuado. Si se ejecuta de manera correcta, el cuerpo de la respuesta contendrá una instancia de LaunchTemplateResponse.

Modifica los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID con el ID del proyecto.
  • Reemplaza LOCATION por la región de Dataflow que prefieras.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección.
  • Reemplaza YOUR_BUCKET_NAME por el nombre del depósito de Cloud Storage.
  • Reemplaza GCS_PATH por la ubicación de Cloud Storage del archivo de plantilla. La ubicación debe comenzar con gs://
  • Establece parameters en la lista de pares clave-valor. Los parámetros enumerados son específicos de este ejemplo de plantilla. Si usas una plantilla personalizada, modifica los parámetros según sea necesario. Si usas la plantilla de ejemplo, reemplaza las siguientes variables.
    • Reemplaza YOUR_SUBSCRIPTION_NAME por el nombre de tu suscripción de Pub/Sub.
    • Reemplaza YOUR_DATASET por tu conjunto de datos de BigQuery y YOUR_TABLE_NAME por el nombre de tu tabla de BigQuery.
  • Configura tempLocation en una ubicación para la que tengas permiso de escritura. Este valor es obligatorio para ejecutar plantillas de Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Actualiza un trabajo de transmisión de una plantilla personalizada

En esta solicitud de ejemplo projects.locations.templates.launch, se muestra cómo actualizar un trabajo de transmisión de plantilla. Si quieres actualizar una plantilla de Flex, usa projects.locations.flexTemplates.launch en su lugar.

  1. Ejecuta Ejemplo 2: Crea un trabajo de transmisión de una plantilla personalizada para iniciar un trabajo de plantilla de transmisión.
  2. Envía la siguiente solicitud HTTP POST, con los siguientes valores modificados:
    • Reemplaza YOUR_PROJECT_ID con el ID del proyecto.
    • Reemplaza LOCATION por la región de Dataflow del trabajo que estás actualizando.
    • Reemplaza JOB_NAME por el nombre exacto del trabajo que deseas actualizar.
    • Reemplaza GCS_PATH por la ubicación de Cloud Storage del archivo de plantilla. La ubicación debe comenzar con gs://
    • Establece parameters en la lista de pares clave-valor. Los parámetros enumerados son específicos de este ejemplo de plantilla. Si usas una plantilla personalizada, modifica los parámetros según sea necesario. Si usas la plantilla de ejemplo, reemplaza las siguientes variables.
      • Reemplaza YOUR_SUBSCRIPTION_NAME por el nombre de tu suscripción de Pub/Sub.
      • Reemplaza YOUR_DATASET por tu conjunto de datos de BigQuery y YOUR_TABLE_NAME por el nombre de tu tabla de BigQuery.
    • Usa el parámetro environment para cambiar la configuración del entorno, como el tipo de máquina. En este ejemplo, se usa el tipo de máquina n2-highmem-2, que tiene más memoria y CPU por trabajador que el tipo de máquina predeterminado.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Accede a la interfaz de supervisión de Dataflow y verifica que se haya creado un trabajo nuevo con el mismo nombre. Este trabajo tiene el estado Actualizado.

Usa las bibliotecas cliente de la API de Google

Considera usar las bibliotecas cliente de la API de Google para realizar llamadas a la API de REST de Dataflow con facilidad. En esta secuencia de comandos de ejemplo, se usa la biblioteca cliente de la API de Google para Python.

En este ejemplo, debes establecer las siguientes variables:

  • project: Establece el ID del proyecto.
  • job: se establece en el nombre de trabajo único que elijas.
  • template: se establece en la ubicación de Cloud Storage del archivo de plantilla.
  • parameters: Establece un diccionario con los parámetros de la plantilla.

Para configurar la región, incluye el parámetro location.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Para obtener más información sobre las opciones disponibles, consulta el método projects.locations.templates.launch en la referencia de la API de REST de Dataflow.

Usa gcloud CLI

La CLI de gcloud puede ejecutar una plantilla personalizada o una proporcionada por Google con el comando gcloud dataflow jobs run. Los ejemplos de ejecución de plantillas proporcionadas por Google se documentan en la página de plantillas proporcionadas por Google.

En los ejemplos siguientes de plantillas personalizadas, configura estos valores:

  • Reemplaza JOB_NAME por un nombre de trabajo a elección.
  • Reemplaza YOUR_BUCKET_NAME por el nombre del depósito de Cloud Storage.
  • Establece --gcs-location en la ubicación de Cloud Storage del archivo de plantilla.
  • Establece --parameters en la lista de parámetros separados por comas para pasar al trabajo. No se admiten espacios entre las comas o los valores.
  • Para evitar que las VMs acepten claves SSH almacenadas en metadatos del proyecto, usa la marca additional-experiments con la opción de servicio block_project_ssh_keys:--additional-experiments=block_project_ssh_keys.

Crea un trabajo por lotes de una plantilla personalizada

En este ejemplo, se crea un trabajo en lotes a partir de una plantilla que lee un archivo de texto y escribe un archivo de texto de salida.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

La solicitud muestra una respuesta con el siguiente formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Crea un trabajo de transmisión de una plantilla personalizada

En este ejemplo, se crea un trabajo de transmisión a partir de una plantilla que lee desde un tema de Pub/Sub y escribe en una tabla de BigQuery. La tabla de BigQuery debe existir con anterioridad, con el esquema adecuado.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

La solicitud muestra una respuesta con el siguiente formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

A fin de obtener una lista completa de las marcas para el comando gcloud dataflow jobs run, consulta la referencia de la CLI de gcloud.

Supervisión y solución de problemas

La interfaz de supervisión de Dataflow te permite supervisar tus trabajos de Dataflow. Si un trabajo presenta errores, puedes encontrar sugerencias para solucionarlos, estrategias de depuración y un catálogo de errores comunes en la guía Soluciona problemas de tu canalización.