Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e, em seguida, executa um job em lote sem servidor do Dataproc para processar os dados unidos.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando a DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externos do Cloud Storage para o BigQuery
- Mesclar dois conjuntos de dados no BigQuery
- Executar um job de análise de dados do PySpark
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel de proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda a função Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os papéis a seguir à conta de serviço usada no seu ambiente do Cloud Composer para que os workers do Airflow possam executar tarefas de DAG:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar recursos relacionados
Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o Acesso particular do Google na sub-rede padrão na região em que você quer executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do seu ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Processamento de dados usando o Dataproc Serverless
Confira o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de um grau em Celsius para graus Celsius. Esse job converte os dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload de arquivos de suporte para o Cloud Storage
Para fazer upload do arquivo PySpark e do conjunto de dados armazenado em holidays.csv
:
Salve data_analytics_process.py na sua máquina local.
Salve holidays.csv na sua máquina local.
No console do Google Cloud, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou anteriormente.
Na guia Objects do bucket, clique no botão Upload files, selecione
data_analytics_process.py
eholidays.csv
na caixa de diálogo que aparece e clique em Open.
DAG de análise de dados
Confira o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weather
do BigQuery que você criou anteriormente.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
une os dados de holidays.csv na coluna "Data" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando um loop for, e essas tarefas estão em umTaskGroup
para melhor legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um repositório de chave-valor simples. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicionar ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer que seu job do Dataproc atenda aos requisitos de rede do Dataproc sem servidor. Esta é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Essa conta de serviço pode ser encontrada na guia de configuração do ambiente do Cloud Composer.
Faça upload do DAG para o bucket do ambiente
O Cloud Composer programa DAGs que estão na
pasta /dags
no bucket do ambiente. Para fazer upload do DAG usando o
console do Google Cloud:
Na máquina local, salve data_analytics_dag.py.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta de DAGs, clique no link DAGs. A pasta de DAGs do seu ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
data_analytics_dag.py
na sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
data_analytics_dag
.Clique em Trigger DAG.
Aguarde de cinco a dez minutos até que uma marca de seleção verde apareça, indicando que as tarefas foram concluídas.
Validar o sucesso do DAG
No console do Google Cloud, acesse a página do BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualizar" para ver a tabela resultante. Os números na coluna "Valor" estão em décimos de um grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualizar" para ver a tabela resultante. Os números na coluna de valores estão em graus Celsius.
Aprofundamento com o Dataproc sem servidor (opcional)
Você pode testar uma versão avançada desse DAG com um fluxo de processamento de dados do PySpark mais complexo. Consulte a extensão do Dataproc para o exemplo de análise de dados no GitHub.
Limpeza
Exclua os recursos individuais criados para este tutorial:
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.
A seguir
- Execute um DAG de análise de dados em Google Cloud Usar dados da AWS.
- Execute um DAG de análise de dados no Azure.