Menggunakan KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan cara menggunakan KubernetesPodOperator untuk men-deploy pod Kubernetes dari Cloud Composer ke cluster Google Kubernetes Engine yang merupakan bagian dari lingkungan Cloud Composer Anda dan untuk memastikan lingkungan Anda memiliki resource yang sesuai.

KubernetesPodOperator meluncurkan pod Kubernetes di cluster lingkungan Anda. Sebagai perbandingan, operator Google Kubernetes Engine menjalankan pod Kubernetes di cluster tertentu, yang dapat berupa cluster terpisah yang tidak terkait dengan lingkungan Anda. Anda juga dapat membuat dan menghapus cluster menggunakan operator Google Kubernetes Engine.

KubernetesPodOperator adalah opsi yang baik jika Anda memerlukan:

  • Dependensi Python kustom yang tidak tersedia melalui repositori PyPI publik.
  • Dependensi biner yang tidak tersedia di image pekerja Cloud Composer stok.

Halaman ini akan memandu Anda melihat contoh DAG Airflow yang mencakup konfigurasi KubernetesPodOperator berikut:

Sebelum memulai

  • Di Cloud Composer 2, cluster lingkungan Anda akan diskalakan secara otomatis. Workload tambahan yang Anda jalankan menggunakan KubernetesPodOperator akan diskalakan secara terpisah dari lingkungan Anda. Lingkungan Anda tidak terpengaruh oleh peningkatan permintaan resource, tetapi skala cluster lingkungan Anda naik dan turun bergantung pada permintaan resource. Harga untuk beban kerja tambahan yang Anda jalankan di cluster lingkungan mengikuti model harga Cloud Composer 2 dan menggunakan SKU Compute Cloud Composer.

  • Cluster Cloud Composer 2 menggunakan Workload Identity. Secara default, Pod yang berjalan di namespace yang baru dibuat atau namespace composer-user-workloads tidak dapat mengakses resource Google Cloud. Saat menggunakan Workload Identity, akun layanan Kubernetes yang terkait dengan namespace harus dipetakan ke akun layanan Google Cloud agar dapat mengaktifkan otorisasi identitas layanan untuk permintaan ke Google API dan layanan lainnya.

    Oleh karena itu, jika Anda menjalankan Pod di namespace composer-user-workloads atau namespace yang baru dibuat di cluster lingkungan Anda, binding IAM yang tepat antara akun layanan Kubernetes dan Google Cloud tidak akan dibuat, dan Pod ini tidak dapat mengakses resource project Google Cloud Anda.

    Jika ingin Pod memiliki akses ke resource Google Cloud, gunakan namespace composer-user-workloads atau buat namespace Anda sendiri seperti yang dijelaskan lebih lanjut.

    Untuk memberikan akses ke resource project Anda, ikuti panduan di Workload Identity dan siapkan binding:

    1. Buat namespace terpisah di cluster lingkungan Anda.
    2. Buat binding antara Akun Layanan Kubernetes composer-user-workloads/<namespace_name> dan akun layanan lingkungan Anda.
    3. Tambahkan anotasi akun layanan lingkungan Anda ke akun layanan Kubernetes.
    4. Saat Anda menggunakan KubernetesPodOperator, tentukan namespace dan akun layanan Kubernetes di parameter namespace dan service_account_name.
  • Jika versi 5.0.0 Penyedia Kubernetes CNCF digunakan, ikuti petunjuk yang didokumentasikan bagian CNCF Kubernetes Provider.

  • Cloud Composer 2 menggunakan cluster GKE dengan Workload Identity. Server metadata GKE memerlukan waktu beberapa detik untuk mulai menerima permintaan pada Pod yang baru dibuat. Oleh karena itu, upaya autentikasi menggunakan Workload Identity dalam beberapa detik pertama masa aktif Pod mungkin akan gagal. Anda dapat membaca selengkapnya tentang batasan ini di sini.

  • Cloud Composer 2 menggunakan cluster Autopilot yang memperkenalkan konsep class komputasi. Secara default, jika tidak ada class yang dipilih, class general-purpose diasumsikan saat Anda membuat Pod menggunakan KubernetesPodOperator.

    • Setiap class dikaitkan dengan properti dan batas resource tertentu. Anda dapat membacanya di dokumentasi Autopilot. Misalnya, Pod yang berjalan dalam class general-purpose dapat menggunakan memori hingga 110 GiB.

Konfigurasi KubernetesPodOperator

Untuk mengikuti contoh ini, tempatkan seluruh file kubernetes_pod_operator.py di folder dags/ lingkungan Anda atau tambahkan kode KubernetesPodOperator yang relevan ke DAG.

Bagian berikut menjelaskan setiap konfigurasi KubernetesPodOperator dalam contoh. Untuk mengetahui informasi tentang setiap variabel konfigurasi, lihat Referensi Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Konfigurasi minimal

Untuk membuat KubernetesPodOperator, hanya name Pod, namespace tempat menjalankan pod, image untuk digunakan, dan task_id yang diperlukan.

