Iniciar flujos de procesamiento de Dataflow con Cloud Composer

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se describe cómo usar el DataflowTemplateOperator para lanzar flujos de procesamiento de Dataflow desde Cloud Composer. El flujo de procesamiento de texto de Cloud Storage a BigQuery es un flujo de procesamiento por lotes que te permite subir archivos de texto almacenados en Cloud Storage, transformarlos mediante una función de JavaScript definida por el usuario (UDF) que proporciones y, por último, mostrar los resultados en BigQuery.

Se subirán a un segmento de Cloud Storage una función definida por el usuario, un archivo de entrada y un esquema JSON. Un DAG que haga referencia a estos archivos iniciará un flujo de procesamiento por lotes de Dataflow que aplicará la función definida por el usuario y el archivo de esquema JSON al archivo de entrada. Después, este contenido se subirá a una tabla de BigQuery.

Información general

  • Antes de iniciar el flujo de trabajo, crearás las siguientes entidades:

    • Una tabla de BigQuery vacía de un conjunto de datos vacío que contendrá las siguientes columnas de información: location, average_temperature, month y, opcionalmente, inches_of_rain, is_current y latest_measurement.

    • Un archivo JSON que normalizará los datos del archivo .txt al formato correcto para el esquema de la tabla de BigQuery. El objeto JSON tendrá una matriz de BigQuery Schema, donde cada objeto contendrá un nombre de columna, un tipo de entrada y si es un campo obligatorio o no.

    • Un archivo .txt de entrada que contendrá los datos que se subirán en lote a la tabla de BigQuery.

    • Una función definida por el usuario escrita en JavaScript que transformará cada línea del archivo .txt en las variables relevantes para nuestra tabla.

    • Un archivo DAG de Airflow que apuntará a la ubicación de estos archivos.

  • A continuación, sube el archivo .txt, el archivo .js UDF y el archivo de esquema .json a un segmento de Cloud Storage. También subirás el DAG a tu entorno de Cloud Composer.

  • Una vez que se haya subido el DAG, Airflow ejecutará una tarea a partir de él. Esta tarea iniciará una canalización de Dataflow que aplicará la función definida por el usuario al archivo .txt y le dará formato según el esquema JSON.

  • Por último, los datos se subirán a la tabla de BigQuery que has creado anteriormente.

Antes de empezar

  • Para escribir la función definida por el usuario, debes tener conocimientos de JavaScript.
  • En esta guía se da por hecho que ya tienes un entorno de Cloud Composer. Consulta Crear un entorno para crear uno. Puedes usar cualquier versión de Cloud Composer con esta guía.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

  • Asegúrate de que tienes los siguientes permisos:

    • Roles de Cloud Composer: crea un entorno (si no tienes ninguno), gestiona objetos en el segmento del entorno, ejecuta DAGs y accede a la interfaz de usuario de Airflow.
    • Roles de Cloud Storage: crea un segmento y gestiona los objetos que contiene.
    • Roles de BigQuery: crear un conjunto de datos y una tabla, modificar los datos de la tabla, modificar el esquema y los metadatos de la tabla.
    • Roles de Dataflow: consulta las tareas de Dataflow.
  • Asegúrate de que la cuenta de servicio de tu entorno tenga permisos para crear trabajos de Dataflow, acceder al segmento de Cloud Storage y leer y actualizar datos de la tabla en BigQuery.

Crear una tabla de BigQuery vacía con una definición de esquema

Crea una tabla de BigQuery con una definición de esquema. Usarás esta definición de esquema más adelante en esta guía. Esta tabla de BigQuery contendrá los resultados de la subida por lotes.

Para crear una tabla vacía con una definición de esquema, sigue estos pasos:

