KubernetesPodOperator を使用する

Cloud Composer 1 | Cloud Composer 2

このページでは、KubernetesPodOperator を使用して Kubernetes Pod を Cloud Composer から Cloud Composer 環境の一部である Google Kubernetes Engine クラスタにデプロイし、環境に適切なリソースを確保する方法について説明します。

KubernetesPodOperator は、使用中の環境のクラスタKubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。

このページでは、次の KubernetesPodOperator 構成を含む Airflow DAG の例について説明します。

始める前に

  • Cloud Composer 2 では、環境のクラスタは自動的にスケーリングされます。KubernetesPodOperator を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。 環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。環境のクラスタで実行する追加のワークロードの料金は Cloud Composer 2 料金モデルに従い、Cloud Composer Compute SKU を使用します。

  • Cloud Composer 2 は Workload Identity で GKE クラスタを使用します。デフォルトでは、新しく作成された名前空間または composer-user-workloads 名前空間で実行される Pod は Google Cloud リソースにアクセスできません。Workload Identity を使用する場合は、名前空間に関連付けられた Kubernetes サービス アカウントを Google Cloud サービス アカウントにマッピングして、Google API やその他のサービスへのリクエストに対してサービス ID の承認を有効にする必要があります。

    このため、環境のクラスタ内で composer-user-workloads 名前空間または新しく作成された名前空間で Pod を実行すると、Kubernetes と Google Cloud サービス アカウント間の適切な IAM バインディングが作成されず、これらの Pod は、Google Cloud プロジェクトのリソースにアクセスできません。

    Pod が Google Cloud リソースにアクセスできるようにするには、後で説明するように、composer-user-workloads 名前空間を使用するか、独自の名前空間を作成します。

    プロジェクトのリソースへのアクセス権を付与するには、Workload Identity のガイダンスに沿って、次のようにバインディングを設定します。

    1. 環境のクラスタに別の名前空間を作成します。
    2. composer-user-workloads/<namespace_name> Kubernetes サービス アカウントと環境のサービス アカウントの間にバインディングを作成します。
    3. 環境のサービス アカウント アノテーションを Kubernetes サービス アカウントに追加します。
    4. KubernetesPodOperator を使用する場合は、namespace パラメータと service_account_name パラメータで名前空間と Kubernetes サービス アカウントを指定します。
  • Cloud Composer 2 は Workload Identity で GKE クラスタを使用します。GKE メタデータ サーバーが新しく作成された Pod でのリクエストの承認を開始するまでに数秒かかります。したがって、Pod の有効期間の最初の数秒以内に Workload Identity を使用して認証を試みると、失敗する可能性があります。この制限について詳しくは、Workload Identity の制限事項をご覧ください。

  • Cloud Composer 2 では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。

    • デフォルトでは、クラスが選択されていない場合、KubernetesPodOperator を使用して Pod を作成する際には general-purpose クラスが想定されます。

    • 各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。たとえば、general-purpose クラス内で実行される Pod では、最大 110 GiB のメモリを使用できます。

  • CNCF Kubernetes プロバイダのバージョン 5.0.0 を使用している場合は、CNCF Kubernetes プロバイダ セクションに記載されている手順に沿って操作します。

KubernetesPodOperator の構成

この例の操作を行うためには、kubernetes_pod_operator.py ファイル全体をご使用の環境の dags/ フォルダに配置するか、関連する KubernetesPodOperator コードを DAG に追加します。

次のセクションでは、この例のそれぞれの KubernetesPodOperator 構成について説明します。各構成変数について詳しくは、Airflow リファレンスをご覧ください。

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

構成を最小限にしたい

KubernetesPodOperator の作成に必要なものは、Pod の name、Pod を実行する namespace、使用する imagetask_id のみです。

次のコード スニペットを DAG に配置すると、/home/airflow/composer_kube_config のデフォルトが構成に使用されます。pod-ex-minimum タスクを成功させるのに、コードを変更する必要はありません。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

テンプレートの構成

Airflow は Jinja テンプレートの使用をサポートしています。必須変数(task_idnamenamespaceimage など)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

DAG または環境を変更しない場合は、2 つのエラーが発生して ex-kube-templates タスクが失敗します。ログによると、このタスクは適切な変数が存在しないため失敗しています(my_value)。2 番目のエラーは最初のエラーの修正後に取得できますが、configcore/kube_config が見つからないためタスクが失敗していることを示しています。

両方のエラーを修正するには、以下で説明されている手順を行います。

my_valuegcloud または Airflow UI を設定するには、次の手順を行います。

Airflow UI