Jika Anda menempatkan cuplikan kode berikut di DAG, konfigurasi akan menggunakan default di /home/airflow/composer_kube_config. Anda tidak perlu mengubah kode agar tugas pod-ex-minimum berhasil.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Konfigurasi template

Airflow mendukung penggunaan Jinja Templating. Anda harus mendeklarasikan variabel yang diperlukan (task_id, name, namespace, dan image) dengan operator. Seperti yang ditunjukkan dalam contoh berikut, Anda dapat membuat template semua parameter lainnya dengan Jinja, termasuk cmds, arguments, env_vars, dan config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Tanpa mengubah DAG atau lingkungan Anda, tugas ex-kube-templates akan gagal karena dua error. Log menunjukkan bahwa tugas ini gagal karena variabel yang sesuai tidak ada (my_value). Error kedua, yang bisa Anda dapatkan setelah memperbaiki error pertama, menunjukkan bahwa tugas gagal karena core/kube_config tidak ditemukan di config.

Untuk memperbaiki kedua error tersebut, ikuti langkah-langkah yang dijelaskan lebih lanjut.

Untuk menetapkan my_value dengan gcloud atau UI Airflow:

UI Airflow

Di UI Airflow 2:

  1. Buka Airflow UI.

  2. Di toolbar, pilih Admin > Variabel.

  3. Di halaman List Variable, klik Add a new record.

  4. Di halaman Tambahkan Variabel, masukkan informasi berikut:

    • Tombol:my_value
    • Nilai: example_value
  5. Klik Save.

gcloud

Untuk Airflow 2, masukkan perintah berikut:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Ganti:

  • ENVIRONMENT dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.

Untuk merujuk ke config_file kustom (file konfigurasi Kubernetes), ganti opsi konfigurasi Airflow kube_config ke konfigurasi Kubernetes yang valid:

Bagian Kunci Nilai
core kube_config /home/airflow/composer_kube_config

Tunggu beberapa menit sampai lingkungan Anda selesai diperbarui. Kemudian, jalankan lagi tugas ex-kube-templates dan verifikasi bahwa tugas ex-kube-templates berhasil.

Konfigurasi variabel secret

Rahasia Kubernetes adalah objek yang berisi data sensitif. Anda dapat meneruskan secret ke pod Kubernetes menggunakan KubernetesPodOperator. Secret harus ditentukan di Kubernetes. Jika tidak, pod akan gagal diluncurkan.

Contoh ini menunjukkan dua cara penggunaan Secret Kubernetes: sebagai variabel lingkungan, dan sebagai volume yang dipasang oleh pod.

Rahasia pertama, airflow-secrets, ditetapkan ke variabel lingkungan Kubernetes bernama SQL_CONN (bukan variabel lingkungan Airflow atau Cloud Composer).

Rahasia kedua, service-account, memasang service-account.json, file dengan token akun layanan, ke /var/secrets/google.

Rahasianya akan terlihat seperti berikut:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Nama rahasia Kubernetes pertama ditentukan dalam variabel secret. Rahasia khusus ini diberi nama airflow-secrets. ID ini diekspos sebagai variabel lingkungan, sebagaimana ditentukan oleh deploy_type. Variabel lingkungan yang ditetapkannya, deploy_target, adalah SQL_CONN. Terakhir, key rahasia yang disimpan di deploy_target adalah sql_alchemy_conn.

Nama rahasia Kubernetes kedua ditentukan dalam variabel secret. Rahasia khusus ini diberi nama service-account. Aktivitas ditampilkan sebagai volume, sebagaimana ditentukan oleh deploy_type. Jalur file yang akan dipasang, deploy_target, adalah /var/secrets/google. Terakhir, key rahasia yang disimpan di deploy_target adalah service-account.json.

Berikut tampilan konfigurasi operator:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Tanpa membuat perubahan pada DAG atau lingkungan Anda, tugas ex-kube-secrets akan gagal. Jika Anda melihat log, tugas akan gagal karena error Pod took too long to start. Error ini terjadi karena Airflow tidak dapat menemukan rahasia yang ditentukan dalam konfigurasi, secret_env.

gcloud

Untuk menetapkan rahasia menggunakan gcloud:

  1. Dapatkan informasi tentang cluster lingkungan Cloud Composer Anda.

    1. Jalankan perintah berikut:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Ganti:

      • ENVIRONMENT dengan nama lingkungan Anda.
      • LOCATION dengan region tempat lingkungan Cloud Composer berada.

      Output perintah ini menggunakan format berikut: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>.

    2. Untuk mendapatkan ID cluster GKE, salin output setelah /clusters/ (diakhiri dengan -gke).

  2. Hubungkan ke cluster GKE Anda dengan menjalankan perintah berikut:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    Ganti:

    • CLUSTER_ID dengan ID cluster GKE Anda.
    • PROJECT dengan ID project Google Cloud Anda.
    • LOCATION dengan region tempat lingkungan Cloud Composer berada.

  3. Membuat secret Kubernetes.

    1. Buat secret Kubernetes yang menetapkan nilai sql_alchemy_conn ke test_value dengan menjalankan perintah berikut:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. Buat secret Kubernetes yang menetapkan nilai service-account.json ke jalur lokal file kunci akun layanan yang disebut key.json dengan menjalankan perintah berikut:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. Setelah menetapkan secret, jalankan kembali tugas ex-kube-secrets di UI Airflow.

  5. Pastikan tugas ex-kube-secrets berhasil.

