Menggunakan KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menjelaskan cara menggunakan KubernetesPodOperator untuk men-deploy Pod Kubernetes dari Cloud Composer ke cluster Google Kubernetes Engine yang merupakan bagian dari lingkungan Cloud Composer Anda.

KubernetesPodOperator meluncurkan Kubernetes Pod di cluster lingkungan Anda. Sebagai perbandingan, operator Google Kubernetes Engine menjalankan Pod Kubernetes di cluster tertentu, yang dapat berupa cluster terpisah yang tidak terkait dengan lingkungan Anda. Anda juga dapat membuat dan menghapus cluster menggunakan operator Google Kubernetes Engine.

KubernetesPodOperator adalah opsi yang baik jika Anda memerlukan:

  • Dependensi Python kustom yang tidak tersedia melalui repositori PyPI publik.
  • Dependensi biner yang tidak tersedia di image pekerja Cloud Composer stock.

Sebelum memulai

Menyiapkan resource lingkungan Cloud Composer

Saat membuat lingkungan Cloud Composer, Anda menentukan parameter performanya, termasuk parameter performa untuk cluster lingkungan. Meluncurkan Pod Kubernetes ke cluster lingkungan dapat menyebabkan persaingan untuk resource cluster, seperti CPU atau memori. Karena penjadwal dan pekerja Airflow berada di cluster GKE yang sama, penjadwal dan pekerja tidak akan berfungsi dengan benar jika persaingan menyebabkan kehabisan resource.

Untuk mencegah kehabisan resource, lakukan satu atau beberapa tindakan berikut:

Membuat node pool

Cara yang lebih disukai untuk mencegah kehabisan resource di lingkungan Cloud Composer adalah dengan membuat node pool baru dan mengonfigurasi Pod Kubernetes untuk dijalankan hanya menggunakan resource dari kumpulan tersebut.

Konsol

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Lingkungan

  2. Klik nama lingkungan Anda.

  3. Di halaman Detail lingkungan, buka tab Konfigurasi lingkungan.

  4. Di bagian Resources > GKE cluster, ikuti link lihat detail cluster.

  5. Buat node pool seperti yang dijelaskan dalam Menambahkan node pool.

gcloud

  1. Tentukan nama cluster lingkungan Anda:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Ganti:

    • ENVIRONMENT_NAME dengan nama lingkungan.
    • LOCATION dengan region tempat lingkungan berada.
  2. Output-nya berisi nama cluster lingkungan Anda. Misalnya, ini dapat berupa europe-west3-example-enviro-af810e25-gke.

  3. Buat node pool seperti yang dijelaskan dalam Menambahkan node pool.

Meningkatkan jumlah node di lingkungan Anda

Meningkatkan jumlah node di lingkungan Cloud Composer akan meningkatkan daya komputasi yang tersedia untuk beban kerja Anda. Peningkatan ini tidak memberikan resource tambahan untuk tugas yang memerlukan lebih banyak CPU atau RAM daripada yang disediakan jenis mesin yang ditentukan.

Untuk meningkatkan jumlah node, update lingkungan Anda.

Menentukan jenis mesin yang sesuai

Selama pembuatan lingkungan Cloud Composer, Anda dapat menentukan jenis mesin. Untuk memastikan resource yang tersedia, tentukan jenis mesin untuk jenis komputasi yang terjadi di lingkungan Cloud Composer Anda.

Konfigurasi minimal

Untuk membuat KubernetesPodOperator, hanya parameter name, image Pod yang akan digunakan, dan task_id yang diperlukan. /home/airflow/composer_kube_config berisi kredensial untuk mengautentikasi ke GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Aliran udara 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Konfigurasi afinitas pod

Saat mengonfigurasi parameter affinity di KubernetesPodOperator, Anda mengontrol node tempat Pod akan dijadwalkan, seperti node hanya di node pool tertentu. Dalam contoh ini, operator hanya berjalan di node pool bernama pool-0 dan pool-1. Node lingkungan Cloud Composer 1 Anda berada di default-pool, sehingga Pod tidak berjalan di node di lingkungan Anda.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Aliran udara 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Saat contoh dikonfigurasi, tugas akan gagal. Jika Anda melihat log, tugas gagal karena node pool pool-0 dan pool-1 tidak ada.