Consola

  1. En la Google Cloud consola, ve a la página BigQuery:

    Ir a BigQuery

  2. En el panel de navegación, en la sección Recursos, despliega tu proyecto.

  3. En el panel de detalles, haz clic en Crear conjunto de datos.

    Haz clic en el botón para crear un conjunto de datos.

  4. En la página Crear conjunto de datos, en la sección ID del conjunto de datos, asigna un nombre al conjunto de datos average_weather. Deja el resto de los campos con su valor predeterminado.

    Rellena el ID del conjunto de datos con el nombre average_weather

  5. Haz clic en Crear conjunto de datos.

  6. Vuelve al panel de navegación y, en la sección Recursos, despliega tu proyecto. A continuación, haga clic en el conjunto de datos average_weather.

  7. En el panel de detalles, haz clic en Crear tabla.

    Haz clic en Crear tabla.

  8. En la página Crear tabla, ve a la sección Fuente y selecciona Tabla vacía.

  9. En la página Crear tabla, ve a la sección Destino:

    • En Nombre del conjunto de datos, elige el conjunto de datos average_weather.

      Elige la opción Conjunto de datos para el conjunto de datos average_weather.

    • En el campo Nombre de la tabla, introduce el nombre average_weather.

    • Verifica que el Tipo de tabla sea Tabla nativa.

  10. En la sección Schema (Esquema), introduce la definición de schema. Puedes usar uno de los siguientes métodos:

    • Introduce la información del esquema manualmente. Para ello, habilita la opción Editar como texto e introduce el esquema de la tabla como una matriz JSON. Escribe lo siguiente en los campos:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Usa Añadir campo para introducir el esquema manualmente:

      Haz clic en Añadir campo para introducir los campos.

  11. En Configuración de partición y clústeres, deja el valor predeterminado, No partitioning.

  12. En la sección Opciones avanzadas, en Cifrado, deje el valor predeterminado, Google-owned and managed key.

  13. Haz clic en Crear tabla.

bq

Usa el comando bq mk para crear un conjunto de datos vacío y una tabla en este conjunto de datos.

Ejecuta el siguiente comando para crear un conjunto de datos con la media del tiempo global:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Haz los cambios siguientes:

  • LOCATION: la región en la que se encuentra el entorno.
  • PROJECT_ID: el ID de proyecto.

Ejecuta el siguiente comando para crear una tabla vacía en este conjunto de datos con la definición de esquema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Una vez creada la tabla, puede actualizar su fecha de vencimiento, su descripción y sus etiquetas. También puedes modificar la definición del esquema.

Python

Guarda este código como dataflowtemplateoperator_create_dataset_and_table_helper.py y actualiza las variables para que reflejen tu proyecto y tu ubicación. A continuación, ejecuta el código con el siguiente comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Crea un segmento de Cloud Storage

Crea un segmento para almacenar todos los archivos necesarios para el flujo de trabajo. El DAG que crees más adelante en esta guía hará referencia a los archivos que subas a este segmento de almacenamiento. Para crear un nuevo segmento de almacenamiento, sigue estos pasos:

Consola

  1. Abre Cloud Storage en la Google Cloud consola.

    Ir a Cloud Storage

  2. Haz clic en Crear segmento para abrir el formulario de creación de segmentos.

    1. Introduce la información del contenedor y haz clic en Continuar para completar cada paso:

      • Especifique un nombre único a nivel global para el segmento. En esta guía se usa bucketName como ejemplo.

      • Seleccione Región como tipo de ubicación. A continuación, selecciona una ubicación donde se almacenarán los datos del segmento.

      • Selecciona Estándar como clase de almacenamiento predeterminada para tus datos.

      • Selecciona el control de acceso Uniforme para acceder a tus objetos.

    2. Haz clic en Listo.

gcloud

Usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://bucketName/

Haz los cambios siguientes:

  • bucketName: el nombre del segmento que has creado anteriormente en esta guía.

Códigos de ejemplo

C#

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Crear un esquema de BigQuery en formato JSON para la tabla de salida

Crea un archivo de esquema de BigQuery con formato JSON que coincida con la tabla de salida que has creado anteriormente. Ten en cuenta que los nombres, los tipos y los modos de los campos deben coincidir con los definidos anteriormente en el esquema de tu tabla de BigQuery. Este archivo normalizará los datos de tu archivo .txt en un formato compatible con tu esquema de BigQuery. Dale el nombre jsonSchema.json a este archivo.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Crear un archivo JavaScript para dar formato a los datos

