Cómo iniciar canalizaciones de Dataflow con Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se describe cómo usar DataflowTemplateOperator para iniciar canalizaciones de Dataflow desde Cloud Composer. La canalización de texto de Cloud Storage a BigQuery es una canalización por lotes que te permite subir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y enviar los resultados a BigQuery.

se subirá una función definida por el usuario, un archivo de entrada y un esquema JSON.
  a un bucket de Cloud Storage. Un DAG que haga referencia a estos archivos iniciará una canalización por lotes de Dataflow, que aplicará la función definida por el usuario y el archivo de esquema JSON al archivo de entrada. Después, este contenido se subirá a una tabla de BigQuery.

Descripción general

  • Antes de iniciar el flujo de trabajo, crearás las siguientes entidades:

    • Una tabla de BigQuery vacía con un conjunto de datos vacío que contendrá las siguientes columnas de información: location, average_temperature, month y, de forma opcional, inches_of_rain, is_current y latest_measurement.

    • Un archivo JSON que normalizará los datos del archivo .txt al formato correcto para el esquema de la tabla de BigQuery. El objeto JSON tendrá un array de BigQuery Schema, en el que cada objeto contendrá un nombre de columna, un tipo de entrada y si Es un campo obligatorio.

    • Un archivo .txt de entrada que contendrá los datos que se subirán por lotes a la tabla de BigQuery.

    • Una función definida por el usuario escrita en JavaScript que transformará cada línea del archivo .txt en las variables relevantes para nuestra tabla.

    • Un archivo DAG de Airflow que apuntará a la ubicación de estos archivos.

  • A continuación, subirás el archivo .txt, el archivo de la UDF .js y el esquema .json. en un bucket de Cloud Storage. También subirás el DAG tu entorno de Cloud Composer.

  • Después de que se suba el DAG, Airflow ejecutará una tarea desde él. Esta tarea iniciará una canalización de Dataflow que aplicará la función definida por el usuario al archivo .txt y la formateará según el esquema JSON.

  • Por último, los datos se subirán a la tabla de BigQuery que creaste anteriormente.

Antes de comenzar

  • Esta guía requiere familiaridad con JavaScript para escribir y la función definida por el usuario.
  • En esta guía, se da por sentado que ya tienes Cloud Composer en un entorno de nube. Consulta Crea un entorno para crear uno. Puedes usar cualquier versión de Cloud Composer con esta guía.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

Crea una tabla de BigQuery vacía con una definición de esquema

Crear una tabla de BigQuery con una definición de esquema Tú usaremos esta definición de esquema más adelante en esta guía. En esta tabla de BigQuery, se incluirán los resultados de la carga por lotes.

Para crear una tabla vacía con una definición de esquema, haz lo siguiente:

Console

  1. En la consola de Google Cloud, ve a la página de BigQuery:

    Ir a BigQuery

  2. En el panel de navegación, en la sección Recursos, expande tu proyecto.

  3. En el panel de detalles, haz clic en Crear conjunto de datos.

    haz clic en el botón que dice Crear un conjunto de datos

  4. En la página Crear conjunto de datos, en la sección ID del conjunto de datos, asigna un nombre a tu Conjunto de datos average_weather. Deja todos los demás campos con su estado predeterminado.

    Completa el ID del conjunto de datos con el nombre average_weather.

  5. Haz clic en Crear conjunto de datos.

  6. Vuelve al panel de navegación, en la sección Recursos, expande tu proyecto. Luego, haz clic en el conjunto de datos average_weather.

  7. En el panel de detalles, haz clic en Crear tabla (Create table).

    haz clic en Crear tabla

  8. En la página Crear tabla, en la sección Origen, selecciona Tabla vacía.

  9. En la sección Destination (Destino) de la página Create table (Crear tabla), haz lo siguiente:

    • En Nombre del conjunto de datos, selecciona el conjunto de datos average_weather.

      Selecciona la opción Conjunto de datos para el conjunto de datos average_weather.

    • En el campo Nombre de la tabla, ingresa el nombre average_weather.

    • Verifica que Tipo de tabla esté establecido en Tabla nativa.

  10. En la sección Esquema, ingresa la definición del esquema. Puedes usar uno de los siguientes enfoques:

    • Habilita Editar como texto y, luego, ingresa el esquema de la tabla como un array JSON para ingresar la información del esquema de forma manual. Escribe lo siguiente en los siguientes campos:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Usa Agregar campo para ingresar el esquema de forma manual:

      haz clic en Agregar campo para ingresar los campos

  11. En Configuración de particiones y clústeres, deja los valores predeterminados. valor, No partitioning.

  12. En la sección Opciones avanzadas, en Encriptación, deja el valor predeterminado, Google-managed key.

  13. Haz clic en Crear tabla.

bq

Usa el comando bq mk para crear un conjunto de datos vacío y una tabla en este conjunto.

Ejecuta el siguiente comando para crear un conjunto de datos del clima promedio global:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Reemplaza lo siguiente:

  • LOCATION: Es la región en la que se encuentra el entorno.
  • PROJECT_ID: El ID del proyecto

Ejecuta el siguiente comando para crear una tabla vacía en este conjunto de datos con la definición del esquema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Después de crear la tabla, puedes actualización el vencimiento, la descripción y las etiquetas de la tabla. También puedes modificar la definición de esquema.

Python