Untuk memastikan node pool di values ada, buat salah satu perubahan konfigurasi berikut:

  • Jika Anda membuat node pool sebelumnya, ganti pool-0 dan pool-1 dengan nama node pool Anda, lalu upload DAG lagi.

  • Buat node pool bernama pool-0 atau pool-1. Anda dapat membuat kedua file tersebut, tetapi tugas hanya memerlukan satu file untuk berhasil.

  • Ganti pool-0 dan pool-1 dengan default-pool, yang merupakan kumpulan default yang digunakan Airflow. Kemudian, upload DAG Anda lagi.

Setelah Anda membuat perubahan, tunggu beberapa menit hingga lingkungan Anda diperbarui. Kemudian, jalankan kembali tugas ex-pod-affinity dan pastikan tugas ex-pod-affinity berhasil.

Konfigurasi tambahan

Contoh ini menunjukkan parameter tambahan yang dapat Anda konfigurasikan di KubernetesPodOperator.

Lihat referensi berikut untuk informasi selengkapnya:

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Aliran udara 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Menggunakan template Jinja

Airflow mendukung template Jinja di DAG.

Anda harus mendeklarasikan parameter Airflow yang diperlukan (task_id, name, dan image) dengan operator. Seperti yang ditunjukkan dalam contoh berikut, Anda dapat membuat template semua parameter lainnya dengan Jinja, termasuk cmds, arguments, env_vars, dan config_file.

Parameter env_vars dalam contoh ditetapkan dari Variabel Airflow bernama my_value. Contoh DAG mendapatkan nilainya dari variabel template vars di Airflow. Airflow memiliki lebih banyak variabel yang memberikan akses ke berbagai jenis informasi. Misalnya, Anda dapat menggunakan variabel template conf untuk mengakses nilai opsi konfigurasi Airflow. Untuk informasi selengkapnya dan daftar variabel yang tersedia di Airflow, lihat Referensi template dalam dokumentasi Airflow.

Tanpa mengubah DAG atau membuat variabel env_vars, tugas ex-kube-templates dalam contoh akan gagal karena variabel tidak ada. Buat variabel ini di UI Airflow atau dengan Google Cloud CLI:

UI Airflow

  1. Buka UI Airflow.

  2. Di toolbar, pilih Admin > Variables.

  3. Di halaman List Variable, klik Add a new record.

  4. Di halaman Tambahkan Variabel, masukkan informasi berikut:

    • Kunci:my_value
    • Val: example_value
  5. Klik Simpan.

Jika lingkungan Anda menggunakan Airflow 1, jalankan perintah berikut:

  1. Buka UI Airflow.

  2. Di toolbar, pilih Admin > Variables.

  3. Di halaman Variabel, klik tab Buat.

  4. Di halaman Variabel, masukkan informasi berikut:

    • Kunci:my_value
    • Val: example_value
  5. Klik Simpan.

gcloud

Masukkan perintah berikut:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Jika lingkungan Anda menggunakan Airflow 1, jalankan perintah berikut:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Ganti:

  • ENVIRONMENT dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan tersebut berada.

Contoh berikut menunjukkan cara menggunakan template Jinja dengan KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Aliran udara 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Menggunakan Secret dan ConfigMap Kubernetes

Secret Kubernetes adalah objek yang berisi data sensitif. ConfigMap Kubernetes adalah objek yang berisi data non-rahasia dalam key-value pair.

Di Cloud Composer 2, Anda dapat membuat Secret dan ConfigMap menggunakan Google Cloud CLI, API, atau Terraform, lalu mengaksesnya dari KubernetesPodOperator.

Tentang file konfigurasi YAML

Saat membuat Secret atau ConfigMap Kubernetes menggunakan Google Cloud CLI dan API, Anda harus menyediakan file dalam format YAML. File ini harus mengikuti format yang sama seperti yang digunakan oleh Secret dan ConfigMap Kubernetes. Dokumentasi Kubernetes menyediakan banyak contoh kode ConfigMap dan Secret. Untuk memulai, Anda dapat melihat halaman Mendistribusikan Kredensial dengan Aman Menggunakan Secret dan ConfigMaps.

