Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Nesta página, descrevemos como usar o DataflowTemplateOperator
para iniciar pipelines do Dataflow no Cloud Composer.
O pipeline do Cloud Storage Text para BigQuery
é um pipeline em lote que permite fazer upload de arquivos de texto armazenados no
Cloud Storage, transformá-los usando uma função definida pelo usuário (UDF, na sigla em inglês) do JavaScript fornecida por você e gerar o resultado no
BigQuery.
Visão geral
Antes de iniciar o fluxo de trabalho, você vai criar as seguintes entidades:
Uma tabela vazia do BigQuery de um conjunto de dados vazio que conterá as seguintes colunas de informações:
location
,average_temperature
,month
e, opcionalmente,inches_of_rain
,is_current
elatest_measurement
.Um arquivo JSON que normalizará os dados do arquivo
.txt
no formato correto para o esquema da tabela do BigQuery. O objeto JSON terá uma matriz deBigQuery Schema
, em que cada objeto conterá um nome de coluna, tipo de entrada e se é ou não um campo obrigatório.Um arquivo
.txt
de entrada que conterá os dados que serão enviados em lote para a tabela do BigQuery.Uma função definida pelo usuário escrita em JavaScript que transformará cada linha do arquivo
.txt
nas variáveis relevantes para a tabela.Um arquivo DAG do Airflow que aponta para o local desses arquivos.
Em seguida, faça upload do arquivo
.txt
, do arquivo UDF.js
e do arquivo de esquema.json
para um bucket do Cloud Storage. Você também vai fazer upload do DAG para o ambiente do Cloud Composer.Depois que o DAG for enviado, o Airflow vai executar uma tarefa dele. Essa tarefa vai iniciar um pipeline do Dataflow que vai aplicar a função definida pelo usuário ao arquivo
.txt
e formatá-lo de acordo com o esquema JSON.Por fim, os dados serão enviados para a tabela do BigQuery que você criou anteriormente.
Antes de começar
- Para escrever a função definida pelo usuário deste guia, recomendamos que você tenha familiaridade com o JavaScript.
- Este guia pressupõe que você já tenha um ambiente do Cloud Composer. Consulte Criar um ambiente para criar um. É possível usar qualquer versão do Cloud Composer com este guia.
-
Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.
Criar uma tabela vazia do BigQuery com uma definição de esquema
Crie uma tabela do BigQuery com uma definição de esquema. Você vai usar essa definição de esquema mais adiante neste guia. Essa tabela do BigQuery vai armazenar os resultados do upload em lote.
Para criar uma tabela vazia com definição de esquema, faça o seguinte:
Console
No console do Google Cloud, acesse a página do BigQuery:
No painel de navegação, na seção Recursos, expanda o projeto.
No painel de detalhes, clique em Criar conjunto de dados.
Na página "Criar conjunto de dados", na seção ID do conjunto de dados, nomeie o conjunto de dados como
average_weather
. Deixe todos os outros campos no estado padrão.Clique em Criar conjunto de dados.
Volte para o painel de navegação, na seção Recursos, e expanda seu projeto. Em seguida, clique no conjunto de dados
average_weather
.No painel de detalhes, clique em Criar tabela.
Na página Criar tabela, na seção Origem, selecione Tabela vazia.
Na página Criar tabela, na seção Destino:
Em Nome do conjunto de dados, escolha o conjunto de dados
average_weather
.No campo Nome da tabela, digite o nome
average_weather
.Verifique se Table type está definido como Native table.
Na seção Esquema, insira a definição do esquema. Você pode usar uma das seguintes abordagens:
Insira as informações do esquema manualmente ativando Editar como texto e insira o esquema da tabela como uma matriz JSON. Digite os seguintes campos:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Use Adicionar campo para inserir manualmente o esquema:
Em Configurações de partição e cluster, use o valor padrão,
No partitioning
.Na seção Opções avançadas, para Criptografia, use o valor padrão
Google-owned and managed key
.Clique em Criar tabela.
bq
Use o comando bq mk
para criar um conjunto de dados vazio e uma tabela nele.
Execute o comando a seguir para criar um conjunto de dados de clima global médio:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Substitua:
LOCATION
: a região em que o ambiente está localizado.PROJECT_ID
: o ID do projeto.
Execute o comando abaixo para criar uma tabela vazia nesse conjunto de dados com a definição de esquema:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Após a criação da tabela, é possível atualizar a expiração, a descrição e os rótulos dela. É possível também modificar a definição do esquema.
Python
Salve esse código como
dataflowtemplateoperator_create_dataset_and_table_helper.py
e atualize as variáveis nele para refletir o projeto e local. Em seguida, execute-o com o seguinte comando:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Criar um bucket do Cloud Storage
Crie um bucket para armazenar todos os arquivos necessários para o fluxo de trabalho. O DAG que você cria mais adiante neste guia faz referência aos arquivos enviados para esse bucket de armazenamento. Para criar um novo bucket de armazenamento:
Console
Abra o Cloud Storage no console do Google Cloud.
Clique em Criar bucket para abrir o formulário de criação de bucket.
Insira as informações do bucket e clique em Continuar para concluir cada etapa:
Especifique um Nome globalmente exclusivo para o bucket. Este guia usa
bucketName
como exemplo.Selecione Região para o tipo de local. Em seguida, selecione um Local onde os dados do bucket serão armazenados.
Selecione Padrão como a classe de armazenamento padrão dos dados.
Selecione o controle de acesso Uniforme para acessar os objetos.
Clique em Concluído.
gcloud
Use o comando gcloud storage buckets create
:
gcloud storage buckets create gs://bucketName/
Substitua:
bucketName
: o nome do bucket criado anteriormente neste guia.
Amostras de código
C#
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Go
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Java
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Python
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Ruby
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Criar um esquema do BigQuery formatado em JSON para a tabela de saída
Crie um arquivo de esquema do BigQuery formatado em JSON que corresponda à
tabela de saída criada anteriormente. Os nomes, tipos e modos do campo
precisam corresponder aos definidos anteriormente no esquema da tabela do BigQuery. Esse arquivo vai normalizar os dados do arquivo .txt
em um formato
compatível com o esquema do BigQuery. Nomeie esse arquivo como
jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Criar um arquivo JavaScript para formatar os dados
Nesse arquivo, você vai definir a UDF (função definida pelo usuário) que fornece
a lógica para transformar as linhas de texto no arquivo de entrada. Essa função usa cada linha de texto no arquivo de entrada como o próprio argumento dela. Portanto, a função será executada uma vez para cada linha do arquivo de entrada. Nomeie esse arquivo como
transformCSVtoJSON.js
.
Criar o arquivo de entrada
Esse arquivo conterá as informações que você quer enviar para a tabela do BigQuery. Copie esse arquivo localmente e nomeie-o como
inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Fazer upload dos arquivos para o bucket
Faça upload dos seguintes arquivos para o bucket do Cloud Storage criado anteriormente:
- O esquema do BigQuery formatado em JSON (
.json
) - A função JavaScript definida pelo usuário (
transformCSVtoJSON.js
) O arquivo de entrada do texto que você quer processar (
.txt
)
Console
- No Console do Google Cloud, acesse a página Buckets do Cloud Storage.
Na lista de buckets, clique no seu bucket.
Na guia Objetos do bucket, faça o seguinte:
Arraste e solte os arquivos que você quer enviar da área de trabalho ou do gerenciador de arquivos para o painel principal no console do Google Cloud.
Clique no botão Enviar arquivos, selecione os arquivos que você quer enviar na caixa de diálogo exibida e clique em Abrir.
gcloud
Execute o comando gcloud storage cp
:
gcloud storage cp OBJECT_LOCATION gs://bucketName
Substitua:
bucketName
: o nome do bucket criado anteriormente neste guia.OBJECT_LOCATION
: o caminho local para o objeto. Por exemplo,Desktop/transformCSVtoJSON.js
.
Amostras de código
Python
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Ruby
Para autenticar no Cloud Composer, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Configurar o DataflowTemplateOperator
Antes de executar o DAG, defina as seguintes variáveis do Airflow.
Variável do Airflow | Valor |
---|---|
project_id
|
O ID do projeto |
gce_zone
|
Zona do Compute Engine em que o cluster do Dataflow precisa ser criado |
bucket_path
|
O local do bucket do Cloud Storage criado anteriormente |
Agora você fará referência aos arquivos criados anteriormente para criar um DAG que inicie
o fluxo de trabalho do Dataflow. Copie esse DAG e salve-o localmente
como composer-dataflow-dag.py
.
Fazer upload do DAG para o Cloud Storage
Faça upload do DAG para a pasta /dags
no bucket do ambiente. Depois de concluir o upload, você poderá acessá-lo
clicando no link Pasta DAGs na página "Ambientes do Cloud Composer".
Conferir o status da tarefa
- Acesse a interface da Web do Airflow.
- Na página "DAGs", clique no nome do DAG, como
composerDataflowDAG
. - Na página "Detalhes dos DAGs", clique em Visualizar gráfico.
Verificar status:
Failed
: a tarefa tem uma caixa vermelha ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: Falha.Success
: a tarefa tem uma caixa verde ao redor. Você também pode manter o ponteiro sobre a tarefa e verificar se há a mensagem Estado: sucesso.
Após alguns minutos, você pode conferir os resultados no Dataflow e no BigQuery.
Ver o job no Dataflow
No Console do Google Cloud, abra a página Dataflow.
O job é nomeado
dataflow_operator_transform_csv_to_bq
com um ID exclusivo anexado ao final do nome com um hífen, como:Clique no nome para conferir os detalhes do job.
Ver os resultados no BigQuery
No Console do Google Cloud, acesse a página BigQuery.
Você pode enviar consultas usando o SQL padrão. Use a consulta a seguir para ver as linhas que foram adicionadas à tabela:
SELECT * FROM projectId.average_weather.average_weather