Use GKE operators

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

This page describes how to use Google Kubernetes Engine operators to create clusters in Google Kubernetes Engine and to launch Kubernetes Pods in those clusters.

Google Kubernetes Engine operators run Kubernetes pods in a cluster that you specify, which can be a separate cluster unrelated to your environment. In comparison, the KubernetesPodOperator runs Kubernetes pods in your environment's cluster.

This page walks through an example DAG that creates a Google Kubernetes Engine cluster with the GKECreateClusterOperator, uses the GKEStartPodOperator with the following configurations, and then deletes the cluster with the GKEDeleteClusterOperator afterwards:

  • Minimal configuration
  • Templates configuration
  • Pod Affinity configuration
  • Full configuration

Before you begin

GKE operator configuration

To follow along with this example, place the entire gke_operator.py file in your environment's dags/ folder, or add the relevant code to a DAG.

Create a cluster

The code shown here creates a Google Kubernetes Engine cluster with two node pools, pool-0 and pool-1, each of which has one node. If needed, you can set other parameters from the Google Kubernetes Engine API as part of the body, as sketched after the following snippet.

Before the release of apache-airflow-providers-google version 5.1.0, it was not possible to pass the node_pools object in the GKECreateClusterOperator. If you use Airflow 2, make sure that your environment uses apache-airflow-providers-google version 5.1.0 or later. You can install a more recent version of this PyPI package by specifying apache-airflow-providers-google and >=5.1.0 as the required version.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {"name": "pool-0", "initial_node_count": 1},
        {"name": "pool-1", "initial_node_count": 1},
    ],
}
create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    body=CLUSTER,
)
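
As an illustration of passing additional Google Kubernetes Engine API fields in the body, the following sketch also sets a machine type for pool-0 and attaches labels to the cluster. This is not part of the example DAG; the config.machine_type and resource_labels field names follow the GKE API's NodePool and Cluster messages, so verify them against the GKE API reference for your provider version before using them.

# Hypothetical extension of the CLUSTER body above (not part of the example DAG).
# Field names follow the GKE API's Cluster and NodePool messages; verify them
# against the GKE API reference before use.
CLUSTER_WITH_EXTRAS = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {
            "name": "pool-0",
            "initial_node_count": 1,
            # NodeConfig fields, such as the machine type, go under "config".
            "config": {"machine_type": "e2-standard-4"},
        },
        {"name": "pool-1", "initial_node_count": 1},
    ],
    # Labels applied to the cluster resource itself.
    "resource_labels": {"env": "dev"},
}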

Launch workloads in the cluster

The following sections explain each GKEStartPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference for GKE operators.


from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.utils.dates import days_ago

from kubernetes.client import models as k8s_models


with models.DAG(
    "example_gcp_gke",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    # TODO(developer): update with your values
    PROJECT_ID = "my-project-id"
    # It is recommended to use regional clusters for increased reliability
    # though passing a zone in the location parameter is also valid
    CLUSTER_REGION = "us-west1"
    CLUSTER_NAME = "example-cluster"
    CLUSTER = {
        "name": CLUSTER_NAME,
        "node_pools": [
            {"name": "pool-0", "initial_node_count": 1},
            {"name": "pool-1", "initial_node_count": 1},
        ],
    }
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        body=CLUSTER,
    )

    kubernetes_min_pod = GKEStartPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )

    kubenetes_template_ex = GKEStartPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    )

    kubernetes_affinity_ex = GKEStartPodOperator(
        task_id="ex-pod-affinity",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        name="ex-pod-affinity",
        namespace="default",
        image="perl",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = GKEStartPodOperator(
        task_id="ex-all-configs",
        name="full",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
    )

    create_cluster >> kubernetes_min_pod >> delete_cluster
    create_cluster >> kubernetes_full_pod >> delete_cluster
    create_cluster >> kubernetes_affinity_ex >> delete_cluster
    create_cluster >> kubenetes_template_ex >> delete_cluster

Minimal configuration

To launch a pod in your GKE cluster with the GKEStartPodOperator, only the project_id, location, cluster_name, name, namespace, image, and task_id options are required.

When you place the following code snippet in a DAG, the pod-ex-minimum task succeeds as long as the previously listed parameters are defined and valid.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_min_pod = GKEStartPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Templates configuration

Airflow supports using Jinja templates. You must declare the required variables (task_id, name, namespace, and image) with the operator. As the following example shows, you can template all other parameters with Jinja, including cmds, arguments, and env_vars.

Without changing the DAG or your environment, the ex-kube-templates task fails. Set an Airflow variable named my_value to make this DAG succeed.

To set my_value with gcloud or the Airflow UI (a programmatic alternative is sketched after these steps):

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Replace:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.

Airflow UI

In the Airflow 2 UI:

  1. In the toolbar, select Admin > Variables.

  2. On the List Variable page, click Add a new record.

  3. On the Add Variable page, enter the following information:

    • Key: my_value
    • Value: example_value
  4. Click Save.
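
As a programmatic alternative, you can also set the variable with Airflow's Variable model. This is a minimal sketch, assuming the code runs somewhere with access to your environment's Airflow metadata database (for example, inside a task):

from airflow.models import Variable

# Sketch: create or update the Airflow variable `my_value` from Python code
# instead of using gcloud or the Airflow UI.
Variable.set("my_value", "example_value")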

Templates configuration:

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubenetes_template_ex = GKEStartPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)

Pod Affinity configuration

When you configure the affinity parameter in the GKEStartPodOperator, you control which nodes to schedule pods on, such as nodes only in a particular node pool. When you created your cluster, you created two node pools named pool-0 and pool-1. This operator dictates that pods run only in pool-1.

Figure: Cloud Composer Kubernetes pod launch location with Pod Affinity. The arrow from the Cloud Composer environment shows that the launched pods run in the ephemeral GKE cluster in pool-1, shown as a separate box from pool-0 within the Kubernetes Engine group.


# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_affinity_ex = GKEStartPodOperator(
    task_id="ex-pod-affinity",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    name="ex-pod-affinity",
    namespace="default",
    image="perl",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Full configuration

This example shows all the variables that you can configure in the GKEStartPodOperator. You do not need to modify the code for the ex-all-configs task to succeed.

For details about each variable, see the Airflow reference for GKE operators.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_full_pod = GKEStartPodOperator(
    task_id="ex-all-configs",
    name="full",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Delete the cluster

The code shown here deletes the cluster that was created at the beginning of this guide.

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
)

What's next