Ejecuta un DAG de Apache Airflow en Cloud Composer 2 (Google Cloud CLI)
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta guía de inicio rápido, se muestra cómo crear un entorno de Cloud Composer y cómo ejecutar un DAG de Apache Airflow en Cloud Composer 2.
Si eres nuevo en Airflow, consulta el instructivo de conceptos de Airflow en la documentación de Apache Airflow para obtener más información sobre los conceptos, los objetos y su uso.
Si quieres usar la consola de Google Cloud, consulta Cómo ejecutar un DAG de Apache Airflow en Cloud Composer.
Si deseas crear un entorno con Terraform, consulta Crea entornos (Terraform).
Antes de comenzar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Si quieres obtener los permisos que necesitas para completar esta guía de inicio rápido, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:
-
Para ver, crear y administrar el entorno de Cloud Composer, haz lo siguiente:
-
Administrador de objetos de almacenamiento y entorno (
roles/composer.environmentAndStorageObjectAdmin
) -
Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
)
-
Administrador de objetos de almacenamiento y entorno (
-
Para ver los registros, usa el Visualizador de registros (
roles/logging.viewer
).
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.
-
Para ver, crear y administrar el entorno de Cloud Composer, haz lo siguiente:
Crea la cuenta de servicio de un entorno
Cuando creas un entorno, especificas una cuenta de servicio. Esta cuenta de servicio se denomina cuenta de servicio del entorno. Tu entorno usa esta cuenta de servicio para realizar la mayoría de las operaciones.
La cuenta de servicio de tu entorno no es una cuenta de usuario. Una cuenta de servicio es un tipo especial de cuenta que usa una aplicación o una instancia de máquina virtual (VM), no una persona.
Para crear una cuenta de servicio para tu entorno, sigue estos pasos:
Crea una cuenta de servicio nueva, como se describe en la documentación de Identity and Access Management.
Otórgale un rol, como se describe en la documentación de Identity and Access Management. El rol requerido es Trabajador de Composer (
composer.worker
).
Crear un entorno
Si este es el primer entorno de tu proyecto, agrega la cuenta de agente de servicio de Cloud Composer como un principal nuevo en la cuenta de servicio de tu entorno y otórgale el rol roles/composer.ServiceAgentV2Ext
.
De forma predeterminada, tu entorno usa la cuenta de servicio predeterminada de Compute Engine y en el siguiente ejemplo, se muestra cómo agregarle este permiso.
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
Crea un entorno nuevo llamado example-environment
en la región us-central1
con la versión más reciente de Cloud Composer 2.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.11.1-airflow-2.10.2
Crea un archivo DAG
Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.
En esta guía, se usa un ejemplo de DAG de Airflow definido en el archivo quickstart.py
.
El código de Python de este archivo hace lo siguiente:
- Crea un DAG
composer_sample_dag
. Este DAG se ejecuta todos los días. - Ejecuta una tarea
print_dag_run_conf
. La tarea imprime la configuración de ejecución del DAG con el operador bash.
Guarda una copia del archivo quickstart.py
en tu máquina local:
Sube el archivo de DAG al bucket de tu entorno
Cada entorno de Cloud Composer tiene un bucket de Cloud Storage asociado. Airflow en Cloud Composer programa solo los DAG que se encuentran en la carpeta /dags
de este bucket.
Para programar tu DAG, sube quickstart.py
desde tu máquina local a la carpeta /dags
de tu entorno:
Para subir quickstart.py
con Google Cloud CLI, ejecuta el siguiente comando en
la carpeta en la que se encuentra el archivo quickstart.py
:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Cómo ver el DAG
Después de subir el archivo DAG, Airflow hace lo siguiente:
- Analiza el archivo DAG que subiste. Es posible que el DAG tarde unos minutos en estar disponible para Airflow.
- Agrega el DAG a la lista de DAGs disponibles.
- Ejecuta el DAG según el programa que proporcionaste en el archivo DAG.
Para verificar que tu DAG se procese sin errores y esté disponible en Airflow, obsérvalo en la IU de DAG. La IU de DAG es la interfaz de Cloud Composer para ver información de DAG en la consola de Google Cloud. Cloud Composer también proporciona acceso a la IU de Airflow, que es una interfaz web nativa de Airflow.
Espera unos cinco minutos para darle tiempo a Airflow de procesar el archivo DAG que subiste anteriormente y completar la primera ejecución del DAG (se explica más adelante).
Ejecuta el siguiente comando en Google Cloud CLI. Este comando ejecuta el comando de la CLI de Airflow
dags list
que muestra una lista de los DAG en tu entorno.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Verifica que el DAG de
composer_quickstart
aparezca en el resultado del comando.Resultado de ejemplo:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Cómo ver los detalles de la ejecución del DAG
Una sola ejecución de un DAG se denomina ejecución de DAG. Airflow ejecuta de inmediato una ejecución de DAG para el DAG de ejemplo porque la fecha de inicio en el archivo DAG está establecida en ayer. De esta manera, Airflow se pone al día con la programación del DAG especificado.
El DAG de ejemplo contiene una tarea, print_dag_run_conf
, que ejecuta el comando echo
en la consola. Este comando genera metainformación sobre el DAG (el identificador numérico de la ejecución del DAG).
Ejecuta el siguiente comando en Google Cloud CLI. Este comando muestra una lista de las ejecuciones de DAG para el DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Resultado de ejemplo:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
La CLI de Airflow no proporciona un comando para ver los registros de tareas. Puedes usar otros métodos para ver los registros de tareas de Airflow: la IU de DAG de Cloud Composer, la IU de Airflow o Cloud Logging. En esta guía, se muestra una forma de consultar Cloud Logging para obtener registros de una ejecución de DAG específica.
Ejecuta el siguiente comando en Google Cloud CLI. Este comando lee los registros de
Cloud Logging para una ejecución de DAG específica del DAG composer_quickstart
. El argumento --format
da formato al resultado para que solo se muestre el texto del mensaje de registro.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Reemplaza lo siguiente:
RUN_ID
con el valorrun_id
del resultado del comandotasks states-for-dag-run
que ejecutaste antes. Por ejemplo,2024-02-17T15:38:38.969307+00:00
.
Resultado de ejemplo:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Google Cloud que tiene los recursos.
Borra los recursos que se usaron en este instructivo:
Borra el entorno de Cloud Composer:
En la consola de Google Cloud, ve a la página Entornos.
Selecciona
example-environment
y haz clic en Borrar.Espera hasta que se borre el entorno.
Borra el bucket de tu entorno. Aunque borres el entorno de Cloud Composer, no se borra el bucket.
En la consola de Google Cloud, ve a la página Almacenamiento > Navegador.
Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse
us-central1-example-environ-c1616fe8-bucket
.
¿Qué sigue?