Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Este tutorial é uma modificação do artigo Executar um DAG de análise de dados no Google Cloud, que mostra como conectar o ambiente do Cloud Composer ao Microsoft Azure para utilizar os dados armazenados nele. Ele mostra como usar o Cloud Composer para criar um DAG do Apache Airflow (em inglês). O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um armazenamento de blobs do Azure e, em seguida, executa um job em lote do Dataproc sem servidor para processar os dados mesclados.
O conjunto de dados público do BigQuery neste tutorial é o ghcn_d, um banco de dados integrado de resumos climáticos de todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi o nível de calor em Chicago no Dia de Ação de Graças nos últimos 25 anos?".
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um blob no Azure
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do Armazenamento de Blobs do Azure para o Cloud Storage
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- mesclar dois conjuntos de dados no BigQuery;
- Executar um job de análise de dados do PySpark
Antes de começar
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage.
gcloud
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à conta de usuário:
Conceder papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os papéis a seguir à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow executem as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar e modificar recursos relacionados no Google Cloud
Instale o pacote PyPI
apache-airflow-providers-microsoft-azure
no ambiente do Cloud Composer.Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o acesso privado do Google na sub-rede padrão da região em que você quer executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Criar recursos relacionados no Azure
Crie uma conta de armazenamento com as configurações padrão.
Consiga a chave de acesso e a string de conexão da sua conta de armazenamento.
Crie um contêiner com opções padrão na sua conta de armazenamento recém-criada.
Conceda o papel Delegador de blobs de armazenamento para o contêiner criado na etapa anterior.
Faça upload do arquivo holidays.csv para criar um blob de bloco com opções padrão no portal do Azure.
Crie um token SAS para o blob de bloco criado na etapa anterior, no portal do Azure.
- Método de assinatura: chave de delegação do usuário
- Permissões: leitura
- Endereço IP permitido: nenhum
- Protocolos permitidos: somente HTTPS
Conectar-se ao Azure pelo Cloud Composer
Adicione a conexão com o Microsoft Azure usando a IU do Airflow:
Acesse Administrador > Conexões.
Crie uma nova conexão com a seguinte configuração:
- ID da conexão:
azure_blob_connection
- Tipo de conexão:
Azure Blob Storage
- Login no armazenamento de blobs:o nome da sua conta de armazenamento
- Chave de armazenamento de blobs: a chave de acesso para sua conta de armazenamento
- String de conexão da conta de armazenamento de blobs:a string de conexão da sua conta de armazenamento
- Token SAS: o token SAS gerado do seu blob
- ID da conexão:
Processamento de dados com o Dataproc sem servidor
conheça o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de dezenas de graus em Celsius para graus Celsius. Esse job converte os dados de temperatura do conjunto em um formato diferente.
Faça upload do arquivo PySpark para o Cloud Storage
Para fazer upload do arquivo PySpark para o Cloud Storage:
Salve o data_analytics_process.py na sua máquina local.
No console do Google Cloud, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou anteriormente.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.py
na caixa de diálogo exibida e clique em Abrir.
DAG de análise de dados
conheça o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O
AzureBlobStorageToGCSOperator
transfere o arquivo holidays.csv do seu blob de bloco do Azure para o bucket do Cloud Storage.O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage em uma nova tabela no conjunto de dadosholidays_weather
do BigQuery criado anteriormente.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
mescla os dados de holidays.csv na coluna "Data" com os dados meteorológicos do conjunto de dados público ghcn_d do BigQuery. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando uma repetição "for", e essas tarefas estão em umTaskGroup
para facilitar a leitura na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações arbitrárias como um repositório simples de chave-valor. Ele usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer que o job do Dataproc que atenda aos requisitos de rede sem servidor do Dataproc. Esta é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do ambiente do Cloud Composer. Essa conta de serviço está disponível na guia de configuração do ambiente do Cloud Composer.azure_blob_name
: o nome do blob criado anteriormente.azure_container_name
: o nome do contêiner que você criou anteriormente.
faça upload do DAG para o bucket do ambiente
O Cloud Composer programa DAGs que estão localizados na pasta /dags
no bucket do ambiente. Para fazer upload do DAG usando o console do Google Cloud:
Na máquina local, salve azureblobstoretogcsoperator_tutorial.py.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna pasta DAG, clique no link DAGs. A pasta de DAGs do ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
azureblobstoretogcsoperator_tutorial.py
na máquina local e clique em Abrir.
Como acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
azure_blob_to_gcs_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com êxito.
Valide o sucesso do DAG
No console do Google Cloud, acesse a página do BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em graus Celsius.
limpeza
Exclua os recursos individuais que você criou para este tutorial:
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.