Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Este tutorial é uma modificação de Executar um DAG de análise de dados no Google Cloud, que mostra como conectar o ambiente do Cloud Composer ao Amazon Web Services para utilizar os dados armazenados nele. Ele mostra como usar o Cloud Composer para criar um DAG do Apache Airflow (em inglês). O DAG mescla dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket S3 da Amazon Web Services (AWS) e, em seguida, executa um job em lote do Dataproc sem servidor para processar os dados mesclados.
O conjunto de dados público do BigQuery neste tutorial é o ghcn_d, um banco de dados integrado de resumos climáticos de todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.
A pergunta que queremos responder usando o DAG é: "Qual foi o nível de calor em Chicago no Dia de Ação de Graças nos últimos 25 anos?".
Objetivos
- Criar um ambiente do Cloud Composer na configuração padrão
- Crie um bucket no AWS S3
- Criar um conjunto de dados vazio do BigQuery
- Crie um novo bucket do Cloud Storage
- Crie e execute um DAG que inclua as seguintes tarefas:
- Carregar um conjunto de dados externo do S3 para o Cloud Storage
- Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
- mesclar dois conjuntos de dados no BigQuery;
- Executar um job de análise de dados do PySpark
Antes de começar
Gerenciar permissões na AWS
Siga a seção "Como criar políticas com o editor visual" do tutorial da AWS sobre criação de políticas do IAM para criar uma política do IAM personalizada para o AWS S3 com a seguinte configuração:
- Serviço:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
), para visualizar o bucket do S3 - CreateBucket (
s3:CreateBucket
), para criar um bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
), para criar um bucket - ListBucket (
s3:ListBucket
), para conceder permissão para listar objetos em um bucket do S3 - PutObject (
s3:PutObject
), para fazer upload de arquivos para um bucket - GetBucketVersioning (
s3:GetBucketVersioning
), para excluir um objeto em um bucket - DeleteObject (
s3:DeleteObject
) para excluir um objeto em um bucket - ListBucketVersions (
s3:ListBucketVersions
), para excluir um bucket - DeleteBucket (
s3:DeleteBucket
) para excluir um bucket - Recursos: escolha "Qualquer" ao lado de "bucket" e "objeto" para conceder permissões a qualquer recurso desse tipo.
- Tag:nenhuma
- Nome: TutorialPolicy
Consulte a lista de ações compatíveis com o Amazon S3 para mais informações sobre cada configuração encontrada acima.
Ativar APIs
Ative as APIs a seguir:
Console
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage.
gcloud
Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permissões
Conceda os seguintes papéis e permissões à conta de usuário:
Conceder papéis para gerenciar ambientes e buckets de ambiente do Cloud Composer.
Conceda o papel Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) para criar um conjunto de dados do BigQuery.Conceda o papel Administrador do Storage (
roles/storage.admin
) para criar um bucket do Cloud Storage.
Criar e preparar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer com parâmetros padrão:
- Escolha uma região dos EUA.
- Escolha a versão mais recente do Cloud Composer.
Conceda os papéis a seguir à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow executem as tarefas do DAG:
- Usuário do BigQuery (
roles/bigquery.user
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Dataproc (
roles/dataproc.editor
) - Worker do Dataproc (
roles/dataproc.worker
)
- Usuário do BigQuery (
Criar e modificar recursos relacionados no Google Cloud
Instale o pacote PyPI
apache-airflow-providers-amazon
no ambiente do Cloud Composer.Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:
- Nome:
holiday_weather
- Região:
US
- Nome:
Crie um novo bucket do Cloud Storage na multirregião
US
.Execute o comando a seguir para ativar o acesso privado do Google na sub-rede padrão da região em que você quer executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do ambiente do Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Criar recursos relacionados na AWS
Crie um bucket do S3 com as configurações padrão na região de sua preferência.
Conectar-se à AWS pelo Cloud Composer
- Gerar o ID da chave de acesso e a chave de acesso secreta da AWS
Adicione a conexão da AWS S3 usando a IU do Airflow:
- Acesse Administrador > Conexões.
Crie uma nova conexão com a seguinte configuração:
- ID da conexão:
aws_s3_connection
- Tipo de conexão:
Amazon S3
- Extras:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID da conexão:
Processamento de dados com o Dataproc sem servidor
conheça o exemplo de job do PySpark
O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de dezenas de graus em Celsius para graus Celsius. Esse job converte os dados de temperatura do conjunto em um formato diferente.
Faça upload do arquivo PySpark para o Cloud Storage
Para fazer upload do arquivo PySpark para o Cloud Storage:
Salve o data_analytics_process.py na sua máquina local.
No console do Google Cloud, acesse a página Navegador do Cloud Storage:
Clique no nome do bucket que você criou anteriormente.
Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione
data_analytics_process.py
na caixa de diálogo exibida e clique em Abrir.
Faça upload do arquivo CSV para o AWS S3
Para fazer upload do arquivo holidays.csv
:
- Salve
holidays.csv
na máquina local. - Siga o guia da AWS para fazer upload do arquivo no bucket.
DAG de análise de dados
conheça o DAG de exemplo
O DAG usa vários operadores para transformar e unificar os dados:
O
S3ToGCSOperator
transfere o arquivo holidays.csv do bucket do AWS S3 para o bucket do Cloud Storage.O
GCSToBigQueryOperator
ingere o arquivo holidays.csv do Cloud Storage em uma nova tabela no conjunto de dadosholidays_weather
do BigQuery criado anteriormente.O
DataprocCreateBatchOperator
cria e executa um job em lote do PySpark usando o Dataproc sem servidor.O
BigQueryInsertJobOperator
mescla os dados de holidays.csv na coluna "Data" com os dados meteorológicos do conjunto de dados público ghcn_d do BigQuery. As tarefasBigQueryInsertJobOperator
são geradas dinamicamente usando uma repetição "for", e essas tarefas estão em umTaskGroup
para facilitar a leitura na visualização de gráfico da interface do Airflow.
Usar a interface do Airflow para adicionar variáveis
No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações arbitrárias como um repositório simples de chave-valor. Ele usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:
Acesse Administrador > Variáveis.
Adicione as seguintes variáveis:
s3_bucket
: o nome do bucket do S3 que você criou anteriormente.gcp_project
: o ID do projeto.gcs_bucket
: o nome do bucket criado anteriormente (sem o prefixogs://
).gce_region
: a região em que você quer que o job do Dataproc que atenda aos requisitos de rede sem servidor do Dataproc. Esta é a região em que você ativou o Acesso privado do Google anteriormente.dataproc_service_account
: a conta de serviço do ambiente do Cloud Composer. Essa conta de serviço está disponível na guia de configuração do ambiente do Cloud Composer.
faça upload do DAG para o bucket do ambiente
O Cloud Composer programa DAGs que estão localizados na pasta /dags
no bucket do ambiente. Para fazer upload do DAG usando o console do Google Cloud:
Na máquina local, salve s3togcsoperator_tutorial.py.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna pasta DAG, clique no link DAGs. A pasta de DAGs do ambiente é aberta.
Clique em Fazer o upload dos arquivos.
Selecione
s3togcsoperator_tutorial.py
na máquina local e clique em Abrir.
Como acionar o DAG
No ambiente do Cloud Composer, clique na guia DAGs.
Clique no ID do DAG
s3_to_gcs_dag
.Clique em Acionar DAG.
Aguarde cerca de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com êxito.
Valide o sucesso do DAG
No console do Google Cloud, acesse a página do BigQuery.
No painel Explorer, clique no nome do projeto.
Clique em
holidays_weather_joined
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em décimos de grau Celsius.
Clique em
holidays_weather_normalized
.Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em graus Celsius.
limpeza
Exclua os recursos individuais que você criou para este tutorial:
Exclua o arquivo
holidays.csv
no bucket da AWS S3.Exclua o bucket do AWS S3 que você criou.
Exclua o bucket do Cloud Storage que você criou para este tutorial.
Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.