Iniciar pipelines do Dataflow com o Cloud Composer

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como usar o DataflowTemplateOperator para iniciar pipelines do Dataflow no Cloud Composer. O pipeline Cloud Storage Text para BigQuery é um pipeline em lote que permite fazer upload de arquivos de texto armazenados no Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF) em JavaScript fornecida por você e gerar os resultados no BigQuery.

uma função definida pelo usuário, um arquivo de entrada e um esquema JSON são enviados
 para um bucket do Cloud Storage. Um DAG que faz referência a esses arquivos inicia um pipeline em lote do Dataflow, que aplica a função definida pelo usuário e o arquivo de esquema JSON ao arquivo de entrada. Depois, será feito o upload desse conteúdo em uma tabela do BigQuery

Visão geral

  • Antes de iniciar o fluxo de trabalho, você criará as seguintes entidades:

    • Uma tabela vazia do BigQuery de um conjunto de dados vazio que conterá as seguintes colunas de informações: location, average_temperature, month e, opcionalmente, inches_of_rain, is_current e latest_measurement.

    • Um arquivo JSON que normalizará os dados do arquivo .txt para o formato correto para o esquema da tabela do BigQuery. O objeto JSON terá uma matriz de BigQuery Schema, em que cada objeto conterá um nome de coluna, tipo de entrada e se é ou não um campo obrigatório.

    • Um arquivo de entrada .txt que conterá os dados que serão enviados por upload em lote para a tabela do BigQuery.

    • Uma função definida pelo usuário escrita em JavaScript que transformará cada linha do arquivo .txt nas variáveis relevantes para a tabela.

    • Um arquivo DAG do Airflow que apontará para o local desses arquivos.

  • Em seguida, faça upload dos arquivos .txt, .js e de esquema .json para um bucket do Cloud Storage. Você também fará upload do DAG para o ambiente do Cloud Composer.

  • Depois que o DAG for carregado, o Airflow executará uma tarefa a partir dele. Essa tarefa iniciará um pipeline do Dataflow que aplicará a função definida pelo usuário ao arquivo .txt e a formatará de acordo com o esquema JSON.

  • Por fim, os dados serão enviados para a tabela do BigQuery que você criou anteriormente.

Antes de começar

  • Este guia requer familiaridade com JavaScript para criar a função definida pelo usuário.
  • Para seguir este guia, você precisa ter um ambiente do Cloud Composer. Consulte Criar ambiente para criar um. É possível usar qualquer versão do Cloud Composer com este guia.
  • Ative as APIs Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Ative as APIs

Criar uma tabela vazia do BigQuery com definição de esquema

Criar uma tabela do BigQuery com uma definição de esquema Você vai usar essa definição de esquema posteriormente neste guia. Essa tabela do BigQuery manterá os resultados do upload em lote.

Para criar uma tabela vazia com definição de esquema, faça o seguinte:

Console

  1. No console do Google Cloud, acesse a página do BigQuery:

    Acessar o BigQuery

  2. No painel de navegação, na seção Recursos, expanda o projeto.

  3. No painel de detalhes, clique em Criar conjunto de dados.

    clique no botão "Create a dataset"

  4. Na página "Criar conjunto de dados", na seção ID do conjunto de dados, nomeie o conjunto de dados como average_weather. Deixe todos os outros campos no estado padrão.

    Preencha o ID do conjunto de dados com o nome average_weather

  5. Clique em Criar conjunto de dados.

  6. Volte ao painel de navegação, na seção Recursos, expanda o projeto. Em seguida, clique no conjunto de dados average_weather.

  7. No painel de detalhes, clique em Criar tabela.

    clique em "Criar tabela"

  8. Na página Criar tabela, na seção Origem, selecione Tabela vazia.

  9. Na página Criar tabela, na seção Destino:

    • Em Nome do conjunto de dados, escolha o conjunto de dados average_weather.

      Escolha a opção "Conjunto de dados" para o conjunto de dados average_weather

    • No campo Nome da tabela, digite o nome average_weather.

    • Verifique se Table type está definido como Native table.

  10. Na seção Esquema, insira a definição do esquema. Use uma das seguintes abordagens:

    • Para inserir as informações do esquema manualmente, ative a opção Editar como texto e insira o esquema da tabela como uma matriz JSON. Digite os seguintes campos:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Use Adicionar campo para inserir manualmente o esquema:

      clique em "Adicionar campo" para inserir os campos

  11. Em Configurações de partição e cluster, use o valor padrão No partitioning.

  12. Na seção Opções avançadas, para Criptografia, use o valor padrão Google-managed key.

  13. Clique em Criar tabela.

bq

Use o comando bq mk para criar um conjunto de dados vazios e uma tabela nesse conjunto de dados.

Execute o seguinte comando para criar um conjunto de dados de clima global médio:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Substitua:

  • LOCATION: a região em que o ambiente está localizado.
  • PROJECT_ID: o ID do projeto.

Execute o seguinte comando para criar uma tabela vazia nesse conjunto de dados com a definição do esquema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Após a criação da tabela, é possível atualizar a expiração, a descrição e os rótulos da tabela. É possível também modificar a definição do esquema.

Python

Salve este código como dataflowtemplateoperator_create_dataset_and_table_helper.py e atualize as variáveis nele para refletir seu projeto e local. Em seguida, execute-o com o seguinte comando:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

crie um bucket do Cloud Storage