Sama seperti di Kubernetes Secrets, gunakan representasi base64 saat Anda menentukan nilai di Secret.

Untuk mengenkode nilai, Anda dapat menggunakan perintah berikut (ini adalah salah satu dari banyak cara untuk mendapatkan nilai yang dienkode base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Output:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Dua contoh file YAML berikut akan digunakan dalam contoh di bagian selanjutnya dalam panduan ini. Contoh file konfigurasi YAML untuk Secret Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Contoh lain yang menunjukkan cara menyertakan file. Sama seperti pada contoh sebelumnya, pertama-tama encode konten file (cat ./key.json | base64), lalu berikan nilai ini dalam file YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Contoh file konfigurasi YAML untuk ConfigMap. Anda tidak perlu menggunakan representasi base64 di ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Mengelola Secret Kubernetes

Di Cloud Composer 2, Anda membuat Secret menggunakan Google Cloud CLI dan kubectl:

  1. Dapatkan informasi tentang cluster lingkungan Anda:

    1. Jalankan perintah berikut:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Ganti:

      • ENVIRONMENT dengan nama lingkungan Anda.
      • LOCATION dengan region tempat lingkungan Cloud Composer berada.

      Output perintah ini menggunakan format berikut: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Untuk mendapatkan ID cluster GKE, salin output setelah /clusters/ (berakhiran -gke).

    3. Untuk mendapatkan zona, salin output setelah /zones/.

  2. Hubungkan ke cluster GKE Anda dengan perintah berikut:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Ganti:

    • CLUSTER_ID: ID cluster lingkungan.
    • PROJECT_ID: Project ID.
    • ZONE dengan zona tempat cluster lingkungan berada.
  3. Membuat Secret Kubernetes:

    Perintah berikut menunjukkan dua pendekatan berbeda untuk membuat Kubernetes Secrets. Pendekatan --from-literal menggunakan key-value pair. Pendekatan --from-file menggunakan konten file.

    • Untuk membuat Secret Kubernetes dengan memberikan pasangan nilai kunci, jalankan perintah berikut. Contoh ini membuat Secret bernama airflow-secrets yang memiliki kolom sql_alchemy_conn dengan nilai test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Untuk membuat Kubernetes Secret dengan memberikan konten file, jalankan perintah berikut. Contoh ini membuat Secret bernama service-account yang memiliki kolom service-account.json dengan nilai yang diambil dari konten file ./key.json lokal.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Menggunakan Secret Kubernetes di DAG

Contoh ini menunjukkan dua cara menggunakan Secret Kubernetes: sebagai variabel lingkungan, dan sebagai volume yang dipasang oleh Pod.

Secret pertama, airflow-secrets, ditetapkan ke variabel lingkungan Kubernetes bernama SQL_CONN (bukan variabel lingkungan Airflow atau Cloud Composer).

Secret kedua, service-account, memasang service-account.json, file dengan token akun layanan, ke /var/secrets/google.

Berikut adalah tampilan objek Secret:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Aliran udara 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Nama Secret Kubernetes pertama ditentukan dalam variabel secret_env. Secret ini bernama airflow-secrets. Parameter deploy_type menentukan bahwa parameter tersebut harus diekspos sebagai variabel lingkungan. Nama variabel lingkungannya adalah SQL_CONN, seperti yang ditentukan dalam parameter deploy_target. Terakhir, nilai variabel lingkungan SQL_CONN ditetapkan ke nilai kunci sql_alchemy_conn.

Nama Kubernetes Secret kedua ditentukan dalam variabel secret_volume. Secret ini bernama service-account. Volume ini diekspos sebagai volume, seperti yang ditentukan dalam parameter deploy_type. Jalur file yang akan dipasang, deploy_target, adalah /var/secrets/google. Terakhir, key Secret yang disimpan di deploy_target adalah service-account.json.

Berikut adalah tampilan konfigurasi operator:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Aliran udara 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informasi tentang Penyedia Kubernetes CNCF

KubernetesPodOperator diimplementasikan di penyedia apache-airflow-providers-cncf-kubernetes.

Untuk mengetahui catatan rilis mendetail bagi penyedia Kubernetes CNCF, lihat situs Penyedia Kubernetes CNCF.

Versi 6.0.0

Pada paket Penyedia Kubernetes CNCF versi 6.0.0, koneksi kubernetes_default digunakan secara default di KubernetesPodOperator.

Jika Anda menentukan koneksi kustom dalam versi 5.0.0, koneksi kustom ini masih digunakan oleh operator. Untuk beralih kembali menggunakan koneksi kubernetes_default, sebaiknya sesuaikan DAG Anda.

Versi 5.0.0

Versi ini memperkenalkan beberapa perubahan yang tidak kompatibel dengan versi lama dibandingkan dengan versi 4.4.0. Yang paling penting terkait dengan koneksi kubernetes_default yang tidak digunakan di versi 5.0.0.

  • Koneksi kubernetes_default perlu diubah. Jalur konfigurasi Kubernetes harus ditetapkan ke /home/airflow/composer_kube_config (seperti yang ditunjukkan pada gambar berikut). Sebagai alternatif, config_file harus ditambahkan ke konfigurasi KubernetesPodOperator (seperti yang ditunjukkan dalam contoh kode berikut).
Kolom jalur konfigurasi Kube di UI Airflow
Gambar 1. UI Airflow, mengubah koneksi kubernetes_default (klik untuk memperbesar)
  • Ubah kode tugas menggunakan KubernetesPodOperator dengan cara berikut:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Untuk informasi selengkapnya tentang Versi 5.0.0, lihat Catatan Rilis Penyedia Kubernetes CNCF.

Pemecahan masalah

Bagian ini memberikan saran untuk memecahkan masalah KubernetesPodOperator umum:

Lihat log

Saat memecahkan masalah, Anda dapat memeriksa log dalam urutan berikut:

  1. Log Tugas Airflow:

    1. Di konsol Google Cloud , buka halaman Environments.

      Buka Lingkungan

    2. Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

    3. Buka tab DAG.

    4. Klik nama DAG, lalu klik operasi DAG untuk melihat detail dan log.

  2. Log penjadwal Airflow:

    1. Buka halaman Detail lingkungan.

    2. Buka tab Logs.

    3. Periksa log penjadwal Airflow.

  3. Log pod di konsol Google Cloud , di bagian workload GKE. Log ini mencakup file YAML definisi Pod, peristiwa Pod, dan detail Pod.

Kode return bukan nol

Saat menggunakan KubernetesPodOperator (dan GKEStartPodOperator), kode yang ditampilkan dari titik entri penampung menentukan apakah tugas dianggap berhasil atau tidak. Kode return selain nol menunjukkan kegagalan.

Pola umum adalah menjalankan skrip shell sebagai titik entri penampung untuk menggabungkan beberapa operasi dalam penampung.

Jika Anda menulis skrip tersebut, sebaiknya sertakan perintah set -e di bagian atas skrip sehingga perintah yang gagal dalam skrip akan menghentikan skrip dan menyebarkan kegagalan ke instance tugas Airflow.

Waktu tunggu pod

Waktu tunggu default untuk KubernetesPodOperator adalah 120 detik, yang dapat menyebabkan waktu tunggu terjadi sebelum download image yang lebih besar. Anda dapat meningkatkan waktu tunggu dengan mengubah parameter startup_timeout_seconds saat membuat KubernetesPodOperator.

Jika Pod habis waktunya, log khusus tugas akan tersedia di UI Airflow. Contoh:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Waktu tunggu Pod juga dapat terjadi saat Akun Layanan Cloud Composer tidak memiliki izin IAM yang diperlukan untuk melakukan tugas yang sedang dilakukan. Untuk memverifikasinya, lihat error tingkat Pod menggunakan Dasbor GKE untuk melihat log Workload tertentu, atau gunakan Cloud Logging.

Gagal membuat koneksi baru

Upgrade otomatis diaktifkan secara default di cluster GKE. Jika node pool berada dalam cluster yang sedang diupgrade, Anda mungkin melihat error berikut:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Untuk memeriksa apakah cluster Anda sedang diupgrade, di konsol Google Cloud , buka halaman Kubernetes clusters dan cari ikon pemuatan di samping nama cluster lingkungan Anda.

Langkah selanjutnya