Cloud Composer 1 | Cloud Composer 2
Nesta página, descrevemos como usar o Cloud Composer 2 para executar cargas de trabalho do Dataproc sem servidor no Google Cloud.
Os exemplos nas seções a seguir mostram como usar os operadores para gerenciar cargas de trabalho em lote sem servidor do Dataproc. Esses operadores são usados nos DAGs que criam, excluem, listam e recebem uma carga de trabalho em lote do Spark sem servidor do Dataproc:
Crie DAGs para operadores que funcionam com cargas de trabalho em lote sem servidor do Dataproc:
Crie DAGs que usem contêineres personalizados e o metastore do Dataproc.
Configure o servidor de histórico permanente para esses DAGs.
Antes de começar
Ative a API Dataproc:
Console
Ative a API Dataproc.
gcloud
Ative a API Dataproc:
gcloud services enable dataproc.googleapis.com
Selecione o local do arquivo de carga de trabalho do Batch. É possível usar qualquer uma das seguintes opções:
- Crie um bucket do Cloud Storage que armazene esse arquivo.
- Use o bucket do ambiente. Como não é necessário sincronizar esse arquivo com o Airflow, é possível criar uma subpasta separada fora das pastas
/dags
ou/data
. Por exemplo,/batches
. - Usar um bucket atual.
configurar arquivos e variáveis do Airflow
Esta seção demonstra como configurar arquivos e variáveis do Airflow para este tutorial.
Fazer upload de um arquivo de carga de trabalho do Spark ML sem servidor do Dataproc para um bucket
A carga de trabalho neste tutorial executa um script pyspark:
Salve qualquer script pyspark em um arquivo local chamado
spark-job.py
. Por exemplo, use o script de amostra pyspark.Faça o upload do arquivo para o local selecionado em Antes de começar.
definir variáveis do Airflow
Os exemplos nas seções a seguir usam variáveis do Airflow. Defina valores para essas variáveis no Airflow e, em seguida, seu código DAG poderá acessar esses valores.
Os exemplos neste tutorial usam as seguintes variáveis do Airflow. É possível defini-las conforme necessário, dependendo do exemplo usado.
Defina as variáveis do Airflow para uso no código DAG:
project_id
: ID do projeto.bucket_name
: URI de um bucket em que o arquivo Python principal da carga de trabalho (spark-job.py
) está localizado. Você selecionou esse local em Antes de começar.phs_cluster
: nome do cluster do servidor de histórico permanente. Você define essa variável ao criar um servidor de histórico permanente.image_name
: nome e tag da imagem do contêiner personalizada (image:tag
). Você define essa variável ao usar a imagem de contêiner personalizada com o DataprocCreateBatchOperator.metastore_cluster
: nome do serviço do metastore do Dataproc. Defina essa variável ao usar o serviço Metastore do Dataproc com o DataprocCreateBatchOperator.region_name
: região em que o serviço Metastore do Dataproc está localizado. Defina essa variável ao usar o serviço Metastore do Dataproc com o DataprocCreateBatchOperator.
Use o console do Google Cloud e a interface do Airflow para definir cada variável do Airflow
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no link do Airflow. A interface do Airflow é aberta.
Na interface do Airflow, selecione Admin > Variables.
Clique em Add a new record.
Especifique o nome da variável no campo Key e defina o valor para ela no campo Val.
Clique em Salvar.
Crie um servidor de histórico permanente
Use um servidor de histórico permanente (PHS, na sigla em inglês) para visualizar os arquivos de histórico do Spark das cargas de trabalho em lote:
- Criar um servidor de histórico permanente.
- Verifique se você especificou o nome do cluster PHS na variável
phs_cluster
do Airflow.
DataprocCreateBatchOperator
O DAG a seguir inicia uma carga de trabalho em lote sem servidor do Dataproc.
Para mais informações sobre argumentos DataprocCreateBatchOperator
, consulte o
código-fonte do operador.
Para mais informações sobre atributos que você pode transmitir no parâmetro batch
de DataprocCreateBatchOperator
, consulte a
descrição da classe Batch.
Usar uma imagem de contêiner personalizada com o DataprocCreateBatchOperator
O exemplo a seguir mostra como usar uma imagem de contêiner personalizada para executar suas cargas de trabalho. É possível usar um contêiner personalizado, por exemplo, para adicionar dependências do Python não fornecidas pela imagem de contêiner padrão.
Para usar uma imagem de contêiner personalizada:
Crie uma imagem de contêiner personalizada e faça upload dela para o Container Registry.
Especifique a imagem na variável do Airflow
image_name
.Use DataprocCreateBatchOperator com a imagem personalizada:
Usar o serviço Metastore do Dataproc com DataprocCreateBatchOperator
Para usar um serviço Metastore do Dataproc de um DAG:
Verifique se o serviço metastore já foi iniciado.
Para saber mais sobre como iniciar um serviço metastore, consulte Ativar e desativar o metastore do Dataproc.
Para informações detalhadas sobre o operador de lote para criar a configuração, consulte PeripheralsConfig.
Quando o serviço metastore estiver funcionando, especifique o nome dele na variável
metastore_cluster
e a região na variável do Airflowregion_name
.Use o serviço metastore no DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Use o DataprocDeleteBatchOperator para excluir um lote com base no ID da carga de trabalho.
DataprocListBatchesOperator
DataprocDeleteBatchOperator lista os lotes que existem em um determinado project_id e região.
DataprocGetBatchOperator
DataprocGetBatchOperator busca uma carga de trabalho em lote específica.