Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Neste tutorial, mostramos como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e executa um job em lote sem servidor do Dataproc para processar os dados unidos.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Combinar dois conjuntos de dados no BigQuery
- Executar um job do PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel de Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região nos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os seguintes papéis à conta de serviço usada no seu ambiente do Cloud Composer para que os workers do Airflow executem as tarefas do DAG com êxito:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar recursos relacionados
Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o Acesso privado do Google na sub-rede padrão da região em que você quer executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Processamento de dados usando o Dataproc sem servidor
Conheça o exemplo de job do PySpark
O código abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload de arquivos de suporte para o Cloud Storage
Para fazer upload do arquivo PySpark e do conjunto de dados armazenado em holidays.csv
:
Salve data_analytics_process.py na sua máquina local.
Salve holidays.csv na sua máquina local.
No console Google Cloud , acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.py
eholidays.csv
na caixa de diálogo exibida e clique em Abrir.
DAG de análise de dados
Conheça o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weather
do BigQuery que você criou antes.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
mescla os dados de holidays.csv na coluna "Date" com dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando um loop "for", e essas tarefas estão em umTaskGroup
para melhorar a legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um simples armazenamento de chave-valor. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer o job do Dataproc que atende aos requisitos de rede do Dataproc sem servidor. Essa é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Essa conta de serviço pode ser encontrada na guia de configuração do ambiente do Cloud Composer.
Faça upload do DAG para o bucket do ambiente
O Cloud Composer agenda os DAGs que estão na
pasta /dags
no bucket do ambiente. Para fazer o upload do DAG usando o console do
Google Cloud :
Na sua máquina local, salve data_analytics_dag.py.
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta de DAGs, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
data_analytics_dag.py
na sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
data_analytics_dag
.Clique em Acionar DAG.
Aguarde de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com sucesso.
Validar o sucesso do DAG
No console Google Cloud , acesse a página BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna "Valor" estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna "Valor" estão em graus Celsius.
Detalhar o Dataproc sem servidor (opcional)
Você pode testar uma versão avançada desse DAG com um fluxo de processamento de dados PySpark mais complexo. Consulte a extensão do Dataproc para o exemplo de análise de dados no GitHub.
Limpeza
Exclua os recursos individuais criados para este tutorial:
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.
A seguir
- Execute um DAG de análise de dados em Google Cloud Usando dados da AWS.
- Executar um DAG de análise de dados no Azure.