Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar o Cloud Composer 2 para executar cargas de trabalho do Dataproc sem servidor no Google Cloud.
Os exemplos nas secções seguintes mostram como usar operadores para gerir cargas de trabalho em lote sem servidor do Dataproc. Use estes operadores em DAGs que criam, eliminam, listam e obtêm uma carga de trabalho em lote do Dataproc Serverless Spark:
Crie DAGs para operadores que funcionam com cargas de trabalho em lote sem servidor do Dataproc:
Crie DAGs que usem contentores personalizados e Dataproc Metastore.
Configure o Persistent History Server para estes DAGs.
Antes de começar
Ative a API Dataproc:
Consola
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Selecione a localização do ficheiro de carga de trabalho em lote. Pode usar qualquer uma das seguintes opções:
- Crie um contentor do Cloud Storage que armazene este ficheiro.
- Use o contentor do seu ambiente. Uma vez que não precisa de sincronizar este ficheiro com o Airflow, pode criar uma subpasta separada fora das pastas
/dags
ou/data
. Por exemplo,/batches
. - Use um contentor existente.
Configure ficheiros e variáveis do Airflow
Esta secção demonstra como configurar ficheiros e variáveis do Airflow para este tutorial.
Carregue um ficheiro de carga de trabalho de ML do Dataproc Serverless Spark para um contentor
A carga de trabalho neste tutorial executa um script pyspark:
Guarde qualquer script pyspark num ficheiro local com o nome
spark-job.py
. Por exemplo, pode usar o script pyspark de exemplo.Carregue o ficheiro para a localização que selecionou em Antes de começar.
Defina variáveis do Airflow
Os exemplos nas secções seguintes usam variáveis do Airflow. Define valores para estas variáveis no Airflow e, em seguida, o código DAG pode aceder a estes valores.
Os exemplos neste tutorial usam as seguintes variáveis do Airflow. Pode defini-los conforme necessário, consoante o exemplo que usar.
Defina as seguintes variáveis do Airflow para utilização no código do DAG:
project_id
: ID do projeto.bucket_name
: URI de um contentor onde se encontra o ficheiro Python principal da carga de trabalho (spark-job.py
). Selecionou esta localização em Antes de começar.phs_cluster
: Nome do cluster do servidor de histórico persistente. Define esta variável quando cria um servidor de histórico persistente.image_name
: nome e etiqueta da imagem do contentor personalizado (image:tag
). Define esta variável quando usa a imagem do contentor personalizado com DataprocCreateBatchOperator.metastore_cluster
: nome do serviço Dataproc Metastore. Define esta variável quando usa o serviço Dataproc Metastore com o DataprocCreateBatchOperator.region_name
: região onde o serviço Dataproc Metastore está localizado. Define esta variável quando usa o serviço Dataproc Metastore com o DataprocCreateBatchOperator.
Use a Google Cloud consola e a IU do Airflow para definir cada variável do Airflow
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no link Airflow para o seu ambiente. A IU do Airflow é aberta.
Na IU do Airflow, selecione Admin > Variáveis.
Clique em Adicionar um novo registo.
Especifique o nome da variável no campo Chave e defina o valor para esta no campo Valor.
Clique em Guardar.
Crie um servidor de histórico persistente
Use um servidor de histórico persistente (PHS) para ver os ficheiros de histórico do Spark das suas cargas de trabalho em lote:
- Crie um servidor de histórico persistente.
- Certifique-se de que especificou o nome do cluster do PHS na
phs_cluster
variável do Airflow.
DataprocCreateBatchOperator
O seguinte DAG inicia uma carga de trabalho em lote do Dataproc Serverless.
Para mais informações sobre os argumentos DataprocCreateBatchOperator
, consulte o código fonte do operador.
Para mais informações sobre os atributos que pode transmitir no parâmetro batch
de DataprocCreateBatchOperator
, consulte a
descrição da classe Batch.
Use uma imagem de contentor personalizada com o DataprocCreateBatchOperator
O exemplo seguinte mostra como usar uma imagem de contentor personalizada para executar as suas cargas de trabalho. Pode usar um contentor personalizado, por exemplo, para adicionar dependências do Python que não são fornecidas pela imagem do contentor predefinida.
Para usar uma imagem de contentor personalizada:
Crie uma imagem de contentor personalizada e carregue-a para o Container Registry.
Especifique a imagem na
image_name
variável do Airflow.Use o DataprocCreateBatchOperator com a sua imagem personalizada:
Use o serviço Dataproc Metastore com o DataprocCreateBatchOperator
Para usar um serviço Dataproc Metastore a partir de um DAG:
Verifique se o serviço de metastore já foi iniciado.
Para saber como iniciar um serviço de metastore, consulte o artigo Ative e desative o Dataproc Metastore.
Para obter informações detalhadas sobre o operador de lote para criar a configuração, consulte PeripheralsConfig.
Assim que o serviço de metastore estiver em funcionamento, especifique o respetivo nome na variável
metastore_cluster
e a respetiva região naregion_name
variável do Airflow.Use o serviço de metastore em DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Pode usar o DataprocDeleteBatchOperator para eliminar um lote com base no ID do lote da carga de trabalho.
DataprocListBatchesOperator
DataprocDeleteBatchOperator apresenta as tarefas em lote existentes num determinado project_id e região.
DataprocGetBatchOperator
O DataprocGetBatchOperator obtém uma carga de trabalho em lote específica.