Iniciar canalizaciones de Dataflow con Cloud Composer

En esta página, se describe cómo usar DataflowTemplateOperator para iniciar canalizaciones de Dataflow desde Cloud Composer. La canalización de Cloud Storage Text a BigQuery es una canalización por lotes que te permite subir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones, y enviar los resultados a BigQuery.

Una función definida por el usuario, un archivo de entrada y un esquema JSON se subirán a un bucket de Cloud Storage. Un DAG que hace referencia a estos archivos iniciará una canalización por lotes de Dataflow, que aplicará la función definida por el usuario y el archivo de esquema JSON a nuestro archivo de entrada. Después, este contenido se subirá a una tabla de BigQuery

  • Antes de iniciar el flujo de trabajo, necesitaremos crear las siguientes entidades:

    • Una tabla de BigQuery vacía con un conjunto de datos vacío que contendrá las siguientes columnas de información: location, average_temperature, month y, de forma opcional, inches_of_rain, is_current y latest_measurement.

    • Un archivo JSON que normalizará los datos del archivo .txt al formato correcto para el esquema de nuestra tabla de BigQuery. El objeto JSON tendrá un arreglo de BigQuery Schema, en el que cada objeto contendrá un nombre de columna, un tipo de entrada y si es o no un campo obligatorio.

    • Un archivo .txt de entrada que contendrá los datos que deseamos subir por lotes en nuestra tabla de BigQuery.

    • Una función definida por el usuario escrita en JavaScript que transformará cada línea del archivo .txt en las variables relevantes para nuestra tabla.

    • Un archivo de grafo acíclico dirigido (DAG) que apunta a la ubicación de los archivos mencionados antes.

  • A continuación, subiremos el archivo .txt, el archivo de UDF .js y el archivo de esquema .json a un depósito de Storage. También subiremos el DAG a nuestro entorno de Cloud Composer.

  • Una vez que se suba el DAG, se iniciará una tarea de Airflow. La tarea iniciará una canalización de Cloud Dataflow que aplicará la función definida por el usuario a nuestro archivo .txt, y la formateará según el esquema JSON.

  • Por último, los datos se subirán a la tabla de BigQuery que creamos antes.

Costos

En este instructivo, se usan los componentes facturables de Google Cloud, incluidos los siguientes:

  • Cloud Composer
  • Dataflow
  • Cloud Storage
  • BigQuery

Requisitos previos

  • Asegúrate de haber creado un entorno de Cloud Composer.
  • La versión mínima requerida de Cloud Composer es la 1.9.0. Para verificar la versión de la imagen, consulta los detalles del entorno.
  • En este instructivo, se requiere estar familiarizado con JavaScript para escribir la función definida por el usuario.
  • Habilita las API de Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Habilita las API

Configura tu entorno

Crea una tabla de BigQuery vacía con una definición de esquema

Primero, crearás una tabla de BigQuery con una definición de esquema. Usarás esta definición de esquema más adelante en este instructivo. En esta tabla de BigQuery, se incluirán los resultados de la carga por lotes.

Para crear una tabla vacía con una definición de esquema, haz lo siguiente:

Console

  1. En Cloud Console, ve a la página de BigQuery.

    Ir a BigQuery

  2. En el panel de navegación, en la sección Recursos, expande tu proyecto.

  3. En el lado derecho de la ventana, en el panel de detalles, haz clic en Crear conjunto de datos.

Haz clic en el botón que dice “Crear un conjunto de datos” en el lado derecho de la ventana.

  1. En la página Crear conjunto de datos, en la sección ID del conjunto de datos, asigna el nombre average_weather al conjunto de datos. Deja todos los demás campos con su estado predeterminado.

Completa el ID del conjunto de datos con el nombre average_weather.

  1. Haz clic en Create dataset.

  2. Regresa al panel de navegación, en la sección Recursos, expande tu proyecto. Luego, haz clic en el conjunto de datos average_weather.

  3. En el lado derecho de la ventana, en el panel de detalles, haz clic en Crear tabla (Create table).

