Inicie pipelines do Dataflow com o Cloud Composer

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar o DataflowTemplateOperator para iniciar pipelines do Dataflow a partir do Cloud Composer. O pipeline de texto do Cloud Storage para o BigQuery é um pipeline em lote que lhe permite carregar ficheiros de texto armazenados no Cloud Storage, transformá-los através de uma função definida pelo utilizador (UDF) em JavaScript que fornece e gerar os resultados no BigQuery.

Uma função definida pelo utilizador, um ficheiro de entrada e um esquema JSON são carregados
  para um contentor do Cloud Storage. Um DAG que referencie estes ficheiros inicia um pipeline em lote do Dataflow, que aplica a função definida pelo utilizador e o ficheiro de esquema JSON ao ficheiro de entrada. Posteriormente, este conteúdo é carregado para uma tabela do BigQuery

Vista geral

  • Antes de iniciar o fluxo de trabalho, vai criar as seguintes entidades:

    • Uma tabela do BigQuery vazia de um conjunto de dados vazio que vai conter as seguintes colunas de informações: location, average_temperature, month e, opcionalmente, inches_of_rain, is_current e latest_measurement.

    • Um ficheiro JSON que normaliza os dados do ficheiro .txt no formato correto para o esquema da tabela do BigQuery. O objeto JSON tem uma matriz de BigQuery Schema, onde cada objeto contém um nome de coluna, um tipo de entrada e se é ou não um campo obrigatório.

    • Um ficheiro de entrada .txt que vai conter os dados que vão ser carregados em lote para a tabela do BigQuery.

    • Uma função definida pelo utilizador escrita em JavaScript que transforma cada linha do ficheiro .txt nas variáveis relevantes para a nossa tabela.

    • Um ficheiro DAG do Airflow que vai apontar para a localização destes ficheiros.

  • Em seguida, vai carregar o ficheiro .txt, o ficheiro UDF .js e o ficheiro de esquema .json para um contentor do Cloud Storage. Também carrega o DAG para o seu ambiente do Cloud Composer.

  • Depois de carregar o DAG, o Airflow executa uma tarefa a partir dele. Esta tarefa vai iniciar um pipeline do Dataflow que vai aplicar a função definida pelo utilizador ao ficheiro .txt e formatá-lo de acordo com o esquema JSON.

  • Por último, os dados são carregados para a tabela do BigQuery que criou anteriormente.

Antes de começar

  • Este guia requer familiaridade com o JavaScript para escrever a função definida pelo utilizador.
  • Este guia pressupõe que já tem um ambiente do Cloud Composer. Consulte o artigo Crie um ambiente para criar um. Pode usar qualquer versão do Cloud Composer com este guia.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

  • Certifique-se de que tem as seguintes autorizações:

  • Certifique-se de que a conta de serviço do seu ambiente tem autorizações para criar tarefas do Dataflow, aceder ao contentor do Cloud Storage e ler e atualizar dados para a tabela no BigQuery.

Crie uma tabela do BigQuery vazia com uma definição de esquema

Crie uma tabela do BigQuery com uma definição de esquema. Vai usar esta definição do esquema mais tarde neste guia. Esta tabela do BigQuery vai conter os resultados do carregamento em lote.

Para criar uma tabela vazia com uma definição de esquema:

