Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial é uma modificação do artigo Execute um DAG de análise de dados no Google Cloud, que mostra como associar o seu ambiente do Cloud Composer ao Microsoft Azure para usar os dados aí armazenados. Mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG junta dados de um conjunto de dados público do BigQuery e um ficheiro CSV armazenado num armazenamento de blobs do Azure e, em seguida, executa uma tarefa em lote sem servidor do Dataproc para processar os dados unidos.
O conjunto de dados públicos do BigQuery neste tutorial é o ghcn_d, uma base de dados integrada de resumos climáticos em todo o mundo. O ficheiro CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta à qual queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Crie um ambiente do Cloud Composer na configuração predefinida
- Crie um blob no Azure
- Crie um conjunto de dados do BigQuery vazio
- Crie um novo contentor do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregue um conjunto de dados externo do Azure Blob Storage para o Cloud Storage
- Carregue um conjunto de dados externo do Cloud Storage para o BigQuery
- Junte dois conjuntos de dados no BigQuery
- Execute uma tarefa PySpark de análise de dados
Antes de começar
Ativar APIs
Ative as seguintes APIs:
Consola
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder autorizações
Conceda as seguintes funções e autorizações à sua conta de utilizador:
Conceda funções para gerir ambientes do Cloud Composer e contentores de ambientes.
Conceda a função de proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda a função de administrador de armazenamento (
roles/storage.admin
) para criar um contentor do Cloud Storage.
Crie e prepare o seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com os parâmetros predefinidos:
- Escolha uma região sediada nos EUA.
- Escolha a versão do Cloud Composer mais recente.
Conceda as seguintes funções à conta de serviço usada no seu ambiente do Cloud Composer para que os trabalhadores do Airflow executem com êxito as tarefas DAG:
- Utilizador do BigQuery (
roles/bigquery.user
) - Proprietário dos dados do BigQuery (
roles/bigquery.dataOwner
) - Utilizador da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- Utilizador do BigQuery (
Crie e modifique recursos relacionados no Google Cloud
Instale o
apache-airflow-providers-microsoft-azure
pacote PyPI no seu ambiente do Cloud Composer.Crie um conjunto de dados do BigQuery vazio com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo contentor do Cloud Storage na multirregião
US
.Execute o seguinte comando para ativar o acesso privado à Google na sub-rede predefinida na região onde quer executar o Dataproc Serverless para cumprir os requisitos de rede. Recomendamos que use a mesma região que o seu ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crie recursos relacionados no Azure
Crie uma conta de armazenamento com as predefinições.
Obtenha a chave de acesso e a string de ligação para a sua conta de armazenamento.
Crie um contentor com as opções predefinidas na conta de armazenamento recém-criada.
Conceda a função de delegador de blobs de armazenamento para o contentor criado no passo anterior.
Carregue o ficheiro holidays.csv para criar um blob de blocos com as opções predefinidas no portal do Azure.
Crie um token SAS para o blob de blocos que criou no passo anterior no portal do Azure.
- Método de assinatura: chave de delegação do utilizador
- Autorizações: leitura
- Endereço IP permitido: nenhum
- Protocolos permitidos: apenas HTTPS
Estabeleça ligação ao Azure a partir do Cloud Composer
Adicione a sua ligação do Microsoft Azure através da IU do Airflow:
Aceda a Administração > Ligações.
Crie uma nova associação com a seguinte configuração:
- ID da associação:
azure_blob_connection
- Tipo de ligação:
Azure Blob Storage
- Início de sessão no armazenamento de blobs: o nome da sua conta de armazenamento
- Chave de armazenamento de blobs: a chave de acesso para a sua conta de armazenamento
- String de ligação da conta de armazenamento de blobs: a string de ligação da conta de armazenamento
- Token SAS: o token SAS gerado a partir do seu blob
- ID da associação:
Tratamento de dados com o Dataproc Serverless
Explore o exemplo de tarefa do PySpark
O código apresentado abaixo é um exemplo de uma tarefa do PySpark que converte a temperatura de décimas de grau Celsius para graus Celsius. Esta tarefa converte os dados de temperatura do conjunto de dados num formato diferente.
Carregue o ficheiro PySpark para o Cloud Storage
Para carregar o ficheiro PySpark para o Cloud Storage:
Guarde o ficheiro data_analytics_process.py na sua máquina local.
Na Google Cloud consola, aceda à página do navegador do Cloud Storage:
Clique no nome do contentor que criou anteriormente.
No separador Objetos do contentor, clique no botão Carregar ficheiros, selecione
data_analytics_process.py
na caixa de diálogo apresentada e clique em Abrir.
DAG de análise de dados
Explore o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O comando
AzureBlobStorageToGCSOperator
transfere o ficheiro holidays.csv do seu blob de blocos do Azure para o seu contentor do Cloud Storage.O comando
GCSToBigQueryOperator
carrega o ficheiro holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weather
do BigQuery que criou anteriormente.O comando
DataprocCreateBatchOperator
cria e executa uma tarefa em lote do PySpark através do Dataproc sem servidor.O comando
BigQueryInsertJobOperator
junta os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente através de um ciclo for, e estas tarefas estão numTaskGroup
para uma melhor legibilidade na vista de gráfico da IU do Airflow.
Use a IU do Airflow para adicionar variáveis
No Airflow, as variáveis são uma forma universal de armazenar e obter definições ou configurações arbitrárias como um simples armazenamento de valores-chave. Este DAG usa variáveis do Airflow para armazenar valores comuns. Para as adicionar ao seu ambiente:
Aceda à IU do Airflow a partir da consola do Cloud Composer.
Aceda a Administração > Variáveis.
Adicione as seguintes variáveis:
gcp_project
: o ID do seu projeto.gcs_bucket
: o nome do contentor que criou anteriormente (sem o prefixogs://
).gce_region
: a região onde quer que a sua tarefa do Dataproc cumpra os requisitos de rede do Dataproc Serverless. Esta é a região onde ativou o acesso privado à Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Pode encontrar esta conta de serviço no separador de configuração do ambiente do seu ambiente do Cloud Composer.azure_blob_name
: o nome do blob que criou anteriormente.azure_container_name
: o nome do contentor que criou anteriormente.
Carregue o DAG para o contentor do seu ambiente
O Cloud Composer agenda DAGs localizados na pasta /dags
no contentor do seu ambiente. Para carregar o DAG através da
Google Cloud consola:
Na sua máquina local, guarde o ficheiro azureblobstoretogcsoperator_tutorial.py.
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, na coluna Pasta DAG, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Carregar ficheiros.
Selecione
azureblobstoretogcsoperator_tutorial.py
no seu computador local e clique em Abrir.
Acione o DAG
No seu ambiente do Cloud Composer, clique no separador DAGs.
Clique no ID do DAG
azure_blob_to_gcs_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de verificação verde a indicar que as tarefas foram concluídas com êxito.
Valide o êxito do DAG
Na Google Cloud consola, aceda à página BigQuery.
No painel Explorador, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em graus Celsius.
Limpeza
Elimine os recursos individuais que criou para este tutorial:
Elimine o contentor do Cloud Storage que criou para este tutorial.
Elimine o ambiente do Cloud Composer, incluindo a eliminação manual do contentor do ambiente.