Haz clic en “Crear tabla” en el lado derecho de la ventana.

  1. En la página Crear tabla, en la sección Origen, selecciona Tabla vacía.

  2. En la página Crear tabla, en la sección Destino, haz lo siguiente:

    • En Nombre del conjunto de datos, selecciona el conjunto de datos average_weather.

      Selecciona la opción Conjunto de datos para el conjunto de datos average_weather.

    • En el campo Nombre de la tabla, ingresa el nombre average_weather.

    • Verifica que Tipo de tabla esté establecido en Tabla nativa.

  3. En la sección Esquema, ingresa la definición del esquema.

    • Ingresa la información del esquema de forma manual de la siguiente manera:

      • Habilita Editar como texto y, luego, ingresa el esquema de la tabla como un arreglo JSON. Escribe los siguientes campos para esta opción:

        [
        {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
        },
        {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
        },
        {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
        },
        {
        "name": "inches_of_rain",
        "type": "NUMERIC"
        },
        {
        "name": "is_current",
        "type": "BOOLEAN"
        },
        {
        "name": "latest_measurement",
        "type": "DATE"
        }
        ]
        

      • Usa Agregar campo para ingresar el esquema de forma manual.

      Haz clic en Agregar campo en la parte inferior de la pantalla para ingresar los campos.

  4. En Configuración de partición y agrupamiento en clústeres, deja los valores predeterminados: No partitioning.

  5. En la sección Opciones avanzadas, en Encriptación, deja el valor predeterminado: Google-managed key. De forma predeterminada, Compute Engine encripta el contenido en reposo del cliente.

  6. Haz clic en Crear tabla.

bq

Usa el comando bq mk con la marca --location para crear un conjunto de datos vacío. Reemplaza PROJECT_ID por tu ID del proyecto y LOCATION por tu ubicación preferida. Recomendamos elegir la misma región en la que se encuentra tu entorno de Composer para minimizar la latencia.

Copia el siguiente comando para crear un conjunto de datos del clima global promedio:

bq --location=LOCATION mk \
--dataset \
PROJECT_ID:average_weather

Para crear una tabla vacía en este conjunto de datos con nuestra definición de esquema, reemplaza PROJECT_ID por el ID del proyecto en el comando a continuación y, luego, ingrésalo en la terminal:

bq mk \
--table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Una vez que se crea la tabla, puedes actualizar el vencimiento, la descripción y las etiquetas. También puedes modificar la definición de esquema.

Python

Antes de ejecutar la muestra, asegúrate de ejecutar el siguiente comando para instalar la biblioteca en tu entorno:

pip install google.cloud.bigquery

Guarda este código como dataflowtemplateoperator_create_dataset_and_table_helper.py y actualiza las variables en él para reflejar tu proyecto y ubicación; luego, ejecútalo con el siguiente comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = 'your-project'  # Your GCP Project
location = 'US'  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = 'average_weather'

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Cree un bucket de almacenamiento

A continuación, deberás crear un bucket de almacenamiento que contenga todos los archivos necesarios para el flujo de trabajo. El DAG que crees en el futuro hará referencia a los archivos que subiste a este bucket de almacenamiento. Para crear un bucket de almacenamiento nuevo, sigue estos pasos:

Console

  1. Abre Cloud Storage en Cloud Console.

    Abrir Cloud Storage

  2. Haz clic en Crear depósito para abrir el formulario de creación de depósitos.

  3. Ingresa la información del depósito y haz clic en Continuar para completar cada paso:

    • Especifica un Nombre único a nivel global para tu depósito (se le hará referencia como bucketName en el resto del instructivo).

    • Selecciona Región para el tipo de ubicación. Luego, selecciona una Ubicación en la que se almacenarán los datos del depósito de forma permanente.

    • Selecciona Estándar como la clase de almacenamiento predeterminada para tus datos.

    • Selecciona el control de acceso Uniforme para acceder a los objetos.

  4. Haga clic en Done