Consola

  1. Na Google Cloud consola, aceda à página do BigQuery:

    Aceda ao BigQuery

  2. No painel de navegação, na secção Recursos, expanda o seu projeto.

  3. No painel de detalhes, clique em Criar conjunto de dados.

    clique no botão que indica criar um conjunto de dados

  4. Na página Criar conjunto de dados, na secção ID do conjunto de dados, atribua um nome ao conjunto de dados average_weather. Deixe todos os outros campos no estado predefinido.

    preencha o ID do conjunto de dados com o nome average_weather

  5. Clique em Criar conjunto de dados.

  6. Volte ao painel de navegação, na secção Recursos, expanda o seu projeto. Em seguida, clique no conjunto de dados average_weather.

  7. No painel de detalhes, clique em Criar tabela.

    clique em criar tabela

  8. Na página Criar tabela, na secção Origem, selecione Tabela vazia.

  9. Na página Criar tabela, na secção Destino:

    • Para Nome do conjunto de dados, escolha o conjunto de dados average_weather.

      Escolha a opção Dataset para o conjunto de dados average_weather

    • No campo Nome da tabela, introduza o nome average_weather.

    • Verifique se o Tipo de tabela está definido como Tabela nativa.

  10. Na secção Esquema, introduza a definição do esquema. Pode usar uma das seguintes abordagens:

    • Introduza manualmente as informações do esquema ativando a opção Editar como texto e introduzindo o esquema de tabela como uma matriz JSON. Escreva nos seguintes campos:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Use Adicionar campo para introduzir manualmente o esquema:

      Clique em adicionar campo para introduzir os campos

  11. Para Definições de partição e cluster, deixe o valor predefinido, No partitioning.

  12. Na secção Opções avançadas, para Encriptação, mantenha o valor predefinido, Google-owned and managed key.

  13. Clique em Criar tabela.

bq

Use o comando bq mk para criar um conjunto de dados vazio e uma tabela neste conjunto de dados.

Execute o seguinte comando para criar um conjunto de dados da média meteorológica global:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Substitua o seguinte:

  • LOCATION: a região onde o ambiente está localizado.
  • PROJECT_ID: o ID do projeto.

Execute o seguinte comando para criar uma tabela vazia neste conjunto de dados com a definição do esquema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Depois de criar a tabela, pode atualizar a validade, a descrição e as etiquetas da tabela. Também pode modificar a definição do esquema.

Python

Guarde este código como dataflowtemplateoperator_create_dataset_and_table_helper.py e atualize as variáveis no mesmo para refletir o seu projeto e localização. Em seguida, execute-o com o seguinte comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Crie um contentor do Cloud Storage

Crie um contentor para guardar todos os ficheiros necessários para o fluxo de trabalho. O DAG que criar mais tarde neste guia vai referenciar os ficheiros que carregar para este contentor de armazenamento. Para criar um novo contentor de armazenamento:

Consola

  1. Abra o Cloud Storage na Google Cloud consola.

    Aceda ao Cloud Storage

  2. Clique em Criar contentor para abrir o formulário de criação de contentores.

    1. Introduza as informações do seu contentor e clique em Continuar para concluir cada passo:

      • Especifique um Nome globalmente exclusivo para o seu contentor. Este guia usa bucketName como exemplo.

      • Selecione Região para o tipo de estabelecimento. Em seguida, selecione uma Localização onde os dados do contentor vão ser armazenados.

      • Selecione Padrão como a classe de armazenamento predefinida para os seus dados.

      • Selecione o controlo de acesso Uniforme para aceder aos seus objetos.

    2. Clique em Concluído.

gcloud

Use o comando gcloud storage buckets create:

gcloud storage buckets create gs://bucketName/

Substitua o seguinte:

  • bucketName: o nome do contentor que criou anteriormente neste guia.

Exemplos de código

C#

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Crie um esquema do BigQuery formatado em JSON para a tabela de saída

Crie um ficheiro de esquema do BigQuery formatado em JSON que corresponda à tabela de saída que criou anteriormente. Tenha em atenção que os nomes, os tipos e os modos dos campos têm de corresponder aos definidos anteriormente no esquema da tabela do BigQuery. Este ficheiro normaliza os dados do seu ficheiro .txt num formato compatível com o seu esquema do BigQuery. Atribua um nome a este ficheiro jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Crie um ficheiro JavaScript para formatar os seus dados

Neste ficheiro, vai definir a sua FDU (função definida pelo utilizador) que fornece a lógica para transformar as linhas de texto no seu ficheiro de entrada. Tenha em atenção que esta função considera cada linha de texto no ficheiro de entrada como o seu próprio argumento, pelo que a função é executada uma vez para cada linha do ficheiro de entrada. Atribua um nome a este ficheiro transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Crie o ficheiro de entrada

