Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Neste tutorial, mostramos como usar o Cloud Composer para criar um DAG (gráfico acíclico direcionado) do Apache Airflow que executa um job de contagem de palavras do Apache Hadoop em um cluster do Dataproc.
Objetivos
- Acesse seu ambiente do Cloud Composer e use a interface do Airflow.
- Crie e visualize variáveis de ambiente do Airflow.
- Criar e executar um DAG que inclua as seguintes tarefas:
- Cria um Dataproc aglomerado.
- Executa um Apache Hadoop (em inglês) de contagem de palavras no cluster.
- Gera os resultados da contagem de palavras para um Cloud Storage do Google Cloud.
- Exclui o cluster.
Custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer
- Dataproc
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Antes de começar
Verifique se as seguintes APIs estão ativadas no projeto:
Console
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com No projeto, crie um bucket do Cloud Storage de qualquer classe e região de armazenamento para armazenar os resultados do job de contagem de palavras do Hadoop.
Anote o caminho do bucket que você criou, por exemplo
gs://example-bucket
: Você vai definir uma variável do Airflow para esse caminho e use a variável no DAG de exemplo mais adiante neste tutorial.Crie um ambiente do Cloud Composer com parâmetros padrão. Aguarde até que a criação do ambiente seja concluída. Quando terminar, a marca de seleção verde vai aparecer à esquerda do nome do ambiente.
Observe a região em que você criou o ambiente, por exemplo,
us-central
. Você definirá uma variável do Airflow para essa região e usá-lo no DAG de exemplo para executar um cluster do Dataproc na mesma região.
definir variáveis do Airflow
Defina as variáveis do Airflow para usar mais tarde no DAG de exemplo. Por exemplo, é possível definir variáveis do Airflow na interface do Airflow.
Variável do Airflow | Valor |
---|---|
gcp_project
|
O ID do projeto que você está usando neste tutorial, como example-project . |
gcs_bucket
|
O bucket do URI do Cloud Storage que você criou para este tutorial,
como gs://example-bucket . |
gce_region
|
A região em que você criou o ambiente, como us-central1 .
Essa é a região em que o cluster do Dataproc será criado. |
Conferir o fluxo de trabalho de exemplo
Um DAG do Airflow é uma coleção de tarefas organizadas que você quer programar
e executar. Os DAGs são definidos em arquivos Python padrão. O código mostrado em
hadoop_tutorial.py
é o código do fluxo de trabalho.
Operadores
Para orquestrar as três tarefas no fluxo de trabalho de exemplo, o DAG importa os seguintes três operadores do Airflow:
DataprocClusterCreateOperator
: cria um cluster do Dataproc.DataProcHadoopOperator
: envia um job de contagem de palavras do Hadoop e grava os resultados em um bucket do Cloud Storage.DataprocClusterDeleteOperator
: exclui o cluster para evitar cobranças recorrentes do Compute Engine.
Dependências
Você organiza as tarefas que quer executar de uma forma que reflita os relacionamentos e as dependências delas. As tarefas neste DAG são executadas sequencialmente.
Programação
O nome do DAG é composer_hadoop_tutorial
, e ele é executado uma vez a cada
dia. Como o start_date
transmitido para default_dag_args
está definido como yesterday
, o Cloud Composer programa o fluxo de trabalho para iniciar imediatamente após o upload do DAG para o bucket do ambiente.
Faça upload do DAG para o bucket do ambiente
O Cloud Composer armazena DAGs na pasta /dags
da sua
bucket do seu ambiente de execução.
Siga estas etapas para fazer o upload do DAG:
Na máquina local, salve
hadoop_tutorial.py
.No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, na coluna pasta de DAGs de cada ambiente, clique no link DAGs.
Clique em Enviar arquivos.
Selecione
hadoop_tutorial.py
na sua máquina local e clique em Abrir.
O Cloud Composer adiciona o DAG ao Airflow e agenda o DAG automaticamente. As alterações no DAG levam de 3 a 5 minutos.
Analisar execuções de DAG
Conferir o status da tarefa
Quando você faz o upload do arquivo DAG para a pasta dags/
no Cloud Storage,
o Cloud Composer analisa o arquivo. Quando concluído, o nome
do fluxo de trabalho aparece na listagem de DAGs e é colocado em fila para ser executado
imediatamente.
Para ver o status da tarefa, acesse a interface da Web do Airflow e clique em DAGs na barra de ferramentas.
Para abrir a página de detalhes do DAG, clique em
composer_hadoop_tutorial
. Isso inclui uma representação gráfica das tarefas do fluxo de trabalho e dependências.Para ver o status de cada tarefa, clique em Visualização de gráfico e, em seguida, passe o mouse sobre a imagem de cada tarefa.
Enfileirar o fluxo de trabalho novamente
Para executar o fluxo de trabalho novamente a partir da Visualização de gráfico:
- Na visualização de gráfico da IU do Airflow, clique no gráfico
create_dataproc_cluster
. - Para redefinir as três tarefas, clique em Limpar e em OK para confirmar.
- Clique em
create_dataproc_cluster
novamente na visualização de gráfico. - Para enfileirar o fluxo de trabalho novamente, clique em Executar.
Ver resultados de tarefas
Também é possível verificar o status e os resultados do composer_hadoop_tutorial
.
do BigQuery acessando as seguintes páginas do console do Google Cloud:
Clusters do Dataproc: para monitorar a criação de clusters e exclusão. O cluster criado pelo fluxo de trabalho é temporário: ele existe apenas durante o fluxo de trabalho e é excluído como parte do a última tarefa do fluxo de trabalho.
Jobs do Dataproc: para visualizar ou monitorar o job de contagem de palavras do Apache Hadoop. Clique no ID do job para ver a saída do registro dele.
Navegador do Cloud Storage: para ver os resultados da contagem de palavras no a pasta
wordcount
no bucket do Cloud Storage que você criou para este tutorial.
Limpeza
Exclua os recursos usados neste tutorial:
Exclua o ambiente do Cloud Composer, incluindo: excluir manualmente o bucket do ambiente.
Exclua o bucket do Cloud Storage que armazena os resultados do job de contagem de palavras do Hadoop.