En este archivo, definirás la función definida por el usuario (UDF) que ofrece la lógica necesaria para transformar las líneas de texto del archivo de entrada. Ten en cuenta que esta función toma cada línea de texto de tu archivo de entrada como un argumento independiente, por lo que la función se ejecutará una vez por cada línea del archivo de entrada. Dale el nombre transformCSVtoJSON.js a este archivo.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Crear un archivo de entrada

Este archivo contendrá la información que quieras subir a tu tabla de BigQuery. Copia este archivo localmente y llámalo inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Subir archivos a un segmento

Sube los siguientes archivos al segmento de Cloud Storage que has creado anteriormente:

  • Esquema de BigQuery en formato JSON (.json)
  • Función definida por el usuario de JavaScript (transformCSVtoJSON.js)
  • El archivo de entrada del texto que quieres procesar (.txt)

Consola

  1. En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.

    Ir a Contenedores

  2. En la lista de contenedores, haga clic en el suyo.

  3. En la pestaña Objetos del cubo, haga una de las siguientes acciones:

    • Arrastra los archivos que quieras desde el escritorio o el gestor de archivos hasta el panel principal de la Google Cloud consola.

    • Haz clic en el botón Subir archivos, selecciona los archivos que quieras subir en el cuadro de diálogo que aparece y haz clic en Abrir.

gcloud

Ejecuta el comando gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://bucketName

Haz los cambios siguientes:

  • bucketName: el nombre del segmento que has creado anteriormente en esta guía.
  • OBJECT_LOCATION: la ruta local a tu objeto. Por ejemplo, Desktop/transformCSVtoJSON.js.

Códigos de ejemplo

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configurar DataflowTemplateOperator

Antes de ejecutar el DAG, define las siguientes variables de Airflow.

Variable de Airflow Valor
project_id El ID del proyecto. Ejemplo: example-project
gce_zone Zona de Compute Engine en la que se debe crear el clúster de Dataflow. Ejemplo: us-central1-a Para obtener más información sobre las zonas válidas, consulta Regiones y zonas.
bucket_path Ubicación del segmento de Cloud Storage que has creado anteriormente. Ejemplo: gs://example-bucket.

Ahora harás referencia a los archivos que has creado antes para crear un DAG que inicie el flujo de trabajo de Dataflow. Copia este DAG y guárdalo localmente como composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Sube el DAG a Cloud Storage

Sube tu DAG a la carpeta /dags del bucket de tu entorno. Una vez que se haya completado la subida correctamente, podrá verlo haciendo clic en el enlace Carpeta DAGs de la página Entornos de Cloud Composer.

La carpeta DAGs de tu entorno contiene tu DAG

Ver el estado de la tarea

  1. Ve a la interfaz web de Airflow.
  2. En la página de los DAGs, haz clic en el nombre del DAG (por ejemplo, composerDataflowDAG).
  3. En la página Detalles de los DAGs, haz clic en Vista de gráfico.
  4. Comprobar el estado:

    • Failed: La tarea tiene un recuadro rojo. También puedes mantener el puntero sobre la tarea y buscar Estado: Fallido.

    • Success: La tarea tiene un recuadro verde alrededor. También puedes mantener el puntero sobre la tarea y comprobar si aparece el mensaje Estado: Éxito.

Después de unos minutos, puedes consultar los resultados en Dataflow y BigQuery.

Ver tu trabajo en Dataflow

  1. En la Google Cloud consola, ve a la página Dataflow.

    Ir a Dataflow

  2. Tu tarea se llama dataflow_operator_transform_csv_to_bq y tiene un ID único que se añade al final del nombre con un guion, de esta forma:

    La tarea de Dataflow tiene un ID único.

  3. Haz clic en el nombre para ver los detalles del trabajo.

    ver todos los detalles del empleo

Ver los resultados en BigQuery

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. Puedes enviar consultas mediante SQL estándar. Usa la siguiente consulta para ver las filas que se han añadido a tu tabla:

    SELECT * FROM projectId.average_weather.average_weather
    

Siguientes pasos