Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Esta página descreve como usar o Cloud Composer 2 para executar cargas de trabalho Dataproc Serverless no Google Cloud.
Os exemplos nas seções a seguir mostram como usar operadores para gerenciar cargas de trabalho em lote sem servidor do Dataproc. Use estes operadores em DAGs que criam, excluem, listam e extraem uma carga de trabalho em lote do Dataproc Serverless Spark:
Crie DAGs para operadores que funcionam com cargas de trabalho em lote sem servidor do Dataproc:
Criar DAGs que usam contêineres personalizados Metastore do Dataproc.
Configure o Persistent History Server para esses DAGs.
Antes de começar
Ative a API Dataproc:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Selecione o local do seu arquivo de carga de trabalho em lote. É possível usar qualquer um seguintes opções:
- Crie um bucket do Cloud Storage que armazene esse arquivo.
- Use o bucket do ambiente. Porque você não precisa sincronizar este arquivo
com o Airflow, é possível criar uma subpasta separada fora da
/dags
ou/data
pastas. Por exemplo,/batches
. - Usar um bucket atual.
configurar arquivos e variáveis do Airflow
Esta seção demonstra como configurar arquivos e variáveis do Airflow para este tutorial.
Fazer upload de um arquivo de carga de trabalho de ML do Spark sem servidor para um bucket
A carga de trabalho neste tutorial executa um script pyspark:
Salve qualquer script pyspark em um arquivo local chamado
spark-job.py
. Por exemplo, use o script de exemplo do pyspark.Faça upload do arquivo para o local selecionado em Antes de começar.
definir variáveis do Airflow
Os exemplos nas seções a seguir usam variáveis do Airflow. Você define valores para essas variáveis no Airflow, e o código do DAG pode acessar esses valores.
Os exemplos deste tutorial usam as seguintes variáveis do Airflow. Você pode definir conforme necessário, dependendo do exemplo usado.
Defina as seguintes variáveis do Airflow para usar no seu código DAG:
project_id
: ID do projeto.bucket_name
: URI de um bucket em que o arquivo principal do Python da carga de trabalho (spark-job.py
) está localizado. Você selecionou este local em Antes de começar.phs_cluster
: nome do cluster do servidor de histórico permanente. Você define essa variável ao Criar um Servidor de Histórico Persistente.image_name
: nome e tag da imagem do contêiner personalizado (image:tag
). Você defina essa variável quando usar imagem de contêiner personalizada com DataprocCreateBatchOperator.metastore_cluster
: nome do serviço do metastore do Dataproc. Você define essa variável quando usar o serviço Dataproc Metastore com DataprocCreateBatchOperator.region_name
: região em que o serviço Metastore do Dataproc está localizado. Defina essa variável ao usar o serviço do Dataproc Metastore com DataprocCreateBatchOperator.
Usar o console do Google Cloud e a interface do Airflow para definir cada variável do Airflow
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no link Airflow para sua de nuvem. A interface do Airflow é aberta.
Na interface do Airflow, selecione Admin > Variables.
Clique em Add a new record.
Especifique o nome da variável no campo Chave e defina o valor no campo Val.
Clique em Salvar.
Crie um servidor de histórico permanente
Use um servidor de histórico persistente (PHS) para conferir os arquivos de histórico do Spark das suas cargas de trabalho em lote:
- Crie um servidor de histórico permanente.
- Verifique se você especificou o nome do cluster de PHS na
variável Airflow
phs_cluster
.
DataprocCreateBatchOperator
O DAG a seguir inicia uma carga de trabalho do Dataproc Serverless Batch.
Para mais informações sobre os argumentos DataprocCreateBatchOperator
, consulte o
código-fonte do operador.
Para mais informações sobre os atributos que podem ser transmitidos no método batch
parâmetro de DataprocCreateBatchOperator
, consulte a
descrição da classe Batch.
Usar a imagem de contêiner personalizada com o DataprocCreateBatchOperator
O exemplo a seguir mostra como usar uma imagem de contêiner personalizada para executar as cargas de trabalho. É possível usar um contêiner personalizado, por exemplo, para adicionar dependências do Python que não são fornecidas pela imagem de contêiner padrão.
Para usar uma imagem de contêiner personalizada:
Crie uma imagem de contêiner personalizada e faça upload dela para o Container Registry.
Especifique a imagem na variável do Airflow
image_name
.Use DataprocCreateBatchOperator com sua imagem personalizada:
Usar o serviço Metastore do Dataproc com o DataprocCreateBatchOperator
Para usar um serviço do metastore do Dataproc em um DAG:
Verifique se o serviço de metastore já foi iniciado.
Para saber mais sobre como iniciar um serviço de metastore, consulte Ative e desative o metastore do Dataproc.
Para informações detalhadas sobre o operador de lote para criar o do Terraform, consulte PeripheralsConfig.
Quando o serviço metastore estiver em execução, especifique seu nome em a variável
metastore_cluster
e a região dela na variávelregion_name
do Airflow.Use o serviço de metastore no DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
É possível usar o DataprocDeleteBatchOperator para excluir um lote com base no ID da carga de trabalho.
DataprocListBatchesOperator
O DataprocDeleteBatchOperator lista os lotes que existem em um determinado project_id e região.
DataprocGetBatchOperator
O DataprocGetBatchOperator busca uma carga de trabalho em lote específica.