Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Neste tutorial, mostramos como usar o Cloud Composer para criar um DAG do Apache Airflow (em inglês). A O DAG mescla os dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Cloud Storage e depois executa Job em lote do Dataproc sem servidor para processar os dados mesclados.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o globo. O arquivo CSV contém informações sobre datas e nomes de feriados nos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura de Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregue um conjunto de dados externo do Cloud Storage para Google BigQuery
- mesclar dois conjuntos de dados no BigQuery;
- Executar um job de análise de dados do PySpark
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage.
gcloud
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à conta de usuário:
Conceder papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com padrões parâmetros:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os papéis a seguir à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow executar corretamente as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar recursos relacionados
Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o acesso privado do Google na sub-rede padrão da região em que você quer executar Dataproc sem servidor para realizar requisitos de rede. Qa use a mesma região do Cloud Composer de nuvem.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Processamento de dados com o Dataproc sem servidor
conheça o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura décimos de grau de Celsius para graus Celsius. Esta vaga converte de temperatura do conjunto de dados em um formato diferente.
Fazer upload de arquivos de suporte para o Cloud Storage
Para fazer upload do arquivo PySpark e do conjunto de dados armazenado em holidays.csv
:
Salvar data_analytics_process.py na máquina local.
Salve o arquivo holidays.csv na sua máquina local.
No console do Google Cloud, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou anteriormente.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos. selecione
data_analytics_process.py
eholidays.csv
na caixa de diálogo aparece e clique em Abrir.
DAG de análise de dados
conheça o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
A
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no BigQuery conjunto de dadosholidays_weather
criado anteriormente.A
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando Dataproc sem servidorA
BigQueryInsertJobOperator
agrega os dados de holidays.csv no "Data" coluna com dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
estão geradas dinamicamente usando uma repetição "for", e essas tarefas estão em umTaskGroup
para uma melhor legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, variáveis são uma forma universal de armazenar e recuperar configurações arbitrárias ou personalizadas como um repositório simples de chave-valor. Esse DAG usa variáveis do Airflow para e armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente. (sem o prefixogs://
).gce_region
: a região em que você quer que as job do Dataproc que atenda Requisitos de rede sem servidor do Dataproc. Esta é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do ambiente do Cloud Composer. Você pode encontrar esse serviço do contêiner na guia de configuração do ambiente da sua ambiente do Cloud Composer.
faça upload do DAG para o bucket do ambiente
O Cloud Composer programa DAGs localizados no
/dags
no bucket do ambiente. Para fazer o upload do DAG usando o
Console do Google Cloud:
Na sua máquina local, salve data_analytics_dag.py.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna pasta do DAG, clique em link DAGs. A pasta de DAGs do ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
data_analytics_dag.py
na sua máquina local e clique em Abrir.
Como acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
data_analytics_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de seleção verde indicando as tarefas foram concluídas com sucesso.
Valide o sucesso do DAG
No console do Google Cloud, acesse a página do BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na de valor da coluna estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na estão em graus Celsius.
Análise detalhada com o Dataproc sem servidor (opcional)
É possível testar uma versão avançada desse DAG com um PySpark mais complexo no fluxo de processamento de dados. Consulte Extensão do Dataproc para o exemplo de análise de dados no GitHub.
Limpeza
Exclua os recursos individuais que você criou para este tutorial:
Exclua o bucket do Cloud Storage que você criados para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo: excluir manualmente o bucket do ambiente.
A seguir
- Execute um DAG de análise de dados no Google Cloud usando dados da AWS.
- Execute um DAG de análise de dados no Azure.