Crie um bucket para guardar todos os arquivos necessários para o fluxo de trabalho. O DAG que você criar mais adiante neste guia fará referência aos arquivos que você envia para esse bucket de armazenamento. Para criar um novo bucket de armazenamento:

Console

  1. Abra o Cloud Storage no console do Google Cloud.

    Acesse o Cloud Storage

  2. Clique em Criar bucket para abrir o formulário de criação de bucket.

    1. Insira as informações do bucket e clique em Continuar para concluir cada etapa:

      • Especifique um Nome globalmente exclusivo para o bucket. Este guia usa bucketName como exemplo.

      • Selecione Região para o tipo de local. Em seguida, selecione um Local em que os dados do bucket serão armazenados.

      • Selecione Padrão como a classe de armazenamento padrão dos dados.

      • Selecione o controle de acesso Uniforme para acessar os objetos.

    2. Clique em Concluído.

gsutil

Use o comando gsutil mb:

gsutil mb gs://bucketName/

Substitua:

  • bucketName: o nome do bucket que você criou anteriormente neste guia.

Amostras de código

C#

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Criar um esquema do BigQuery formatado em JSON para a tabela de saída

Crie um arquivo de esquema do BigQuery formatado em JSON que corresponda à tabela de saída criada anteriormente. Os nomes, tipos e modos de campos precisam corresponder aos definidos anteriormente no esquema da tabela do BigQuery. Esse arquivo normalizará os dados do arquivo .txt para um formato compatível com o esquema do BigQuery. Nomeie esse arquivo como jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Criar um arquivo JavaScript para formatar os dados

Nesse arquivo, você definirá a função definida pelo usuário (UDF, na sigla em inglês) que fornece a lógica para transformar as linhas de texto no arquivo de entrada. Essa função usa cada linha de texto do arquivo de entrada como o próprio argumento, portanto, a função será executada uma vez para cada linha do arquivo de entrada. Nomeie esse arquivo como transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Criar o arquivo de entrada

Esse arquivo conterá as informações que você quer enviar para sua tabela do BigQuery. Copie esse arquivo localmente e nomeie-o como inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Faça upload dos arquivos no seu bucket

Faça o upload dos seguintes arquivos para o bucket do Cloud Storage criado anteriormente:

  • O esquema do BigQuery formatado em JSON (.json)
  • A função JavaScript definida pelo usuário (transformCSVtoJSON.js)
  • O arquivo de entrada do texto que você quer processar (.txt)

Console

  1. No console do Google Cloud, acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Na lista de buckets, clique no seu bucket.

  3. Na guia Objetos do bucket, siga um destes procedimentos:

    • Arraste e solte os arquivos que você quer enviar da área de trabalho ou do gerenciador de arquivos para o painel principal no console do Google Cloud.

    • Clique no botão Fazer upload de arquivos, selecione os arquivos que você quer enviar na caixa de diálogo exibida e clique em Abrir.

gsutil

Execute o comando gsutil cp:

gsutil cp OBJECT_LOCATION gs://bucketName

Substitua:

  • bucketName: o nome do bucket que você criou anteriormente neste guia.
  • OBJECT_LOCATION: o caminho local do objeto. Por exemplo, Desktop/transformCSVtoJSON.js.

Amostras de código

Python

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configurar DataflowTemplateOperator

Antes de executar o DAG, defina as seguintes variáveis do Airflow.

Variável do Airflow Valor
project_id O ID do projeto
gce_zone Zona do Compute Engine em que o cluster do Dataflow precisa ser criado
bucket_path o local do bucket do Cloud Storage criado anteriormente

Agora você fará referência aos arquivos criados anteriormente para criar um DAG que inicie o fluxo de trabalho do Dataflow. Copie esse DAG e salve-o localmente como composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Fazer upload do DAG para o Cloud Storage

Faça upload do DAG para a pasta /dags no bucket do ambiente. Quando o upload for concluído, clique no link Pasta de DAGs na página de ambientes do Cloud Composer para conferi-lo.

A pasta de DAGs no ambiente contém o DAG

Visualizar o status da tarefa

  1. Acesse a interface da Web do Airflow.
  2. Na página DAGs, clique no nome do DAG, como composerDataflowDAG.
  3. Na página "Detalhes dos DAGs", clique em Visualizar gráfico.
  4. Verificar status:

    • Failed: a tarefa tem uma caixa vermelha ao redor. Também é possível manter o ponteiro sobre a tarefa e procurar por State: failed.

    • Success: a tarefa tem uma caixa verde ao redor. Também é possível manter o ponteiro sobre a tarefa e verificar se há State: Success.

Depois de alguns minutos, é possível verificar os resultados no Dataflow e no BigQuery.

Ver o job no Dataflow

  1. No Console do Google Cloud, abra a página Dataflow.

    Acessar o Dataflow

  2. O job é nomeado como dataflow_operator_transform_csv_to_bq com um ID exclusivo anexado ao final do nome com um hífen, da seguinte forma:

    O job do Dataflow tem um ID exclusivo

  3. Clique no nome para ver os detalhes do job.

    Conferir todos os detalhes do job

Ver os resultados no BigQuery

  1. No Console do Google Cloud, acesse a página BigQuery.

    Acessar o BigQuery

  2. Você pode enviar consultas usando o SQL padrão. Use a consulta a seguir para ver as linhas que foram adicionadas à tabela:

    SELECT * FROM projectId.average_weather.average_weather