Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial é uma modificação de Executar um DAG de análise de dados em Google Cloud que mostra como conectar seu ambiente do Cloud Composer ao Amazon Web Services para usar os dados armazenados lá. Ele mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG une dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Amazon Web Services (AWS) S3 e executa um job em lote sem servidor do Dataproc para processar os dados unidos.
O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos em todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Criar um bucket no AWS S3
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do S3 para o Cloud Storage
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- Combinar dois conjuntos de dados no BigQuery
- Executar um job do PySpark de análise de dados
Antes de começar
Gerenciar permissões na AWS
Siga a seção "Criar políticas com o editor visual" do tutorial da AWS sobre criação de políticas do IAM para criar uma política personalizada do IAM para o AWS S3 com a seguinte configuração:
- Serviço:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
), para ver seu bucket do S3 - CreateBucket (
s3:CreateBucket
), para criar um bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
), para criar um bucket - ListBucket (
s3:ListBucket
), para conceder permissão para listar objetos em um bucket do S3 - PutObject (
s3:PutObject
), para fazer upload de arquivos em um bucket - GetBucketVersioning (
s3:GetBucketVersioning
), para excluir um objeto em um bucket - DeleteObject (
s3:DeleteObject
), para excluir um objeto em um bucket - ListBucketVersions (
s3:ListBucketVersions
), para excluir um bucket - DeleteBucket (
s3:DeleteBucket
), para excluir um bucket - Recursos:escolha "Qualquer" ao lado de "bucket" e "objeto" para conceder permissões a qualquer recurso desse tipo.
- Tag:nenhuma
- Nome:TutorialPolicy
Consulte a lista de ações compatíveis com o Amazon S3 para mais informações sobre cada configuração acima.
Ativar APIs
Ative as APIs a seguir:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à sua conta de usuário:
Conceda papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel de Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região nos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os seguintes papéis à conta de serviço usada no seu ambiente do Cloud Composer para que os workers do Airflow executem as tarefas do DAG com êxito:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar e modificar recursos relacionados em Google Cloud
Instale o
apache-airflow-providers-amazon
pacote PyPI no ambiente do Cloud Composer.Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o Acesso privado do Google na sub-rede padrão da região em que você quer executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Criar recursos relacionados na AWS
Crie um bucket do S3 com as configurações padrão na região de sua preferência.
Conectar-se à AWS pelo Cloud Composer
- Receber seu ID da chave de acesso e a chave de acesso secreta da AWS
Adicione sua conexão do AWS S3 usando a interface do Airflow:
- Acesse Administrador > Conexões.
Crie uma conexão com a seguinte configuração:
- ID da conexão:
aws_s3_connection
- Tipo de conexão:
Amazon S3
- Extras:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID da conexão:
Processamento de dados usando o Dataproc sem servidor
Conheça o exemplo de job do PySpark
O código abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.
Fazer upload do arquivo PySpark para o Cloud Storage
Para fazer upload do arquivo PySpark no Cloud Storage:
Salve data_analytics_process.py na sua máquina local.
No console Google Cloud , acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.py
na caixa de diálogo exibida e clique em Abrir.
Fazer upload do arquivo CSV para o AWS S3
Para fazer upload do arquivo holidays.csv
:
- Salve
holidays.csv
na sua máquina local. - Siga o guia da AWS para fazer upload do arquivo no bucket.
DAG de análise de dados
Conheça o exemplo de DAG
O DAG usa vários operadores para transformar e unificar os dados:
O
S3ToGCSOperator
transfere o arquivo holidays.csv do bucket do AWS S3 para o bucket do Cloud Storage.O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dadosholidays_weather
do BigQuery que você criou antes.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
mescla os dados de holidays.csv na coluna "Date" com dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando um loop "for", e essas tarefas estão em umTaskGroup
para melhorar a legibilidade na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um simples armazenamento de chave-valor. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
s3_bucket
: o nome do bucket do S3 criado anteriormente.gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer o job do Dataproc que atende aos requisitos de rede do Dataproc sem servidor. Essa é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do seu ambiente do Cloud Composer. Essa conta de serviço pode ser encontrada na guia de configuração do ambiente do Cloud Composer.
Faça upload do DAG para o bucket do ambiente
O Cloud Composer agenda os DAGs que estão na
pasta /dags
no bucket do ambiente. Para fazer o upload do DAG usando o console do
Google Cloud :
Na sua máquina local, salve s3togcsoperator_tutorial.py.
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, na coluna Pasta de DAGs, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
s3togcsoperator_tutorial.py
na sua máquina local e clique em Abrir.
Acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
s3_to_gcs_dag
.Clique em Acionar DAG.
Aguarde de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com sucesso.
Validar o sucesso do DAG
No console Google Cloud , acesse a página BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna "Valor" estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualização" para conferir a tabela resultante. Os números na coluna "Valor" estão em graus Celsius.
Limpeza
Exclua os recursos individuais criados para este tutorial:
Exclua o arquivo
holidays.csv
no bucket do AWS S3.Exclua o bucket do AWS S3 que você criou.
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.