Iniciar canalizaciones de Dataflow con Cloud Composer

Cloud Composer 1 | Cloud Composer 2

En esta página, se describe cómo usar el DataflowTemplateOperator para iniciar canalizaciones de Dataflow desde Cloud Composer. La canalización de Cloud Storage Text a BigQuery es una canalización por lotes que te permite subir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y enviar los resultados a BigQuery.

se subirán una función definida por el usuario, un archivo de entrada y un esquema JSON a un bucket de Cloud Storage. Un DAG que haga referencia a estos archivos iniciará una canalización por lotes de Dataflow que aplicará la función definida por el usuario y el archivo de esquema JSON al archivo de entrada. Después, este contenido se subirá a una tabla de BigQuery

Descripción general

  • Antes de iniciar el flujo de trabajo, crearás las siguientes entidades:

    • Una tabla de BigQuery vacía con un conjunto de datos vacío que contendrá las siguientes columnas de información: location, average_temperature, month y, de forma opcional, inches_of_rain, is_current y latest_measurement.

    • Un archivo JSON que normalizará los datos del archivo .txt en el formato correcto para el esquema de la tabla de BigQuery El objeto JSON tendrá un array de BigQuery Schema, en el que cada objeto contendrá un nombre de columna, un tipo de entrada y si es un campo obligatorio o no.

    • Un archivo .txt de entrada que contendrá los datos que se subirán por lotes a la tabla de BigQuery

    • Una función definida por el usuario escrita en JavaScript que transformará cada línea del archivo .txt en las variables relevantes para nuestra tabla.

    • Un archivo DAG de Airflow que apuntará a la ubicación de estos archivos

  • A continuación, subirás el archivo .txt, el archivo .js de la UDF y el archivo de esquema .json a un bucket de Cloud Storage. También subirás el DAG a tu entorno de Cloud Composer.

  • Después de que se suba el DAG, Airflow ejecutará una tarea desde él. Esta tarea iniciará una canalización de Dataflow que aplicará la función definida por el usuario al archivo .txt y la formateará según el esquema JSON.

  • Por último, los datos se subirán a la tabla de BigQuery que creaste antes.

Antes de comenzar

  • En esta guía, debes tener conocimientos de JavaScript para escribir la función definida por el usuario.
  • En esta guía, se supone que ya tienes un entorno de Cloud Composer. Consulta Crear entorno para crear uno. Con esta guía, puedes usar cualquier versión de Cloud Composer.
  • Habilita las API de Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Habilita las API

Crear una tabla de BigQuery vacía con una definición de esquema

Crear una tabla de BigQuery con una definición de esquema Usarás esta definición de esquema más adelante en esta guía. Esta tabla de BigQuery contendrá los resultados de la carga por lotes.

Para crear una tabla vacía con una definición de esquema, haz lo siguiente:

Console

  1. En la consola de Google Cloud, ve a la página de BigQuery:

    Ir a BigQuery

  2. En el panel de navegación, en la sección Recursos, expande tu proyecto.

  3. En el panel de detalles, haz clic en Crear conjunto de datos.

    haz clic en el botón que dice Crear un conjunto de datos

  4. En la página Crear conjunto de datos, en la sección ID del conjunto de datos, asigna el nombre average_weather al conjunto de datos. Deja el resto de los campos con su estado predeterminado.

    Completa el ID del conjunto de datos con el nombre average_weather.

  5. Haz clic en Crear conjunto de datos.

  6. Regresa al panel de navegación y, en la sección Recursos, expande tu proyecto. Luego, haz clic en el conjunto de datos average_weather.

  7. En el panel de detalles, haz clic en Crear tabla (Create table).

    haz clic en Crear tabla

  8. En la página Crear tabla, en la sección Origen, selecciona Tabla vacía.

  9. En la página Crear tabla, en la sección Destino (Destination), haz lo siguiente:

    • En Nombre del conjunto de datos, selecciona el conjunto de datos average_weather.

      Selecciona la opción Conjunto de datos para el conjunto de datos average_weather.

    • En el campo Nombre de la tabla, ingresa el nombre average_weather.

    • Verifica que Tipo de tabla esté establecido en Tabla nativa.

  10. En la sección Esquema, escribe la definición del esquema. Puedes usar uno de los siguientes enfoques:

    • Para ingresar la información del esquema de forma manual, habilita Editar como texto y, luego, ingresa el esquema de la tabla como un arreglo JSON. Escribe en los siguientes campos:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Usa Agregar campo para ingresar el esquema de forma manual:

      haz clic en Agregar campo para ingresar los campos

  11. En Configuración de partición y agrupamiento en clústeres, deja el valor predeterminado, No partitioning.

  12. En la sección Opciones avanzadas, en Encriptación, deja el valor predeterminado, Google-managed key.

  13. Haz clic en Crear tabla.

bq

Usa el comando bq mk para crear un conjunto de datos vacío y una tabla en este conjunto de datos.

Ejecuta el siguiente comando para crear un conjunto de datos del clima global promedio:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Reemplaza lo siguiente:

  • LOCATION: Es la región en la que se encuentra el entorno.
  • PROJECT_ID: El ID del proyecto

Ejecuta el siguiente comando para crear una tabla vacía en este conjunto de datos con la definición de esquema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Una vez que se crea la tabla, puedes actualizar el vencimiento, la descripción y las etiquetas. También puedes modificar la definición de esquema.

Python

Guarda este código como dataflowtemplateoperator_create_dataset_and_table_helper.py y actualiza las variables en él para que reflejen el proyecto y la ubicación. Luego, ejecútalo con el siguiente comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Crea un bucket de Cloud Storage

