KubernetesPodOperator を使用する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、KubernetesPodOperator を使用して、Cloud Composer から Cloud Composer 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。

KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。

KubernetesPodOperator は、以下を必要とする場合に適しています。

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Cloud Composer ワーカーのストック画像では使用できないバイナリ依存関係。

始める前に

Cloud Composer 3 の KubernetesPodOperator と Cloud Composer 2 の KubernetesPodOperator の違いの一覧を確認し、DAG が互換性があることを確認します。

  • Cloud Composer 3 でカスタム名前空間を作成することはできません。Pod は、別の名前空間が指定されている場合でも、常に composer-user-workloads 名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなくプロジェクトのリソースと VPC ネットワーク(有効な場合)にアクセスできます。

  • Kubernetes Secret と ConfigMap は Kubernetes API を使用して作成することができません。代わりに、Cloud Composer には、Kubernetes Secret と ConfigMap を管理するための Google Cloud CLI コマンド、Terraform リソース、Cloud Composer API が用意されています。詳細については、Kubernetes シークレットと ConfigMap を使用するをご覧ください。

  • Cloud Composer 2 での場合と同様に、Pod アフィニティの構成は使用できません。Pod アフィニティを使用する場合は、GKE オペレーターを使用して、別のクラスタで Pod を起動します。

Cloud Composer 3 の KubernetesPodOperator について

このセクションでは、Cloud Composer 3 で KubernetesPodOperator がどのように機能するかについて説明します。

リソースの使用量

Cloud Composer 3 では、お使いの環境のクラスタは自動的にスケーリングされます。KubernetesPodOperator を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。

環境のクラスタで実行する追加のワークロードの料金は Cloud Composer 3 料金モデルに従い、Cloud Composer 3 SKU を使用します。

Cloud Composer 3 では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。

  • Cloud Composer は、general-purpose コンピューティング クラスのみをサポートしています。

  • デフォルトでは、クラスが選択されていない場合、KubernetesPodOperator を使用して Pod を作成する際には general-purpose クラスが想定されます。

  • 各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。たとえば、general-purpose クラス内で実行される Pod は、最大 110 GiB のメモリを使用できます。

プロジェクトのリソースへのアクセス

Cloud Composer 3 では、環境のクラスタはテナント プロジェクトに配置され、Pod は環境のクラスタ内の分離された名前空間で実行されます。

Cloud Composer 3 では、別の名前空間が指定されている場合でも、Pod は常に composer-user-workloads 名前空間で実行されます。この名前空間内の Pod は、追加の構成を行うことなく、プロジェクト内の Google Cloud リソースと VPC ネットワーク(有効になっている場合)にアクセスできます。これらのリソースへのアクセスには、環境のサービス アカウントが使用されます。別のサービス アカウントを指定することはできません。

構成を最小限にしたい

KubernetesPodOperator の作成に必要になるのは、Pod の name、使用する imagetask_id パラメータのみです。/home/airflow/composer_kube_config には、GKE の認証に使用する認証情報が含まれています。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

追加構成

この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。

パラメータの詳細については、KubernetesPodOperator の Airflow リファレンスをご覧ください。Kubernetes Secret と ConfigMap の使用方法については、Kubernetes Secret と ConfigMap を使用するをご覧ください。KubernetesPodOperator で Jinja テンプレートを使用する方法については、Jinja テンプレートを使用するをご覧ください。

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja テンプレートを使用する

Airflow は、DAG で Jinja テンプレートをサポートしています。

必要な Airflow パラメータ(task_idnameimage)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmdsargumentsenv_varsconfig_file など)をテンプレート化できます。

この例の env_vars パラメータは、my_value という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。

DAG を変更するか env_vars 変数を作成しないと、変数が存在しないため、例の ex-kube-templates タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。

