KubernetesPodOperator の使用

このページでは、KubernetesPodOperator を使用して Cloud Composer から Kubernetes Pod を Cloud Composer 環境の一部である Google Kubernetes Engine クラスタに起動し、環境に適切なリソースを確保する方法について説明します。

矢印が付いているテナント プロジェクトと顧客プロジェクト内の Cloud Composer 環境リソースは、起動した Pod が Airflow Workers、Redis、Airflow Scheduler、および Cloud SQL Proxy と同じ Kubernetes Engine に存在していることを示します。
Cloud Composer Kubernetes Pod の起動場所(クリックして拡大)

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。

このページでは、次の KubernetesPodOperator 構成を含む DAG の例について説明します。

始める前に

  • Cloud Composer の最新バージョンを使用することをおすすめします。少なくとも、使用するバージョンは、Cloud Composer の非推奨のポリシーとサポート ポリシーの一部としてサポートされている必要があります。イメージのバージョンを確認するには、環境の詳細をご覧ください。
  • 環境に十分なリソースが存在することを確認してください。 リソースが不足している Pod を環境で起動すると、Airflow ワーカーと Airflow スケジューラでエラーが発生する可能性があります。

環境に適したリソースの確保

Cloud Composer 環境を作成するときは、環境のコンピューティング能力を指定し、GKE クラスタに一定量を割り当てます。Kubernetes Pod を環境に起動すると、CPU やメモリなどのリソースに対して、プログラム同士が競合する可能性があります。Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しません。

リソースの不足を防ぐには、次の操作を行います。

ノードプールを作成する

Cloud Composer 環境でリソースの不足を回避するための推奨方法は、新しいノードプールを作成し、そのプールのリソースのみを使用して実行するように Kubernetes Pod を構成することです。

既存のクラスタにノードプールを作成するには、次の手順を行います。

コンソール

  1. Cloud Console で、GKE メニューに移動します。

    GKE メニューに移動

  2. 目的のクラスタを選択します。

  3. [編集] をクリックします。

  4. [ノードプール] で、[ノードプールを追加] をクリックします。

  5. ノードプールを構成します。

  6. (省略可)詳細オプション(自動アップグレードや自動スケーリングなど)を有効にします。

  7. [保存] をクリックします。

gcloud

次のコマンドを入力します。

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

ここで

  • POOL_NAME は、ノードプールの名前です。
  • CLUSTER は、ノードプールを作成するクラスタの名前です。
  • PROJECT_ID は、Cloud Composer のプロジェクト名です。
  • ZONE は、GKE クラスタが配置されるゾーンです。

    オプションの一覧については、gcloud container node-pools create ドキュメントをご覧ください。

    node-pools create リクエストが正しく処理されると、ノードプール情報が返されます。

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

環境内のノード数を増やす

Cloud Composer 環境でノード数を増やすと、ワーカーが使用できるコンピューティング能力が増加します。指定したマシンタイプに用意されているよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加のリソースは割り当てられません。

ノード数を増やすには、環境を更新してください

適切なマシンタイプを指定する

Cloud Composer 環境の作成の際に、マシンタイプを指定できます。使用可能なリソースを確保するには、Cloud Composer 環境で行うコンピューティングの種類に適したマシンタイプを指定します。

KubernetesPodOperator の構成

この例に沿った操作を行うために、kubernetes_pod_operator.py ファイル全体をご使用の環境の DAG フォルダに配置するか、関連する KubernetesPodOperator コードを DAG に追加します。

次のセクションでは、この例のそれぞれの KubernetesPodOperator 構成について説明します。各構成変数について詳しくは、Airflow リファレンスをご覧ください。

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

最小の構成

KubernetesPodOperator の作成に必要になるのは、namenamespaceimage、および task_id のみです。

次のコード スニペットを DAG に配置すると、/home/airflow/composer_kube_config のデフォルトが構成に使用されます。pod-ex-minimum タスクを成功させるのに、コードを変更する必要はありません。

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

テンプレートの構成

Airflow は Jinja テンプレートの使用をサポートしています。 必須変数(task_idnamenamespaceimage)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

