Ejecutar plantillas clásicas

Después de crear y organizar tu plantilla de Dataflow, ejecútala con la consola, la API REST o la CLI de Google Cloud. Google Cloud Puedes desplegar tareas de plantillas de Dataflow desde muchos entornos, como el entorno estándar de App Engine, las funciones de Cloud Run y otros entornos con limitaciones.

Usar la Google Cloud consola

Puedes usar la Google Cloud consola para ejecutar plantillas de Dataflow personalizadas y proporcionadas por Google.

Plantillas proporcionadas por Google

Para ejecutar una plantilla proporcionada por Google, siga estos pasos:

  1. Ve a la página Dataflow de la Google Cloud consola.
  2. Ir a la página Dataflow
  3. Haz clic en CREAR TAREA A PARTIR DE PLANTILLA.
  4. Google Cloud console Create Job From Template Button
  5. En el menú desplegable Plantilla de Dataflow, selecciona la plantilla proporcionada por Google que quieras ejecutar.
  6. Formulario de ejecución de la plantilla WordCount
  7. Introduce un nombre de tarea en el campo Nombre de tarea.
  8. Introduzca los valores de los parámetros en los campos correspondientes. No necesitas la sección Parámetros adicionales cuando usas una plantilla proporcionada por Google.
  9. Haz clic en Ejecutar trabajo.

Plantillas personalizadas

Para ejecutar una plantilla personalizada:

  1. Ve a la página Dataflow de la Google Cloud consola.
  2. Ir a la página Dataflow
  3. Haz clic en CREAR TAREA A PARTIR DE PLANTILLA.
  4. Google Cloud console Create Job From Template Button
  5. Selecciona Plantilla personalizada en el menú desplegable Plantilla de flujo de datos.
  6. Formulario de ejecución de plantilla personalizada
  7. Introduce un nombre de tarea en el campo Nombre de tarea.
  8. Introduce la ruta de Cloud Storage de tu archivo de plantilla en el campo correspondiente.
  9. Si tu plantilla necesita parámetros, haz clic en AÑADIR PARÁMETRO en la sección Parámetros adicionales. Introduce el nombre y el valor del parámetro. Repite este paso con cada parámetro necesario.
  10. Haz clic en Ejecutar trabajo.

Utilizar la API REST

Para ejecutar una plantilla con una solicitud de API REST, envía una solicitud HTTP POST con tu ID de proyecto. Esta solicitud requiere autorización.

Consulta la referencia de la API REST para projects.locations.templates.launch para obtener más información sobre los parámetros disponibles.

Crear una tarea por lotes de plantillas personalizadas

En este ejemplo, la solicitud projects.locations.templates.launch crea una tarea por lotes a partir de una plantilla que lee un archivo de texto y escribe un archivo de texto de salida. Si la solicitud se realiza correctamente, el cuerpo de la respuesta contiene una instancia de LaunchTemplateResponse.

Modifica los siguientes valores:

  • Sustituye YOUR_PROJECT_ID por el ID del proyecto.
  • Sustituye LOCATION por la región de Dataflow que quieras.
  • Sustituye JOB_NAME por el nombre de trabajo que quieras.
  • Sustituye YOUR_BUCKET_NAME por el nombre de tu segmento de Cloud Storage.
  • Define gcsPath como la ubicación de Cloud Storage del archivo de plantilla.
  • Asigna parameters a tu lista de pares clave-valor.
  • Define tempLocation en una ubicación en la que tengas permiso de escritura. Este valor es obligatorio para usar las plantillas proporcionadas por Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Crear una tarea de streaming con una plantilla personalizada

En este ejemplo, la solicitud projects.locations.templates.launch crea una tarea de streaming a partir de una plantilla clásica que lee datos de una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Si quieres iniciar una plantilla Flex, usa projects.locations.flexTemplates.launch en su lugar. La plantilla de ejemplo es una plantilla proporcionada por Google. Puedes modificar la ruta de la plantilla para que apunte a una plantilla personalizada. Se usa la misma lógica para iniciar las plantillas proporcionadas por Google y las personalizadas. En este ejemplo, la tabla de BigQuery ya debe existir con el esquema adecuado. Si la solicitud se completa correctamente, el cuerpo de la respuesta contiene una instancia de LaunchTemplateResponse.

Modifica los siguientes valores:

  • Sustituye YOUR_PROJECT_ID por el ID del proyecto.
  • Sustituye LOCATION por la región de Dataflow que quieras.
  • Sustituye JOB_NAME por el nombre de trabajo que quieras.
  • Sustituye YOUR_BUCKET_NAME por el nombre de tu segmento de Cloud Storage.
  • Sustituye GCS_PATH por la ubicación de Cloud Storage del archivo de plantilla. La ubicación debe empezar por gs://
  • Asigna parameters a tu lista de pares clave-valor. Los parámetros que se indican son específicos de este ejemplo de plantilla. Si usas una plantilla personalizada, modifica los parámetros según sea necesario. Si usas la plantilla de ejemplo, sustituye las siguientes variables.
    • Sustituye YOUR_SUBSCRIPTION_NAME por el nombre de tu suscripción de Pub/Sub.
    • Sustituye YOUR_DATASET por tu conjunto de datos de BigQuery y YOUR_TABLE_NAME por el nombre de tu tabla de BigQuery.
  • Define tempLocation en una ubicación en la que tengas permiso de escritura. Este valor es obligatorio para usar las plantillas proporcionadas por Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Actualizar un trabajo de streaming de plantilla personalizada

