On September 15, 2026, all Cloud Composer 1 and Cloud Composer 2 version 2.0.x environments will reach their planned end of life, and you will no longer be able to use them. We recommend that you plan your migration to Cloud Composer 3.
This page describes how to use Google Kubernetes Engine operators to create clusters in Google Kubernetes Engine and to launch Kubernetes pods in those clusters.
Google Kubernetes Engine operators run Kubernetes pods in a specified cluster, which can be a separate cluster that is not related to your environment. In contrast, KubernetesPodOperator runs Kubernetes pods in your environment's cluster.
This page describes an example DAG that creates a Google Kubernetes Engine cluster with GKECreateClusterOperator, uses GKEStartPodOperator with the following configurations, and then deletes the cluster with GKEDeleteClusterOperator:

- Minimal configuration
- Template configuration
- Pod affinity configuration
- Full configuration
To follow this example, put the entire gke_operator.py file in your environment's dags/ folder, or add the relevant code to a DAG.
Create a cluster
The code shown here creates a Google Kubernetes Engine cluster with two node pools, pool-0 and pool-1, each with one node. If needed, you can set other parameters from the Google Kubernetes Engine API as part of body; a sketch of that follows the snippet below.
We recommend using regional clusters in Airflow 2. Zonal clusters are more exposed to zonal failures. For example, you might want to use the us-central1 region for the cluster rather than the us-central1-a zone.
For more information about region-specific considerations, see Geography and regions.
Before the release of apache-airflow-providers-google version 5.1.0, it was not possible to pass the node_pools object in GKECreateClusterOperator. If you use Airflow 2, make sure that your environment uses apache-airflow-providers-google version 5.1.0 or later. You can install a newer version of this PyPI package by specifying apache-airflow-providers-google with >=5.1.0 as the required version.
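If you are not sure which provider version an environment actually runs, one quick check is to print the installed package version from a one-off task (or from any interpreter that has the environment's packages installed); a minimal sketch using only the Python standard library:

# Minimal sketch: confirm the installed provider version is >= 5.1.0
# before relying on node_pools in GKECreateClusterOperator.
from importlib.metadata import version

print(version("apache-airflow-providers-google"))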
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {"name": "pool-0", "initial_node_count": 1},
        {"name": "pool-1", "initial_node_count": 1},
    ],
}
create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    body=CLUSTER,
)
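For example, here is a sketch of a body that also sets each pool's machine type and boot disk size. The nested config object follows the GKE API's NodeConfig message; the specific values are illustrative assumptions, not recommendations:

# Sketch only: additional GKE API fields passed through `body`.
# `config` is the node pool's NodeConfig; the `machine_type` and
# `disk_size_gb` values are illustrative -- size them for your workload.
CLUSTER = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {
            "name": "pool-0",
            "initial_node_count": 1,
            "config": {"machine_type": "e2-standard-4", "disk_size_gb": 100},
        },
        {"name": "pool-1", "initial_node_count": 1},
    ],
}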
Launch workloads in the cluster
The following sections explain each GKEStartPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference for GKE operators.
from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s_models

with models.DAG(
    "example_gcp_gke",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    # TODO(developer): update with your values
    PROJECT_ID = "my-project-id"
    # It is recommended to use regional clusters for increased reliability
    # though passing a zone in the location parameter is also valid
    CLUSTER_REGION = "us-west1"
    CLUSTER_NAME = "example-cluster"
    CLUSTER = {
        "name": CLUSTER_NAME,
        "node_pools": [
            {"name": "pool-0", "initial_node_count": 1},
            {"name": "pool-1", "initial_node_count": 1},
        ],
    }
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        body=CLUSTER,
    )

    kubernetes_min_pod = GKEStartPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )

    kubernetes_template_ex = GKEStartPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    )

    kubernetes_affinity_ex = GKEStartPodOperator(
        task_id="ex-pod-affinity",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        name="ex-pod-affinity",
        namespace="default",
        image="perl",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value 'pool-1' is not
        # found on any nodes, the pod will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google
                                    # Kubernetes Engine, the nodes inside of
                                    # that nodepool are automatically assigned
                                    # the label 'cloud.google.com/gke-nodepool'
                                    # with the value of the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be
                                    # scheduled on.
                                    "values": [
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )

    kubernetes_full_pod = GKEStartPodOperator(
        task_id="ex-all-configs",
        name="full",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )

    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
    )

    create_cluster >> kubernetes_min_pod >> delete_cluster
    create_cluster >> kubernetes_full_pod >> delete_cluster
    create_cluster >> kubernetes_affinity_ex >> delete_cluster
    create_cluster >> kubernetes_template_ex >> delete_cluster
Minimal configuration
To launch a pod in the GKE cluster with GKEStartPodOperator, only the project_id, location, cluster_name, name, namespace, image, and task_id options are required.
When you place the following code snippet in a DAG, the pod-ex-minimum task succeeds as long as the parameters listed earlier are defined and valid.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_min_pod = GKEStartPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Template configuration
Airflow supports using Jinja templates. You must declare the required variables (task_id, name, namespace, and image) with the operator. As shown in the following example, you can template all other parameters with Jinja, including cmds, arguments, and env_vars.
Without changes to the DAG or your environment, the ex-kube-templates task fails. Set an Airflow variable named my_value to make this DAG succeed.
To set my_value with gcloud or the Airflow UI:

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Replace:

ENVIRONMENT with the name of your environment.
LOCATION with the region where the environment is located.
Airflow UI

In the Airflow 2 UI:

1. In the toolbar, select Admin > Variables.
2. On the List Variable page, click Add a new record.
3. On the Add Variable page, enter the following information:
   Key: my_value
   Value: example_value
4. Click Save.
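Alternatively, the same variable can be set programmatically with Airflow's Variable model; a minimal sketch, meant to run inside the environment (for example, in a one-off task) because it writes to the Airflow metadata database:

# Minimal sketch: set the Airflow variable from Python instead of the UI.
from airflow.models import Variable

Variable.set("my_value", "example_value")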
Template configuration:
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_template_ex = GKEStartPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)
Pod affinity configuration
When you configure the affinity parameter in GKEStartPodOperator, you control which nodes your pods are scheduled on, for example, only nodes in a particular node pool. When you created the cluster, you created two node pools named pool-0 and pool-1. This operator dictates that pods run only in pool-1.
Figure: Cloud Composer Kubernetes pod launch location with pod affinity
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_affinity_ex = GKEStartPodOperator(
    task_id="ex-pod-affinity",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    name="ex-pod-affinity",
    namespace="default",
    image="perl",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value 'pool-1' is not
    # found on any nodes, the pod will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google
                                # Kubernetes Engine, the nodes inside of
                                # that nodepool are automatically assigned
                                # the label 'cloud.google.com/gke-nodepool'
                                # with the value of the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be
                                # scheduled on.
                                "values": [
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)
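GKEStartPodOperator inherits from KubernetesPodOperator, so for the common case of pinning pods to a single pool, the inherited node_selector parameter can be a simpler alternative to a full affinity block; a sketch, assuming your provider version exposes that parameter:

# Sketch: pin the pod to pool-1 with a node selector instead of affinity.
# GKE automatically labels every node with 'cloud.google.com/gke-nodepool'
# set to the name of its node pool.
kubernetes_node_selector_ex = GKEStartPodOperator(
    task_id="ex-pod-node-selector",
    name="ex-pod-node-selector",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="perl",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    node_selector={"cloud.google.com/gke-nodepool": "pool-1"},
)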
Full configuration
This example shows all the variables that you can configure in GKEStartPodOperator. No code changes are needed for the ex-all-configs task to succeed.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_full_pod = GKEStartPodOperator(
    task_id="ex-all-configs",
    name="full",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)
Delete the cluster
The code shown here deletes the cluster that was created at the beginning of the guide.
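This is the delete_cluster task from the example DAG above:

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
)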
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-03 UTC."],[[["\u003cp\u003eThis page details the use of Google Kubernetes Engine (GKE) operators within Cloud Composer, specifically showing how to create, manage, and delete GKE clusters, as well as launch Kubernetes pods.\u003c/p\u003e\n"],["\u003cp\u003eThe document walks through an example DAG that uses \u003ccode\u003eGKECreateClusterOperator\u003c/code\u003e to establish a GKE cluster, \u003ccode\u003eGKEStartPodOperator\u003c/code\u003e to deploy pods with different configurations (minimal, templated, affinity-based, and full), and \u003ccode\u003eGKEDeleteClusterOperator\u003c/code\u003e to remove the cluster.\u003c/p\u003e\n"],["\u003cp\u003e\u003ccode\u003eGKEStartPodOperator\u003c/code\u003e requires at a minimum the \u003ccode\u003eproject_id\u003c/code\u003e, \u003ccode\u003elocation\u003c/code\u003e, \u003ccode\u003ecluster_name\u003c/code\u003e, \u003ccode\u003ename\u003c/code\u003e, \u003ccode\u003enamespace\u003c/code\u003e, \u003ccode\u003eimage\u003c/code\u003e, and \u003ccode\u003etask_id\u003c/code\u003e, and the operator allows for Jinja templating in parameters like \u003ccode\u003ecmds\u003c/code\u003e, \u003ccode\u003earguments\u003c/code\u003e, and \u003ccode\u003eenv_vars\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eThe affinity configuration in \u003ccode\u003eGKEStartPodOperator\u003c/code\u003e allows for control over pod scheduling, enabling the selection of specific nodes or node pools, using labels like 'cloud.google.com/gke-nodepool' to ensure pods run on desired nodes, such as those in \u003ccode\u003epool-1\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eThe example code presented covers the creation of GKE clusters with node pools, launching pods, and deleting the clusters, alongside configuration options, including templating, affinity rules, and full parameter settings, with the provided examples detailing each of those configurations.\u003c/p\u003e\n"]]],[],null,[]]