Este ficheiro vai conter as informações que quer carregar para a sua tabela do BigQuery. Copie este ficheiro localmente e atribua-lhe o nome inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Carregue os seus ficheiros para o seu contentor

Carregue os seguintes ficheiros para o contentor do Cloud Storage que criou anteriormente:

  • Esquema do BigQuery formatado em JSON (.json)
  • Função definida pelo utilizador em JavaScript (transformCSVtoJSON.js)
  • O ficheiro de entrada do texto que quer processar (.txt)

Consola

  1. Na Google Cloud consola, aceda à página Recipientes do Cloud Storage.

    Aceda aos contentores

  2. Na lista de contentores, clique no seu contentor.

  3. No separador Objetos do contentor, faça uma das seguintes ações:

    • Arraste e largue os ficheiros pretendidos do ambiente de trabalho ou do gestor de ficheiros no painel principal da Google Cloud consola.

    • Clique no botão Carregar ficheiros, selecione os ficheiros que quer carregar na caixa de diálogo apresentada e clique em Abrir.

gcloud

Execute o comando gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://bucketName

Substitua o seguinte:

  • bucketName: o nome do contentor que criou anteriormente neste guia.
  • OBJECT_LOCATION: o caminho local para o seu objeto. Por exemplo, Desktop/transformCSVtoJSON.js.

Exemplos de código

Python

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configure o DataflowTemplateOperator

Antes de executar o DAG, defina as seguintes variáveis do Airflow.

Variável de fluxo de ar Valor
project_id O ID do projeto. Exemplo: example-project.
gce_zone Zona do Compute Engine onde o cluster do Dataflow tem de ser criado. Exemplo: us-central1-a. Para mais informações sobre as zonas válidas, consulte o artigo Regiões e zonas.
bucket_path A localização do contentor do Cloud Storage que criou anteriormente. Exemplo: gs://example-bucket.

Agora, vai fazer referência aos ficheiros que criou anteriormente para criar um DAG que inicia o fluxo de trabalho do Dataflow. Copie este DAG e guarde-o localmente como composer-dataflow-dag.py.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Carregue o DAG para o Cloud Storage

Carregue o DAG para a pasta /dags no contentor do seu ambiente. Quando o carregamento estiver concluído com êxito, pode vê-lo clicando no link DAGs Folder na página Environments do Cloud Composer.

A pasta DAGs no seu ambiente contém o seu DAG

Veja o estado da tarefa

  1. Aceda à interface Web do Airflow.
  2. Na página DAGs, clique no nome do DAG (como composerDataflowDAG).
  3. Na página de detalhes dos DAGs, clique em Vista de gráfico.
  4. Verificar estado:

    • Failed: a tarefa tem uma caixa vermelha à volta. Também pode manter o ponteiro sobre a tarefa e procurar Estado: Falhou.

    • Success: a tarefa tem uma caixa verde à volta. Também pode passar o ponteiro sobre a tarefa e verificar se o Estado: Concluído.

Após alguns minutos, pode verificar os resultados no Dataflow e no BigQuery.

Veja a sua tarefa no Dataflow

  1. Na Google Cloud consola, aceda à página Fluxo de dados.

    Aceda ao Dataflow

  2. A sua tarefa tem o nome dataflow_operator_transform_csv_to_bq com um ID exclusivo anexado ao final do nome com um hífen, da seguinte forma:

    A tarefa do Dataflow tem um ID exclusivo

  3. Clique no nome para ver os detalhes da tarefa.

    pode ver todos os detalhes do emprego

Veja os resultados no BigQuery

  1. Na Google Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. Pode enviar consultas através do SQL padrão. Use a seguinte consulta para ver as linhas que foram adicionadas à sua tabela:

    SELECT * FROM projectId.average_weather.average_weather
    

O que se segue?