Airflow 2 UI で:

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [List Variable] ページで、[新しいレコードを追加する] をクリックします。

  4. [Add Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

gcloud

Airflow 2 の場合は、次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

次のように置き換えます。

  • ENVIRONMENT を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

カスタム config_file(Kubernetes 構成ファイル)を参照するには、kube_config Airflow 構成オプションを有効な Kubernetes 構成にオーバーライドします。

セクション キー
core kube_config /home/airflow/composer_kube_config

環境が更新されるまで数分待ちます。次に、ex-kube-templates タスクを再度実行し、ex-kube-templates タスクが成功したことを確認します。

Secret 変数の構成

Kubernetes Secret は、センシティブ データを含むオブジェクトです。Secret を Kubernetes Pod に渡すには、KubernetesPodOperator を使用します。 Kubernetes で Secret を定義してください。Secret を定義しない場合、Pod の起動に失敗します。

この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。

最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。

2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json/var/secrets/google にマウントします。

Secret は次のようになります。

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

最初の Kubernetes Secret の名前は、secret 変数で定義されます。この特定の Secret の名前は、airflow-secrets です。この Secret は、環境変数として公開されるように deploy_type で指定されます。この Secret で deploy_target に設定される環境変数は、SQL_CONN です。最後に、deploy_target に保管される Secret の keysql_alchemy_conn です。

2 番目の Kubernetes Secret の名前は、secret 変数で定義されています。この特定の Secret の名前は、service-account です。deploy_type で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target/var/secrets/google です。最後に、deploy_target に保管される Secret の keyservice-account.json です。

演算子の構成は、次のようになります。

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

DAG または環境を変更しない場合、ex-kube-secrets タスクは失敗します。ログを確認すると、Pod took too long to start エラーのためタスクが失敗しています。このエラーは、構成(secret_env)で指定された Secret が見つからない場合に発生します。

gcloud

gcloud を使用して Secret を設定するには、次の手順を行います。

  1. Cloud Composer 環境クラスタに関する情報を取得します。

    1. 次のコマンドを実行します。

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      次のように置き換えます。

      • ENVIRONMENT は、環境の名前です。
      • LOCATION は、Cloud Composer 環境が配置されているリージョンです。

      このコマンドの出力では、projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id> の形式が使用されます。

    2. GKE クラスタ ID を取得するには、/clusters/ の後に出力(-gke で終わるもの)をコピーします。

  2. 次のコマンドを実行して GKE クラスタに接続します。

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    次のように置き換えます。

    • CLUSTER_ID は GKE クラスタ ID です。
    • PROJECT は、Google Cloud プロジェクトの ID です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。

  3. Kubernetes Secret を作成します。

    1. 次のコマンドを実行して、sql_alchemy_conntest_value の値を設定する Kubernetes Secret を作成します。

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. 次のコマンドを実行して、service-account.json の値を key.json というサービス アカウント キーファイルのローカルパスに設定する Kubernetes Secret を作成します。

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. Secret を設定した後、Airflow UI でもう一度 ex-kube-secrets タスクを実行します。

  5. ex-kube-secrets タスクの成功を確認します。

すべての構成

この例では、KubernetesPodOperator で構成できるすべての変数が示されます。ex-all-configs タスクを成功させるのに、コードを変更する必要はありません。

各変数について詳しくは、Airflow の KubernetesPodOperator リファレンスをご覧ください。

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

CNCF Kubernetes プロバイダに関する情報

GKEStartPodOperator と KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダ内に実装されます。

CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。

バージョン 6.0.0

CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperatorkubernetes_default 接続がデフォルトで使用されます。

バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default 接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。

バージョン 5.0.0

このバージョンでは、バージョン 4.4.0 と比較して、下位互換性のないいくつかの変更が導入されています。最も重要な点は、バージョン 5.0.0 で使用されない kubernetes_default 接続に関連したものです。

  • kubernetes_default 接続を変更する必要があります。Kube 構成パスを /home/airflow/composer_kube_config に設定(図 1 を参照)するか、config_fileKubernetesPodOperator 構成に追加(次のコード例を参照)する必要があります。
Airflow UI の Kube 構成パスフィールド
図 1Airflow UI、kubernetes_default 接続の変更(クリックして拡大)
  • 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。

トラブルシューティング

Pod の失敗をトラブルシューティングするヒント

Airflow UI でタスクログを確認するだけでなく、次のログも確認してください。

  • Airflow スケジューラと Airflow ワーカーの出力:

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. ご使用の環境の [DAG] リンクをクリックします。

    3. 環境のバケットで、1 つ上のレベルに移動します。

    4. logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> フォルダのログを確認します。

  • GKE ワークロードに表示される Google Cloud コンソールの詳細な Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。

GKEStartPodOperator も使用する場合は、ゼロ以外のコードも返されます。

KubernetesPodOperatorGKEStartPodOperator を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

KubernetesPodOperatorGKEStartPodOperator を使用する際には、一般的にコンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化します。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。

新しい接続の確立に失敗した

GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

クラスタがアップグレードされているかどうかを確認するには、Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。

次のステップ