Cloud Composer 1 | Cloud Composer 2
Este tutorial é uma modificação do tópico Executar um DAG de análise de dados no Google Cloud que mostra como conectar seu ambiente do Cloud Composer ao Microsoft Azure para utilizar os dados armazenados nele. Ele mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um Armazenamento de Blobs do Azure (em inglês) e, em seguida, executa um job em lote do Dataproc sem servidor para processar os dados mesclados.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos de todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados nos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi o nível de calor em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um blob no Azure
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do Armazenamento de Blobs do Azure para o Cloud Storage
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Mesclar dois conjuntos de dados no BigQuery
- Executar um job de análise de dados do PySpark
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os papéis a seguir à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow possam executar as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar e modificar recursos relacionados no Google Cloud
Instale o
apache-airflow-providers-microsoft-azure
pacote PyPI no ambiente do Cloud Composer.Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Name:
holiday_weather
- Região:
US
- Name:
Crie um novo bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o acesso privado do Google na sub-rede padrão na região onde você gostaria de executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do seu ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Criar recursos relacionados no Azure
Crie uma conta de armazenamento com as configurações padrão.
Conseguir a chave de acesso e a string de conexão da sua conta de armazenamento.
Crie um contêiner com opções padrão na sua conta de armazenamento recém-criada.
Conceda o papel Delegador de blob de armazenamento para o contêiner criado na etapa anterior.
Faça upload de holidays.csv para criar um blob de bloco com opções padrão no portal do Azure.
Crie um token SAS para o blob de bloco criado na etapa anterior no portal do Azure.
- Método de assinatura: chave de delegação do usuário
- Permissões: leitura
- Endereço IP permitido: nenhum
- Protocolos permitidos: somente HTTPS
Conectar-se ao Azure pelo Cloud Composer
Adicione a conexão com o Microsoft Azure usando a IU do Airflow:
Acesse Administrador > Conexões.
Crie uma nova conexão com a seguinte configuração:
- ID da conexão:
azure_blob_connection
- Tipo de conexão:
Azure Blob Storage
- Login do Blob Storage:o nome da sua conta de armazenamento
- Chave de armazenamento de blobs:a chave de acesso da sua conta de armazenamento
- String de conexão da conta de armazenamento de blobs:a string de conexão da sua conta de armazenamento.
- Token SAS:o token SAS gerado pelo blob
- ID da conexão:
Processamento de dados usando Dataproc sem servidor
Analise o job de exemplo do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau em Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
faça upload do arquivo do PySpark para o Cloud Storage
Para fazer o upload do arquivo PySpark para o Cloud Storage:
Salve o data_analytics_process.py na sua máquina local.
No console do Google Cloud, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket criado anteriormente.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.py
na caixa de diálogo exibida e clique em Abrir.
DAG de análise de dados
Analise o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O
AzureBlobStorageToGCSOperator
transfere o arquivo holidays.csv do blob de bloco do Azure para o bucket do Cloud Storage.O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weather
do BigQuery criado anteriormente.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
mescla os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando um loop "for" e estão em umTaskGroup
para facilitar a leitura na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um armazenamento simples de chave-valor. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse a interface do Airflow pelo console do Cloud Composer.
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket que você criou anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer que o job do Dataproc atenda aos requisitos de rede sem servidor do Dataproc. Esta é a região onde você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do ambiente do Cloud Composer. Essa conta de serviço está na guia de configuração do ambiente do Cloud Composer.azure_blob_name
: o nome do blob criado anteriormente.azure_container_name
: o nome do contêiner que você criou anteriormente.
faça upload do DAG para o bucket do ambiente
O Cloud Composer programa os DAGs localizados na pasta /dags
no bucket do ambiente. Para fazer o upload do DAG usando o console do Google Cloud:
Na máquina local, salve azureblobstoretogcsoperator_tutorial.py.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna pasta do DAG, clique no link DAGs. A pasta de DAGs do seu ambiente será aberta.
Clique em Fazer o upload dos arquivos.
Selecione
azureblobstoretogcsoperator_tutorial.py
na máquina local e clique em Abrir.
Acione o DAG
No seu ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
azure_blob_to_gcs_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com êxito.
Validar o sucesso do DAG
No console do Google Cloud, acesse a página do BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em graus Celsius.
limpeza
Exclua os recursos individuais criados para este tutorial:
Exclua o bucket do Cloud Storage que você criou neste tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.