gsutil

  1. Usa el comando gsutil mb:
    gsutil mb gs://bucketName/
    

Muestras de código

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de C# para Compute Engine.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Comienza a usarlo

Antes de probar esta muestra, sigue las instrucciones de configuración de Go de la Guía de inicio rápido de Compute Engine mediante bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Compute Engine para Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %v", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print("Bucket {} created".format(bucket.name))

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido de Compute Engine con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Ruby de Compute Engine.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Crea un esquema de BigQuery con formato JSON para la tabla de salida

Crea un archivo de esquema de BigQuery con formato JSON que coincida con la tabla de salida que creaste antes. Ten en cuenta que los nombres, tipos y modos de campo deben coincidir con los definidos antes en el esquema de tu tabla de BigQuery. Este archivo normalizará los datos del archivo .txt en un formato compatible con el esquema de BigQuery. Asígnale un nombre a este filtro jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Crea un archivo JavaScript(.js) para el formato de tus datos

En este archivo, definirás tu UDF (función definida por el usuario) que proporciona la lógica para transformar las líneas de texto en el archivo de entrada. Ten en cuenta que esta función toma cada línea de texto en tu archivo de entrada como su propio argumento, por lo que la función se ejecutará una vez por cada línea de tu archivo de entrada. Asígnale un nombre a este filtro transformCSVtoJSON.js.

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la Guía de inicio rápido de Compute Engine mediante bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Compute Engine para Node.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  var jsonString = JSON.stringify(weatherInCity);
  return jsonString;
}

Crea tu archivo de entrada

Este archivo contendrá la información que deseas subir a la tabla de BigQuery. Copia este archivo de forma local y asígnale el nombre inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Sube los archivos a tu bucket de Storage y crea una carpeta de etapa de pruebas

Sube los siguientes archivos al bucket de Storage que creaste antes:

  • Esquema de BigQuery con formato JSON (.json)
  • Función definida por el usuario de JavaScript (transformCSVtoJSON.js)
  • El archivo de entrada del texto que deseas procesar (.txt)

Console

  1. En Google Cloud Console, ve a la página Navegador de Cloud Storage.

    Ir al navegador

  2. En la lista de depósitos, haz clic en el depósito bucketName.

  3. En la pestaña Objetos para el depósito, realiza una de estas dos acciones:

    • Arrastra y suelta los archivos deseados desde tu escritorio o administrador de archivos en el panel principal de Cloud Console.

    • Haz clic en el botón Subir archivos, selecciona los archivos que deseas subir en el cuadro de diálogo que aparece y haz clic en Abrir.

gsutil

Usa el comando gsutil cp:

gsutil cp [OBJECT_LOCATION] gs://bucketName

Aquí:

  • [OBJECT_LOCATION] es la ruta de acceso local a tu objeto. Por ejemplo, Desktop/dog.png

  • [bucketName] es el nombre de depósito único a nivel global que creaste antes.

Si es correcto, la respuesta se parece al siguiente ejemplo:

Operation completed over 1 objects/58.8 KiB.

Muestras de código

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido de Compute Engine con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Ruby de Compute Engine.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configuración de DataflowTemplateOperator

Antes de ejecutar la muestra, asegúrate de configurar las variables de entorno apropiadas. Puedes hacerlo con gcloud o la IU de Airflow:

gcloud

Ingresa los siguientes comandos:

CLI de Airflow 1.10

gcloud composer environments run ENVIRONMENT \
  --location LOCATION \
  variables -- \
  --set project_id PROJECT_ID

Donde:

  • ENVIRONMENT es el nombre del entorno de Cloud Composer.
  • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
  • PROJECT_ID es tu ID del proyecto de Google Cloud.