Airflow UI

  1. Airflow UI に移動します。

  2. ツールバーで、[管理者] > [変数] を選択します。

  3. [List Variable] ページで、[新しいレコードを追加する] をクリックします。

  4. [Add Variable] ページで、次の情報を入力します。

    • Key:my_value
    • Val: example_value
  5. [保存] をクリックします。

gcloud

次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

以下のように置き換えます。

  • ENVIRONMENT を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes Secret と ConfigMap を使用する

Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。

Cloud Composer 3 では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。

  • Google Cloud CLI と API では、YAML 構成ファイルを指定します。
  • Terraform では、Terraform 構成ファイルで Secret と ConfigMap を別個のリソースとして定義します。

YAML 構成ファイルについて

Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式に従っている必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。

Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。

値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

出力:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用します。Kubernetes Secret の YAML 構成ファイルの例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64)をエンコードしてから、この値を YAML ファイルに指定します。

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes Secret を管理する

gcloud

シークレットを作成

Kubernetes Secret を作成するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。

例:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

シークレットを更新する

Kubernetes Secret を更新するには、次のコマンドを実行します。シークレットの名前は指定された YAML ファイルから取得され、シークレットのコンテンツが置き換えられます。

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_FILE: Secret の構成を含むローカル YAML ファイルへのパス。このファイルの metadata > name フィールドにシークレットの名前を指定します。

シークレットの一覧表示

環境の Secret とそのフィールドのリストを取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

シークレットの詳細を取得する

Secret の詳細情報を取得するには、次のコマンドを実行します。出力におけるキー値はアスタリスクに置き換えられます。

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

シークレットを削除する

Secret を削除するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Secret の名前。Secret の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

API

シークレットを作成

  1. environments.userWorkloadsSecrets.create API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、新しい Secret の URI を指定します。
    2. リクエスト本文の data フィールドに、シークレットのキーと Base64 でエンコードされた値を指定します。

例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

シークレットを更新する

  1. environments.userWorkloadsSecrets.update API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、Secret の URI を指定します。
    2. リクエスト本文の data フィールドに、シークレットのキーと Base64 でエンコードされた値を指定します。値が置き換えられます。

例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

シークレットの一覧表示

environments.userWorkloadsSecrets.list API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

シークレットの詳細を取得する

environments.userWorkloadsSecrets.get API リクエストを作成します。出力に含まれるキー値はアスタリスクに置き換えられます。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

シークレットを削除する

environments.userWorkloadsSecrets.delete API リクエストを作成します。

例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

google_composer_user_workloads_secret リソースは Kubernetes Secret を定義し、そのキーと値が data ブロックで定義されます。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: 環境のリソースの名前。Terraform の環境の定義が含まれています。実際の環境の名前もこのリソースで指定します。
  • LOCATION: 環境が配置されているリージョン。
  • SECRET_NAME: Secret の名前。
  • KEY_NAME: この Secret の 1 つ以上のキー。
  • KEY_VALUE: キーの Base64 エンコード値。base64encode 関数を使用して値をエンコードできます(例を参照)。

Kubernetes Secret の次の 2 つの例は、このガイドの後半のサンプルで使用します。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

ファイルを含める方法を示す別の例。file 関数を使用してファイルの内容を文字列として読み取り、それを Base64 でエンコードできます。

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

DAG で Kubernetes Secret を使用する

この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。

最初の Secret である airflow-secrets は、SQL_CONN という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。

2 番目の Secret である service-account は、サービス アカウント トークンを含むファイルである service-account.json/var/secrets/google にマウントします。

Secret オブジェクトは次のようになります。

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

最初の Kubernetes Secret の名前は、secret_env 変数で定義されます。この Secret の名前は airflow-secrets です。deploy_type パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN で、deploy_target パラメータで指定されています。最後に、SQL_CONN 環境変数の値が sql_alchemy_conn キーの値に設定されます。

2 番目の Kubernetes Secret の名前は、secret_volume 変数で定義されています。この Secret の名前は service-account です。deploy_type パラメータで指定されているように、ボリュームとして公開されます。マウントするファイルのパス deploy_target/var/secrets/google です。最後に、deploy_target に保管される Secret の keyservice-account.json です。

