Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se describe cómo usar Cloud Composer 2 para ejecutar cargas de trabajo de Dataproc Serverless enGoogle Cloud.
En los ejemplos de las siguientes secciones se muestra cómo usar operadores para gestionar cargas de trabajo por lotes de Dataproc Serverless. Estos operadores se usan en DAGs que crean, eliminan, enumeran y obtienen una carga de trabajo por lotes de Dataproc Serverless Spark:
Crea DAGs para operadores que funcionen con cargas de trabajo por lotes de Dataproc Serverless:
Crea DAGs que usen contenedores personalizados y Dataproc Metastore.
Configura Persistent History Server para estos DAGs.
Antes de empezar
Habilita la API de Dataproc:
Consola
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Selecciona la ubicación del archivo de carga de trabajo de Batch. Puedes usar cualquiera de las siguientes opciones:
- Crea un segmento de Cloud Storage que almacene este archivo.
- Usa el segmento de tu entorno. Como no necesitas sincronizar este archivo con Airflow, puedes crear una subcarpeta independiente fuera de las carpetas
/dags
o/data
. Por ejemplo,/batches
. - Usa un segmento que ya tengas.
Configurar archivos y variables de Airflow
En esta sección se muestra cómo configurar los archivos y las variables de Airflow para este tutorial.
Subir un archivo de carga de trabajo de aprendizaje automático de Spark de Dataproc Serverless a un segmento
La carga de trabajo de este tutorial ejecuta una secuencia de comandos de pyspark:
Guarda cualquier secuencia de comandos de pyspark en un archivo local llamado
spark-job.py
. Por ejemplo, puedes usar el script de pyspark de ejemplo.Sube el archivo a la ubicación que hayas seleccionado en la sección Antes de empezar.
Definir variables de Airflow
En los ejemplos de las siguientes secciones se usan variables de Airflow. Puedes definir los valores de estas variables en Airflow y, después, el código de tu DAG podrá acceder a ellos.
En los ejemplos de este tutorial se usan las siguientes variables de Airflow. Puedes configurarlos según sea necesario, en función del ejemplo que utilices.
Define las siguientes variables de Airflow para usarlas en el código de tu DAG:
project_id
: ID del proyecto.bucket_name
: URI de un bucket en el que se encuentra el archivo principal de Python de la carga de trabajo (spark-job.py
). Has seleccionado esta ubicación en la sección Antes de empezar.phs_cluster
: nombre del clúster del servidor de historial persistente. Esta variable se define cuando crea un servidor de historial persistente.image_name
: nombre y etiqueta de la imagen de contenedor personalizada (image:tag
). Esta variable se define cuando usas una imagen de contenedor personalizada con DataprocCreateBatchOperator.metastore_cluster
: nombre del servicio Dataproc Metastore. Esta variable se define cuando usas el servicio Dataproc Metastore con DataprocCreateBatchOperator.region_name
: región en la que se encuentra el servicio Dataproc Metastore. Esta variable se define cuando usas el servicio Dataproc Metastore con DataprocCreateBatchOperator.
Usa la consola Google Cloud y la interfaz de usuario de Airflow para definir cada variable de Airflow
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haga clic en el enlace Airflow del entorno. Se abrirá la interfaz de usuario de Airflow.
En la interfaz de Airflow, seleccione Administrar > Variables.
Haz clic en Add a new record (Añadir un registro nuevo).
Especifica el nombre de la variable en el campo Clave y asigna el valor correspondiente en el campo Valor.
Haz clic en Guardar.
Crear un servidor de historial persistente
Usa un servidor de historial persistente (PHS) para ver los archivos de historial de Spark de tus cargas de trabajo por lotes:
- Crea un servidor de historial persistente.
- Asegúrate de haber especificado el nombre del clúster de PHS en la
phs_cluster
variable de Airflow.
DataprocCreateBatchOperator
El siguiente DAG inicia una carga de trabajo de Dataproc Serverless Batch.
Para obtener más información sobre los argumentos de DataprocCreateBatchOperator
, consulta el código fuente del operador.
Para obtener más información sobre los atributos que puede incluir en el parámetro batch
de DataprocCreateBatchOperator
, consulte la descripción de la clase Batch.
Usar una imagen de contenedor personalizada con DataprocCreateBatchOperator
En el siguiente ejemplo se muestra cómo usar una imagen de contenedor personalizada para ejecutar tus cargas de trabajo. Puedes usar un contenedor personalizado, por ejemplo, para añadir dependencias de Python que no proporciona la imagen de contenedor predeterminada.
Para usar una imagen de contenedor personalizada, sigue estos pasos:
Crea una imagen de contenedor personalizada y súbela a Container Registry.
Especifique la imagen en la
image_name
variable de Airflow.Usa DataprocCreateBatchOperator con tu imagen personalizada:
Usar el servicio Dataproc Metastore con DataprocCreateBatchOperator
Para usar un servicio Dataproc Metastore desde un DAG, sigue estos pasos:
Comprueba que el servicio metastore ya se haya iniciado.
Para obtener información sobre cómo iniciar un servicio de metastore, consulta Habilitar e inhabilitar Dataproc Metastore.
Para obtener información detallada sobre el operador de lote para crear la configuración, consulta PeripheralsConfig.
Una vez que el servicio del almacén de metadatos esté en funcionamiento, especifica su nombre en la variable
metastore_cluster
y su región en la variableregion_name
Airflow.Usa el servicio metastore en DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Puede usar DataprocDeleteBatchOperator para eliminar un lote en función del ID del lote de la carga de trabajo.
DataprocListBatchesOperator
DataprocDeleteBatchOperator muestra las tareas por lotes que existen en un project_id y una región determinados.
DataprocGetBatchOperator
DataprocGetBatchOperator obtiene una carga de trabajo por lotes concreta.