gcloud composer environments run ENVIRONMENT \
  --location LOCATION \
  variables -- \
  --set gce_zone GCE_ZONE

Donde:

gcloud composer environments run ENVIRONMENT \
  --location LOCATION \
  variables -- \
  --set bucket_path BUCKET_PATH

Donde:

  • BUCKET_PATH es la ubicación del depósito de Cloud Storage que creaste antes.

CLI de Airflow 2.0

gcloud beta composer environments run ENVIRONMENT \
  --location LOCATION \
  variables set -- \
  project_id PROJECT_ID

Donde:

  • ENVIRONMENT es el nombre del entorno de Cloud Composer.
  • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
  • PROJECT_ID es tu ID del proyecto de Google Cloud.
gcloud beta composer environments run ENVIRONMENT \
  --location LOCATION \
  variables set -- \
  gce_zone GCE_ZONE

Donde:

gcloud beta composer environments run ENVIRONMENT \
  --location LOCATION \
  variables set -- \
  bucket_path BUCKET_PATH

Donde:

  • BUCKET_PATH es la ubicación del depósito de Cloud Storage que creaste antes.

IU de Airflow

  1. En la barra de herramientas, haz clic en Administrador > Variables.

  2. Haga clic en Crear.

  3. Ingresa la siguiente información:

    • Key: project_id
    • Val: PROJECT_ID es el ID de tu proyecto de Google Cloud.
  4. Haz clic en Guardar y agregar otro.

  5. Ingresa la siguiente información:

    • Key: bucket_path
    • Val: BUCKET_PATH es la ubicación de tu depósito de Cloud Storage (p. ej., “gs://my-bucket”).
  6. Haz clic en Guardar y agregar otro.

  7. Ingresa la siguiente información:

  8. Haga clic en Save.

Ahora harás referencia a los archivos que creaste antes para crear un DAG que inicie el flujo de trabajo de Dataflow. Copia este DAG y guárdalo de forma local como composer-dataflow-dag.py.

Airflow 2

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Sube el DAG a Cloud Storage

Sube tu DAG a tu carpeta de entorno. Una vez que la carga se haya completado de forma correcta, deberías poder verla si haces clic en el vínculo Carpeta de DAG en la página Entornos de Cloud Composer.

La carpeta de DAG en tu entorno contiene tu DAG.

Visualiza el estado de una tarea

  1. Ve a la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (p. ej., composerDataflowDAG).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:

    • Tarea con errores: la tarea estará encerrada en un cuadro rojo. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores. La tarea tiene un cuadro rojo a su alrededor, lo que indica que falló

Después de unos minutos, deberías ver tus resultados en Dataflow y BigQuery.

Visualiza tu trabajo en Dataflow

  1. Ve a la IU web de Dataflow. IR A LA IU WEB DE DATAFLOW

  2. El trabajo recibirá el nombre dataflow_operator_transform_csv_to_bq con un ID único al final del nombre con un guion, como el siguiente: El trabajo de Dataflow tiene un ID único..

  3. Haz clic en el nombre para ver los detalles del trabajo. Obtén más información sobre los detalles del trabajo de Dataflow. Consulta todos los detalles del trabajo a continuación.

Visualiza tus resultados en BigQuery

  1. Ir a la IU web de BigQuery. IR A LA IU WEB DE BIGQUERY

  2. Puede enviar una consulta mediante SQL estándar. Usa la siguiente consulta para ver las filas que se agregaron a su tabla:

    SELECT * FROM projectId.average_weather.average_weather
    

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform, puedes borrar los recursos que usaste en este instructivo:

  1. Borra el entorno de Cloud Composer.
  2. Borra el depósito de Cloud Storage para el entorno de Cloud Composer. Si borras el entorno de Cloud Composer, no se borra el bucket.
  3. Detén el trabajo de Dataflow.
  4. Borra la tabla de BigQuery y el conjunto de datos de BigQuery.