DAG または環境を変更しない場合は、2 つのエラーが発生して ex-kube-templates タスクが失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。

  1. ex-kube-templates タスクの失敗を確認します。
  2. ex-kube-templates タスクのログを確認します。

    ログによると、このタスクは適切な変数が存在しないため失敗しています(my_value)。

  3. my_valuegcloud または Airflow UI を設定するには、次の手順を行います。

    gcloud

    次のコマンドを入力します。

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    ここで

    • ENVIRONMENT は、Cloud Composer 環境の名前です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。

    Airflow UI

    1. ツールバーで、[Admin] > [Variables] をクリックします。
    2. [作成] をクリックします。
    3. 次の情報を入力します。
      • Key:my_value
      • Val: example_value
    4. [保存] をクリックします。
  4. もう一度 ex-kube-templates タスクを実行します

  5. ex-kube-templates タスクのステータスを確認します。

    ex-kube-templates タスクはまだ失敗します。ログを確認すると、core/kubeconfigconfig で見つからないためタスクが失敗しています。カスタムの config_file(Kubernetes 構成ファイル)を参照するには、airflow.cfg ファイルの kube_config 変数を有効な Kubernetes 構成に設定する必要があります。

  6. kube_config 変数を設定するには、次のコマンドを入力します。

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    ここで

    • ENVIRONMENT は、Cloud Composer 環境の名前です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。
  7. 環境が更新されるまで数分待ちます。

  8. もう一度 ex-kube-templates タスクを実行します

  9. ex-kube-templates タスクの成功を確認します。

Secret 変数を構成する

Kubernetes Secret は、少量の機密データを含むオブジェクトです。Secret を Kubernetes Pod に渡すには、KubernetesPodOperator を使用します。 Kubernetes で Secret を定義してください。Secret を定義しない場合、Pod の起動に失敗します。

この例では、Kubernetes Secret airflow-secretsSQL_CONN という名前の Kubernetes の環境変数にデプロイしています(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。

Secret は次のようになります。

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

Kubernetes Secret の名前は、secret 変数で定義されます。 この特定の Secret の名前は、airflow-secrets です。この Secret は、環境変数として公開されるように deploy_type で指定されます。この Secret で deploy_target に設定される環境変数は、SQL_CONN です。最後に、deploy_target に保管される Secret の keysql_alchemy_conn です。

演算子の構成は、次のようになります。

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})

DAG または環境を変更しない場合、ex-kube-secrets タスクは失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。

  1. ex-kube-secrets タスクの失敗を確認します。
  2. ex-kube-secrets タスクのログを確認します。

    ログを確認すると、Pod took too long to start エラーのためタスクが失敗していることがわかります。このエラーは、構成(secret_env)で指定された Secret が見つからない場合に発生します。

  3. gcloud を使用して Secret を設定するには、次の手順を行います。

    1. 次のコマンドを実行して、Cloud Composer 環境の詳細を表示します。

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      ここで

      • ENVIRONMENT は、Cloud Composer 環境の名前です。
      • LOCATION は、Cloud Composer 環境が配置されているリージョンです。

        次のような出力が返されます。 projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. GKE クラスタ ID を取得するには、後で取得できる場所に、/clusters/ の後の出力(文末は -gke)をコピーして貼り付けます。この出力がクラスタ ID になります。

    3. ゾーンを取得するには、後で取得できる場所に、/zones/ の後の出力をコピーして貼り付けます。

    4. 次のコマンドを実行して GKE クラスタに接続します。

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      ここで

      • CLUSTER_ID は、GKE クラスタ ID です。
      • ZONE は、GKE が配置されるゾーンです。
      • PROJECT は、Google Cloud プロジェクトの ID です。
    5. 次のコマンドを実行して、sql_alchemy_conntest_value の値を設定する Kubernetes Secret を作成します。

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. Secret を設定した後、もう一度 ex-kube-secrets タスクを実行します

  5. ex-kube-secrets タスクの成功を確認します。

Pod アフィニティの構成

KubernetesPodOperatoraffinity パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。この例では、演算子は pool-0pool-1 という名前のノードプールでのみ実行されます。

矢印が付いているテナント プロジェクトと顧客プロジェクト内の Cloud Composer 環境リソースは、起動した Pod が Airflow Workers、Redis、Airflow Scheduler、および Cloud SQL Proxy と同じ Kubernetes Engine であるが、Kubernetes Engine 内に別個のボックスとして表示される特定のプール(pool-0 と pool-1)に存在していることを示します。
Cloud Composer Kubernetes の Pod アフィニティがある Pod の起動場所(クリックして拡大)


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

