Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar o DataflowTemplateOperator
para iniciar pipelines do Dataflow a partir do Cloud Composer.
O pipeline de texto do Cloud Storage para o BigQuery
é um pipeline em lote que lhe permite carregar ficheiros de texto armazenados no
Cloud Storage, transformá-los através de uma função definida pelo utilizador (UDF) em JavaScript que fornece e gerar os resultados no
BigQuery.
Vista geral
Antes de iniciar o fluxo de trabalho, vai criar as seguintes entidades:
Uma tabela do BigQuery vazia de um conjunto de dados vazio que vai conter as seguintes colunas de informações:
location
,average_temperature
,month
e, opcionalmente,inches_of_rain
,is_current
elatest_measurement
.Um ficheiro JSON que normaliza os dados do ficheiro
.txt
no formato correto para o esquema da tabela do BigQuery. O objeto JSON tem uma matriz deBigQuery Schema
, onde cada objeto contém um nome de coluna, um tipo de entrada e se é ou não um campo obrigatório.Um ficheiro de entrada
.txt
que vai conter os dados que vão ser carregados em lote para a tabela do BigQuery.Uma função definida pelo utilizador escrita em JavaScript que transforma cada linha do ficheiro
.txt
nas variáveis relevantes para a nossa tabela.Um ficheiro DAG do Airflow que vai apontar para a localização destes ficheiros.
Em seguida, vai carregar o ficheiro
.txt
, o ficheiro UDF.js
e o ficheiro de esquema.json
para um contentor do Cloud Storage. Também carrega o DAG para o seu ambiente do Cloud Composer.Depois de carregar o DAG, o Airflow executa uma tarefa a partir dele. Esta tarefa vai iniciar um pipeline do Dataflow que vai aplicar a função definida pelo utilizador ao ficheiro
.txt
e formatá-lo de acordo com o esquema JSON.Por último, os dados são carregados para a tabela do BigQuery que criou anteriormente.
Antes de começar
- Este guia requer familiaridade com o JavaScript para escrever a função definida pelo utilizador.
- Este guia pressupõe que já tem um ambiente do Cloud Composer. Consulte o artigo Crie um ambiente para criar um. Pode usar qualquer versão do Cloud Composer com este guia.
Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.
Certifique-se de que tem as seguintes autorizações:
- Funções do Cloud Composer: criar um ambiente (se não tiver um), gerir objetos no contentor do ambiente, executar DAGs e aceder à IU do Airflow.
- Funções do Cloud Storage: crie um contentor e faça a gestão de objetos no mesmo.
- Funções do BigQuery: criar um conjunto de dados e uma tabela, modificar dados na tabela, modificar o esquema e os metadados da tabela.
- Funções do Dataflow: veja tarefas do Dataflow.
Certifique-se de que a conta de serviço do seu ambiente tem autorizações para criar tarefas do Dataflow, aceder ao contentor do Cloud Storage e ler e atualizar dados para a tabela no BigQuery.
Crie uma tabela do BigQuery vazia com uma definição de esquema
Crie uma tabela do BigQuery com uma definição de esquema. Vai usar esta definição do esquema mais tarde neste guia. Esta tabela do BigQuery vai conter os resultados do carregamento em lote.
Para criar uma tabela vazia com uma definição de esquema:
Consola
Na Google Cloud consola, aceda à página do BigQuery:
No painel de navegação, na secção Recursos, expanda o seu projeto.
No painel de detalhes, clique em Criar conjunto de dados.
Na página Criar conjunto de dados, na secção ID do conjunto de dados, atribua um nome ao conjunto de dados
average_weather
. Deixe todos os outros campos no estado predefinido.Clique em Criar conjunto de dados.
Volte ao painel de navegação, na secção Recursos, expanda o seu projeto. Em seguida, clique no conjunto de dados
average_weather
.No painel de detalhes, clique em Criar tabela.
Na página Criar tabela, na secção Origem, selecione Tabela vazia.
Na página Criar tabela, na secção Destino:
Para Nome do conjunto de dados, escolha o conjunto de dados
average_weather
.No campo Nome da tabela, introduza o nome
average_weather
.Verifique se o Tipo de tabela está definido como Tabela nativa.
Na secção Esquema, introduza a definição do esquema. Pode usar uma das seguintes abordagens:
Introduza manualmente as informações do esquema ativando a opção Editar como texto e introduzindo o esquema de tabela como uma matriz JSON. Escreva nos seguintes campos:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Use Adicionar campo para introduzir manualmente o esquema:
Para Definições de partição e cluster, deixe o valor predefinido,
No partitioning
.Na secção Opções avançadas, para Encriptação, mantenha o valor predefinido,
Google-owned and managed key
.Clique em Criar tabela.
bq
Use o comando bq mk
para criar um conjunto de dados vazio e uma tabela neste conjunto de dados.
Execute o seguinte comando para criar um conjunto de dados da média meteorológica global:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Substitua o seguinte:
LOCATION
: a região onde o ambiente está localizado.PROJECT_ID
: o ID do projeto.
Execute o seguinte comando para criar uma tabela vazia neste conjunto de dados com a definição do esquema:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Depois de criar a tabela, pode atualizar a validade, a descrição e as etiquetas da tabela. Também pode modificar a definição do esquema.
Python
Guarde este código como
dataflowtemplateoperator_create_dataset_and_table_helper.py
e atualize as variáveis no mesmo para refletir o seu projeto e localização. Em seguida, execute-o com o seguinte comando:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Crie um contentor do Cloud Storage
Crie um contentor para guardar todos os ficheiros necessários para o fluxo de trabalho. O DAG que criar mais tarde neste guia vai referenciar os ficheiros que carregar para este contentor de armazenamento. Para criar um novo contentor de armazenamento:
Consola
Abra o Cloud Storage na Google Cloud consola.
Clique em Criar contentor para abrir o formulário de criação de contentores.
Introduza as informações do seu contentor e clique em Continuar para concluir cada passo:
Especifique um Nome globalmente exclusivo para o seu contentor. Este guia usa
bucketName
como exemplo.Selecione Região para o tipo de estabelecimento. Em seguida, selecione uma Localização onde os dados do contentor vão ser armazenados.
Selecione Padrão como a classe de armazenamento predefinida para os seus dados.
Selecione o controlo de acesso Uniforme para aceder aos seus objetos.
Clique em Concluído.
gcloud
Use o comando gcloud storage buckets create
:
gcloud storage buckets create gs://bucketName/
Substitua o seguinte:
bucketName
: o nome do contentor que criou anteriormente neste guia.
Exemplos de código
C#
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Go
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Java
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Python
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Ruby
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Crie um esquema do BigQuery formatado em JSON para a tabela de saída
Crie um ficheiro de esquema do BigQuery formatado em JSON que corresponda à
tabela de saída que criou anteriormente. Tenha em atenção que os nomes, os tipos e os modos dos campos têm de corresponder aos definidos anteriormente no esquema da tabela do BigQuery. Este ficheiro normaliza os dados do seu ficheiro .txt
num formato compatível com o seu esquema do BigQuery. Atribua um nome a este ficheiro
jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Crie um ficheiro JavaScript para formatar os seus dados
Neste ficheiro, vai definir a sua FDU (função definida pelo utilizador) que fornece a lógica para transformar as linhas de texto no seu ficheiro de entrada. Tenha em atenção que esta função considera cada linha de texto no ficheiro de entrada como o seu próprio argumento, pelo que a função é executada uma vez para cada linha do ficheiro de entrada. Atribua um nome a este ficheiro
transformCSVtoJSON.js
.
Crie o ficheiro de entrada
Este ficheiro vai conter as informações que quer carregar para a sua tabela do BigQuery. Copie este ficheiro localmente e atribua-lhe o nome
inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Carregue os seus ficheiros para o seu contentor
Carregue os seguintes ficheiros para o contentor do Cloud Storage que criou anteriormente:
- Esquema do BigQuery formatado em JSON (
.json
) - Função definida pelo utilizador em JavaScript (
transformCSVtoJSON.js
) O ficheiro de entrada do texto que quer processar (
.txt
)
Consola
- Na Google Cloud consola, aceda à página Recipientes do Cloud Storage.
Na lista de contentores, clique no seu contentor.
No separador Objetos do contentor, faça uma das seguintes ações:
Arraste e largue os ficheiros pretendidos do ambiente de trabalho ou do gestor de ficheiros no painel principal da Google Cloud consola.
Clique no botão Carregar ficheiros, selecione os ficheiros que quer carregar na caixa de diálogo apresentada e clique em Abrir.
gcloud
Execute o comando gcloud storage cp
:
gcloud storage cp OBJECT_LOCATION gs://bucketName
Substitua o seguinte:
bucketName
: o nome do contentor que criou anteriormente neste guia.OBJECT_LOCATION
: o caminho local para o seu objeto. Por exemplo,Desktop/transformCSVtoJSON.js
.
Exemplos de código
Python
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Ruby
Para se autenticar no Cloud Composer, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Configure o DataflowTemplateOperator
Antes de executar o DAG, defina as seguintes variáveis do Airflow.
Variável de fluxo de ar | Valor |
---|---|
project_id
|
O ID do projeto. Exemplo: example-project . |
gce_zone
|
Zona do Compute Engine onde o cluster do Dataflow tem de ser criado. Exemplo: us-central1-a . Para mais informações sobre as zonas válidas, consulte o artigo Regiões e zonas. |
bucket_path
|
A localização do contentor do Cloud Storage que criou anteriormente. Exemplo: gs://example-bucket |
Agora, vai fazer referência aos ficheiros que criou anteriormente para criar um DAG que inicia o fluxo de trabalho do Dataflow. Copie este DAG e guarde-o localmente
como composer-dataflow-dag.py
.
Airflow 2
Fluxo de ar 1
Carregue o DAG para o Cloud Storage
Carregue o DAG para a pasta /dags
no contentor do seu ambiente. Quando o carregamento estiver concluído com êxito, pode vê-lo
clicando no link DAGs Folder na página
Environments do Cloud Composer.
Veja o estado da tarefa
- Aceda à interface Web do Airflow.
- Na página DAGs, clique no nome do DAG (como
composerDataflowDAG
). - Na página de detalhes dos DAGs, clique em Vista de gráfico.
Verificar estado:
Failed
: a tarefa tem uma caixa vermelha à volta. Também pode manter o ponteiro sobre a tarefa e procurar Estado: Falhou.Success
: a tarefa tem uma caixa verde à volta. Também pode passar o ponteiro sobre a tarefa e verificar se o Estado: Concluído.
Após alguns minutos, pode verificar os resultados no Dataflow e no BigQuery.
Veja a sua tarefa no Dataflow
Na Google Cloud consola, aceda à página Fluxo de dados.
A sua tarefa tem o nome
dataflow_operator_transform_csv_to_bq
com um ID exclusivo anexado ao final do nome com um hífen, da seguinte forma:Clique no nome para ver os detalhes da tarefa.
Veja os resultados no BigQuery
Na Google Cloud consola, aceda à página BigQuery.
Pode enviar consultas através do SQL padrão. Use a seguinte consulta para ver as linhas que foram adicionadas à sua tabela:
SELECT * FROM projectId.average_weather.average_weather