Konfigurasi penuh

Contoh ini menunjukkan semua variabel yang dapat Anda konfigurasi di KubernetesPodOperator. Anda tidak perlu mengubah kode agar tugas ex-all-configs berhasil.

Untuk mengetahui detail tentang setiap variabel, lihat referensi KubernetesPodOperator Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Informasi tentang Penyedia Kubernetes CNCF

GKEStartPodOperator dan KubernetesPodOperator diimplementasikan dalam penyedia apache-airflow-providers-cncf-kubernetes.

Untuk mengetahui catatan rilis yang gagal bagi penyedia Kubernetes CNCF, lihat situs Penyedia Kubernetes CNCF.

Versi 6.0.0

Pada paket Penyedia Kubernetes CNCF versi 6.0.0, koneksi kubernetes_default digunakan secara default di KubernetesPodOperator.

Jika Anda menentukan koneksi kustom dalam versi 5.0.0, koneksi kustom ini masih digunakan oleh operator. Untuk beralih kembali menggunakan koneksi kubernetes_default, Anda mungkin perlu menyesuaikan DAG.

Versi 5.0.0

Versi ini memperkenalkan beberapa perubahan yang tidak kompatibel dengan versi sebelumnya dibandingkan dengan versi 4.4.0. Hal terpenting yang harus Anda ketahui berkaitan dengan koneksi kubernetes_default yang tidak digunakan di versi 5.0.0.

  • Koneksi kubernetes_default perlu diubah - Jalur konfigurasi Kube harus disetel ke /home/airflow/composer_kube_config (lihat Gambar 1) atau config_file perlu ditambahkan ke konfigurasi KubernetesPodOperator (seperti yang disajikan di bawah).
Kolom jalur konfigurasi Kube di UI Airflow
Gambar 1. UI Airflow, mengubah koneksi kubernetes_default (klik untuk memperbesar)
  • Ubah kode tugas menggunakan KubernetesPodOperator dengan cara berikut
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Untuk mengetahui informasi selengkapnya tentang Versi 5.0.0, lihat Catatan Rilis Penyedia Kubernetes CNCF

Pemecahan masalah

Tips untuk memecahkan masalah kegagalan Pod

Selain memeriksa log tugas di UI Airflow, periksa juga log berikut:

  • Output penjadwal dan pekerja Airflow:

    1. Di konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Ikuti link DAG untuk lingkungan Anda.

    3. Di bucket lingkungan Anda, naik satu tingkat.

    4. Tinjau log di folder logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Log pod mendetail di Konsol Google Cloud di bagian workload GKE. Log ini mencakup file YAML definisi pod, peristiwa pod, dan detail pod.

Kode pengembalian bukan nol jika juga menggunakan GKEStartPodOperator

Saat menggunakan KubernetesPodOperator dan GKEStartPodOperator, kode yang ditampilkan dari titik entri container menentukan apakah tugas dianggap berhasil atau tidak. Kode pengembalian bukan nol menunjukkan kegagalan.

Pola yang umum saat menggunakan KubernetesPodOperator dan GKEStartPodOperator adalah menjalankan skrip shell sebagai titik entri container untuk mengelompokkan beberapa operasi dalam container.

Jika Anda menulis skrip semacam itu, sebaiknya sertakan perintah set -e di bagian atas skrip, sehingga perintah yang gagal dalam skrip akan menghentikan skrip dan menerapkan kegagalan tersebut ke instance tugas Airflow.

Waktu tunggu pod

Waktu tunggu default untuk KubernetesPodOperator adalah 120 detik, yang dapat mengakibatkan waktu tunggu yang terjadi sebelum download gambar yang lebih besar. Anda dapat meningkatkan waktu tunggu dengan mengubah parameter startup_timeout_seconds saat membuat KubernetesPodOperator.

Saat waktu pod habis, log khusus tugas akan tersedia di UI Airflow. Contoh:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Waktu Tunggu Pod juga dapat terjadi jika Akun Layanan Cloud Composer tidak memiliki izin IAM yang diperlukan untuk menjalankan tugas yang sedang dikerjakan. Untuk memverifikasinya, pelajari error level pod menggunakan Dasbor GKE untuk melihat log untuk Beban Kerja tertentu Anda, atau gunakan Cloud Logging.

Gagal membuat koneksi baru

Upgrade otomatis diaktifkan secara default di cluster GKE. Jika kumpulan node berada di cluster yang sedang diupgrade, Anda mungkin melihat error berikut:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Untuk memeriksa apakah cluster Anda sedang diupgrade, di Konsol Google Cloud, buka halaman Cluster Kubernetes dan cari ikon pemuatan di samping nama cluster lingkungan Anda.

Langkah selanjutnya