Guardar este código como dataflowtemplateoperator_create_dataset_and_table_helper.py y actualizar las variables para reflejar el proyecto y la ubicación, ejecútalo con el siguiente comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Cree un bucket de Cloud Storage

Crea un bucket que contenga todos los archivos necesarios para el flujo de trabajo. El DAG que creas más adelante en esta guía hará referencia a los archivos que subas a este bucket de almacenamiento. Para crear un bucket de almacenamiento nuevo, sigue estos pasos:

Console

  1. Abre Cloud Storage en la consola de Google Cloud.

    Ir a Cloud Storage

  2. Haz clic en Crear depósito para abrir el formulario de creación de depósitos.

    1. Ingresa la información de tu bucket y haz clic en Continuar para completar cada paso:

      • Especifica un nombre global único para tu bucket. En esta guía, se utiliza bucketName como ejemplo.

      • Selecciona Región para el tipo de ubicación. Luego, selecciona una Ubicación en la que se almacenarán los datos del bucket.

      • Selecciona Estándar como la clase de almacenamiento predeterminada para tus datos.

      • Selecciona el control de acceso Uniforme para acceder a los objetos.

    2. Haz clic en Listo.

gcloud

Usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://bucketName/

Reemplaza lo siguiente:

  • bucketName: Es el nombre del bucket que creaste con anterioridad en este .

Muestras de código

C#

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Crea un esquema de BigQuery con formato JSON para la tabla de salida

Crea un archivo de esquema de BigQuery con formato JSON que coincida con la tabla de salida que creaste antes. Ten en cuenta que los nombres, tipos y modos de campo deben coincidir con los definidos antes en el esquema de tu tabla de BigQuery. Este archivo normalizará los datos del archivo .txt en un formato compatible con el esquema de BigQuery. Asígnale un nombre a este archivo jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Crea un archivo JavaScript para dar formato a tus datos

En este archivo, definirás tu UDF (función definida por el usuario) que proporciona la lógica para transformar las líneas de texto en el archivo de entrada. Ten en cuenta que esta toma cada línea de texto en tu archivo de entrada como su propio argumento, por lo que la función se ejecutará una vez por cada línea de tu archivo de entrada. Asígnale un nombre al archivo transformCSVtoJSON.js


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Crea tu archivo de entrada

Este archivo contendrá la información que deseas subir a la tabla de BigQuery. Copia este archivo de forma local y asígnale el nombre inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Sube los archivos al bucket

Sube los siguientes archivos al bucket de Cloud Storage que creaste antes:

  • Esquema de BigQuery con formato JSON (.json)
  • Función definida por el usuario de JavaScript (transformCSVtoJSON.js)
  • El archivo de entrada del texto que deseas procesar (.txt)

Console

  1. En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. En la lista de buckets, haz clic en tu bucket.

  3. En la pestaña Objetos para el bucket, realiza una de las siguientes acciones:

    • Arrastra y suelta los archivos deseados desde tu escritorio o administrador de archivos en el panel principal de la consola de Google Cloud.

    • Haz clic en el botón Subir archivos, selecciona los archivos que deseas subir en el cuadro de diálogo que aparece y haz clic en Abrir.

gcloud

Ejecuta el comando gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://bucketName

Reemplaza lo siguiente:

  • bucketName: Es el nombre del bucket que creaste con anterioridad en en esta guía.
  • OBJECT_LOCATION: Es la ruta de acceso local a tu objeto. Por ejemplo, Desktop/transformCSVtoJSON.js

Muestras de código

Python

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Para autenticarte en Cloud Composer, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configura DataflowTemplateOperator

Antes de ejecutar el DAG, configura las siguientes variables de Airflow.

Variable de Airflow Valor
project_id El ID del proyecto
gce_zone Zona de Compute Engine en la que se debe crear el clúster de Dataflow
bucket_path La ubicación del bucket de Cloud Storage que creaste más temprano

Ahora harás referencia a los archivos que creaste antes para crear un DAG que inicie el flujo de trabajo de Dataflow. Copia este DAG y guárdalo de forma local como composer-dataflow-dag.py.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Sube el DAG a Cloud Storage

Sube tu DAG a la carpeta /dags en el bucket de tu entorno. Una vez que la carga se haya completado de forma correcta, puedes verla si haces clic en el vínculo Carpeta de DAG en la página Entornos de Cloud Composer.

La carpeta de DAGs en tu entorno contiene tu DAG.

Consulta el estado de la tarea

  1. Ve a la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (como composerDataflowDAG).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:

    • Failed: La tarea tiene un cuadro rojo a su alrededor. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores.

    • Success: La tarea tiene un recuadro verde alrededor. También puedes mantener el puntero sobre la tarea y buscar Estado: Correcto.

Después de unos minutos, puedes verificar los resultados en Dataflow y BigQuery.

Visualiza tu trabajo en Dataflow

  1. En la consola de Google Cloud, ve a la página Dataflow.

    Ir a Dataflow

  2. Tu trabajo se llamará dataflow_operator_transform_csv_to_bq con un ID único al final del nombre con un guion, como el siguiente:

    El trabajo de Dataflow tiene un ID único.

  3. Haz clic en el nombre para ver los detalles del trabajo.

    ver todos los detalles del trabajo

Visualiza tus resultados en BigQuery

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. Puedes enviar una consulta mediante SQL estándar. Usa la siguiente consulta para ver las filas que se agregaron a su tabla:

    SELECT * FROM projectId.average_weather.average_weather