En este ejemplo de solicitud projects.locations.templates.launch se muestra cómo actualizar un trabajo de streaming de plantillas. Si quieres actualizar una plantilla flex, usa projects.locations.flexTemplates.launch en su lugar.

  1. Ejecuta el ejemplo 2: Crear una tarea de streaming con una plantilla personalizada para iniciar una tarea de plantilla de streaming.
  2. Envía la siguiente solicitud HTTP POST con los valores modificados:
    • Sustituye YOUR_PROJECT_ID por el ID del proyecto.
    • Sustituye LOCATION por la región de Dataflow de la tarea que vas a actualizar.
    • Sustituye JOB_NAME por el nombre exacto del trabajo que quieras actualizar.
    • Sustituye GCS_PATH por la ubicación de Cloud Storage del archivo de plantilla. La ubicación debe empezar por gs://
    • Asigna parameters a tu lista de pares clave-valor. Los parámetros que se indican son específicos de este ejemplo de plantilla. Si usas una plantilla personalizada, modifica los parámetros según sea necesario. Si usas la plantilla de ejemplo, sustituye las siguientes variables.
      • Sustituye YOUR_SUBSCRIPTION_NAME por el nombre de tu suscripción de Pub/Sub.
      • Sustituye YOUR_DATASET por tu conjunto de datos de BigQuery y YOUR_TABLE_NAME por el nombre de tu tabla de BigQuery.
    • Usa el parámetro environment para cambiar los ajustes del entorno, como el tipo de máquina. En este ejemplo se usa el tipo de máquina n2-highmem-2, que tiene más memoria y CPU por trabajador que el tipo de máquina predeterminado.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Accede a la interfaz de monitorización de Dataflow y comprueba que se ha creado una tarea con el mismo nombre. Este trabajo tiene el estado Actualizado.

Usar las bibliotecas de cliente de las APIs de Google

Te recomendamos que utilices las bibliotecas de cliente de las APIs de Google para hacer llamadas a las APIs REST de Dataflow fácilmente. Este script de ejemplo usa la biblioteca de cliente de las APIs de Google para Python.

En este ejemplo, debes definir las siguientes variables:

  • project: se asigna a tu ID de proyecto.
  • job: asigna el nombre único que quieras al trabajo.
  • template: se define en la ubicación de Cloud Storage del archivo de plantilla.
  • parameters: se asigna a un diccionario con los parámetros de la plantilla.

Para definir la región, incluya el parámetro location.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Para obtener más información sobre las opciones disponibles, consulta el método projects.locations.templates.launch de la referencia de la API REST de Dataflow.

Usar la CLI de gcloud

La CLI de gcloud puede ejecutar una plantilla personalizada o una proporcionada por Google con el comando gcloud dataflow jobs run. En la página de plantillas proporcionadas por Google se documentan ejemplos de cómo ejecutar plantillas proporcionadas por Google.

En los siguientes ejemplos de plantillas personalizadas, defina los valores que se indican a continuación:

  • Sustituye JOB_NAME por el nombre de trabajo que quieras.
  • Sustituye YOUR_BUCKET_NAME por el nombre de tu segmento de Cloud Storage.
  • Define --gcs-location como la ubicación de Cloud Storage del archivo de plantilla.
  • Asigna a --parameters la lista de parámetros separados por comas que se van a transferir a la tarea. No se permiten espacios entre comas y valores.
  • Para evitar que las VMs acepten claves SSH almacenadas en los metadatos del proyecto, usa la marca additional-experiments con la opción de servicio block_project_ssh_keys: --additional-experiments=block_project_ssh_keys.

Crear una tarea por lotes de plantillas personalizadas

En este ejemplo se crea una tarea por lotes a partir de una plantilla que lee un archivo de texto y escribe un archivo de texto de salida.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

La solicitud devuelve una respuesta con el siguiente formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Crear una tarea de streaming con una plantilla personalizada

En este ejemplo se crea una tarea de streaming a partir de una plantilla que lee datos de un tema de Pub/Sub y los escribe en una tabla de BigQuery. La tabla de BigQuery ya debe existir con el esquema adecuado.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

La solicitud devuelve una respuesta con el siguiente formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Para ver una lista completa de las marcas del comando gcloud dataflow jobs run, consulta la referencia de la CLI de gcloud.

Monitorización y solución de problemas

La interfaz de monitorización de Dataflow te permite monitorizar tus tareas de Dataflow. Si un trabajo falla, puedes consultar consejos para solucionar problemas, estrategias de depuración y un catálogo de errores habituales en la guía Solucionar problemas de tu canalización.