例のように構成されている場合、タスクは失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。

  1. ex-pod-affinity タスクの失敗を確認します。
  2. ex-pod-affinity タスクのログを確認します。

    ログを確認すると、pool-0pool-1 のノードプールが存在しないためタスクが失敗していることがわかります。

  3. values のノードプールが存在するようにするため、次のいずれかの構成変更を行います。

    • ノードプールを以前に作成した場合は、pool-0pool-1 をノードプールの名前に置き換えてからもう一度 DAG をアップロードします。
    • pool-0 または pool-1 という名前のノードプールを作成します。両方のノードプールを作成できますが、タスクを成功させるために必要なのは 1 つのみです。
    • pool-0pool-1default-pool に置き換えます。これは、Airflow が使用するデフォルトのプールです。もう一度 DAG をアップロードします。 注: デフォルトでは、Kubernetes Pod の予定は default-pool に設定されます。 プールを後で追加すると、プールは default-pool に制限されます。
  4. 環境が更新されるまで数分待ちます。

  5. もう一度 ex-pod-affinity タスクを実行します。

  6. ex-pod-affinity タスクの成功を確認します。

すべての構成

この例では、KubernetesPodOperator で構成できるすべての変数が示されます。ex-all-configs タスクを成功させるのに、コードを変更する必要はありません。

各変数について詳しくは、Airflow の KubernetesPodOperator リファレンスをご覧ください。

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

DAG の管理

タスクのステータスの表示

  1. Airflow ウェブ インターフェースに移動します。
  2. DAG ページで、DAG 名(composer_sample_kubernetes_pod など)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。

    • Failed: タスクの周囲に赤色のボックスが表示されます(例の ex-kube-templates)。ポインタをタスクの上に置いても、State: Failed と表示されます。

    • Succeed: タスクの周囲に緑色のボックスが表示されます(例の pod-ex-minimum)。ポインタをタスクの上に置いても、State: Success と表示されます。

タスクログを確認する

  1. Airflow UI にタスクのステータスが表示されます。
  2. DAG のグラフビューで、タスク名をクリックします。
  3. タスク インスタンスのコンテキスト メニューで、[View Log] をクリックします。

タスクを再実行する

  1. DAG に戻るには、次の手順を行います。
    1. Airflow UI のツールバーで、[DAGs] をクリックします。
    2. DAG 名をクリックします(composer_samples_kubernetes_pod など)。
  2. タスクを再実行するには、次の手順を行います。
    1. タスク名をクリックします。
    2. [Clear] をクリックしてから、[OK] をクリックします。タスクが自動的に再実行されます。

トラブルシューティング

Pod の失敗をトラブルシューティングするヒント

タスクログの確認だけでなく、次のログも確認してください。

  • Airflow スケジューラと Airflow ワーカーの出力:

    1. Cloud Composer 環境の Cloud Storage バケットに移動します。 これは、DAG が配置されているバケットです。
    2. logs/DAG_NAME/TASK_ID/EXECUTION_DATE のログを確認します。
  • GKE ワークロードに表示される Cloud Console の詳細な Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。

GKEPodOperator も使用する場合は、ゼロ以外のコードも返されます。

KubernetesPodOperatorGKEPodOperator を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

KubernetesPodOperatorGKEPodOperator を使用する際には、一般的にコンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化します。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod が成功してもタスクが失敗する

composer-1.4.1-airflow-* 以前を実行中の Cloud Composer 環境の場合は、次のとおりです。

Airflow タスクが 1 時間実行され、タスクログが kubernetes.client.rest.ApiException: (401)Reason: Unauthorized で終わる場合、基盤となっている Kubernetes Job はその時点以降も実行を継続し、成功する可能性もあります。ただし、そのタスクは失敗としてレポートされます。

この問題を解決するには、kubernetes>=8.0.1PyPI パッケージの依存関係を明示的に追加します。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow ウェブ UI にタスク固有のログが表示されます。例:

Executing  on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Composer サービス アカウントに必要な IAM 権限がない場合も、Pod のタイムアウトが発生する可能性があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Stackdriver Logging を使用します。

新しい接続の確立に失敗した

GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse

クラスタがアップグレードされているかどうかを確認するには、Cloud Console の GKE メニューに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。

関連リソース