Crear un bucket para almacenar todos los archivos necesarios para el flujo de trabajo El DAG que crees más adelante en esta guía hará referencia a los archivos que subas a este bucket de almacenamiento. Para crear un bucket de almacenamiento nuevo, sigue estos pasos:

Console

  1. Abre Cloud Storage en la consola de Google Cloud.

    Ir a Cloud Storage

  2. Haz clic en Crear bucket para abrir el formulario de creación de depósitos.

    1. Ingresa la información de tu bucket y haz clic en Continuar para completar cada paso:

      • Especifica un Nombre único a nivel global para tu bucket. En esta guía, se usa bucketName como ejemplo.

      • Selecciona Región para el tipo de ubicación. Luego, selecciona una ubicación donde se almacenarán los datos del bucket.

      • Selecciona Estándar como la clase de almacenamiento predeterminada para tus datos.

      • Selecciona el control de acceso Uniforme para acceder a los objetos.

    2. Haz clic en Listo.

gsutil

Usa el comando gsutil mb:

gsutil mb gs://bucketName/

Reemplaza lo siguiente:

  • bucketName: Es el nombre del bucket que creaste antes en esta guía.

Muestras de código

C#

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Crea un esquema de BigQuery con formato JSON para la tabla de salida

Crea un archivo de esquema de BigQuery con formato JSON que coincida con la tabla de salida que creaste antes. Ten en cuenta que los nombres, tipos y modos de los campos deben coincidir con los definidos anteriormente en el esquema de tabla de BigQuery. Este archivo normalizará los datos de tu archivo .txt en un formato compatible con tu esquema de BigQuery. Asígnale el nombre jsonSchema.json a este archivo.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Crea un archivo JavaScript para dar formato a tus datos

En este archivo, definirás tu UDF (función definida por el usuario) que proporciona la lógica para transformar las líneas de texto en el archivo de entrada. Ten en cuenta que esta función toma cada línea de texto de tu archivo de entrada como su propio argumento, por lo que se ejecutará una vez por cada línea del archivo de entrada. Asígnale el nombre transformCSVtoJSON.js a este archivo.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Crea tu archivo de entrada

Este archivo contendrá la información que deseas subir a tu tabla de BigQuery. Copia este archivo de forma local y asígnale el nombre inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Sube los archivos al bucket

Sube los siguientes archivos al bucket de Cloud Storage que creaste antes:

  • Esquema de BigQuery con formato JSON (.json)
  • Función definida por el usuario de JavaScript (transformCSVtoJSON.js)
  • El archivo de entrada del texto que deseas procesar (.txt)

Console

  1. En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. En la lista de buckets, haz clic en tu bucket.

  3. En la pestaña Objetos del bucket, realiza una de las siguientes acciones:

    • Arrastra y suelta los archivos deseados desde tu escritorio o administrador de archivos en el panel principal de la consola de Google Cloud.

    • Haz clic en el botón Subir archivos, selecciona los archivos que deseas subir en el cuadro de diálogo que aparece y haz clic en Abrir.

gsutil

Ejecuta el comando gsutil cp:

gsutil cp OBJECT_LOCATION gs://bucketName

Reemplaza lo siguiente:

  • bucketName: Es el nombre del bucket que creaste antes en esta guía.
  • OBJECT_LOCATION: Es la ruta de acceso local a tu objeto. Por ejemplo, Desktop/transformCSVtoJSON.js

Muestras de código

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configura DataflowTemplateOperator

Antes de ejecutar el DAG, configura las siguientes variables de Airflow.

Variable de Airflow Valor
project_id El ID del proyecto
gce_zone Zona de Compute Engine en la que se debe crear el clúster de Dataflow
bucket_path La ubicación del bucket de Cloud Storage que creaste antes

Ahora harás referencia a los archivos que creaste antes para crear un DAG que inicie el flujo de trabajo de Dataflow. Copia este DAG y guárdalo de forma local como composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Sube el DAG a Cloud Storage

Sube tu DAG a la carpeta /dags en el bucket de tu entorno. Una vez que se haya completado la carga de forma correcta, podrás verla haciendo clic en el vínculo Carpeta de DAG en la página Entornos de Cloud Composer.

La carpeta DAGs de tu entorno contiene tu DAG

Visualiza el estado de la tarea

  1. Ve a la interfaz web de Airflow.
  2. En la página de DAG, haz clic en su nombre (como composerDataflowDAG).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:

    • Failed: La tarea tiene un cuadro rojo alrededor. También puedes mantener el puntero sobre la tarea y buscar State: Failed.

    • Success: La tarea tiene un cuadro verde alrededor. También puedes mantener el puntero sobre la tarea y verificar si aparece Estado: finalizado.

Después de unos minutos, puedes verificar los resultados en Dataflow y BigQuery.

Visualiza tu trabajo en Dataflow

  1. En la consola de Google Cloud, ve a la página Dataflow.

    Ir a Dataflow

  2. Tu trabajo se llama dataflow_operator_transform_csv_to_bq con un ID único adjunto al final del nombre con un guion, de la siguiente manera:

    El trabajo de Dataflow tiene un ID único.

  3. Haz clic en el nombre para ver los detalles del trabajo.

    ver todos los detalles del trabajo

Visualiza tus resultados en BigQuery

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. Puedes enviar consultas con SQL estándar. Usa la siguiente consulta para ver las filas que se agregaron a su tabla:

    SELECT * FROM projectId.average_weather.average_weather