演算子の構成は、次のようになります。

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes ConfigMap を管理する

gcloud

ConfigMap の作成

ConfigMap を作成するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。

例:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

ConfigMap を更新する

ConfigMap を更新するには、次のコマンドを実行します。ConfigMap の名前は指定された YAML ファイルから取得され、ConfigMap のコンテンツが置き換えられます。

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_FILE: ConfigMap の構成を含むローカル YAML ファイルへのパス。このファイルの metadata > name フィールドに ConfigMap の名前を指定します。

ConfigMap の一覧表示

環境の ConfigMap とそのフィールドの一覧を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

ConfigMap の詳細を取得する

ConfigMap の詳細情報を取得するには、次のコマンドを実行します。出力に含まれるキー値はそのまま表示されます。

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

以下を置き換えます。

  • CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

ConfigMap を削除する

ConfigMap を削除するには、次のコマンドを実行します。

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: ConfigMap の名前。ConfigMap の構成を含む YAML ファイルの metadata > name フィールドで定義されています。
  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

API

ConfigMap の作成

  1. environments.userWorkloadsConfigMaps.create API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、新しい ConfigMap の URI を指定します。
    2. リクエスト本文の data フィールドに、ConfigMap のキーと値を指定します。

例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

ConfigMap を更新する

  1. environments.userWorkloadsConfigMaps.update API リクエストを作成します。

  2. このリクエストで次のように操作します。

    1. リクエスト本文の name フィールドに、ConfigMap の URI を指定します。
    2. リクエスト本文の data フィールドに、ConfigMap のキーと値を指定します。値が置き換えられます。

例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

ConfigMap の一覧表示

environments.userWorkloadsConfigMaps.list API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。このリクエストでページネーションを使用できます。詳細については、リクエストのリファレンスをご覧ください。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

ConfigMap の詳細を取得する

environments.userWorkloadsConfigMaps.get API リクエストを作成します。出力に含まれるキー値はそのまま表示されます。

例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

ConfigMap を削除する

environments.userWorkloadsConfigMaps.delete API リクエストを作成します。

例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

google_composer_user_workloads_config_map リソースでは ConfigMap を定義します。キーと値は data ブロックで定義します。

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: 環境のリソースの名前。Terraform の環境の定義が含まれています。実際の環境の名前もこのリソースで指定します。
  • LOCATION: 環境が配置されているリージョン。
  • CONFIG_MAP_NAME: ConfigMap の名前。
  • KEY_NAME: この ConfigMap の 1 つ以上のキー。
  • KEY_VALUE: キーの値。

例:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

DAG で ConfigMap を使用する

この例では、DAG で ConfigMap を使用する方法を示します。

次の例では、ConfigMap が configmaps パラメータで渡されます。この ConfigMap のすべてのキーは、環境変数として使用できます。

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

次の例では、ConfigMap をボリュームとしてマウントする方法を示します。

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

CNCF Kubernetes プロバイダに関する情報

KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes プロバイダで実装されています。

CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。

トラブルシューティング

このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。

ログを表示

問題のトラブルシューティング時には、次の順序でログを確認できます。

  1. Airflow タスクログ:

    1. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。

  2. Airflow スケジューラ ログ:

    1. [環境の詳細] ページに移動します。

    2. [ログ] タブに移動します。

    3. Airflow スケジューラ ログを調べます。

  3. ユーザー ワークロード ログ:

    1. [環境の詳細] ページに移動します。

    2. [Monitoring] タブに移動します。

    3. [ユーザー ワークロード] を選択します。

    4. 実行されたワークロードのリストを調べます。各ワークロードのログとリソース使用状況情報を表示できます。

ゼロ以外の戻りコード

KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。

一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。

このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e コマンドを含めることをおすすめします。

Pod のタイムアウト

KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds パラメータを変更します。

Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。

次のステップ