バックポート プロバイダ パッケージから演算子をインポートする

Cloud Composer 1 | Cloud Composer 2

このページでは、Airflow バックポート プロバイダ パッケージの概要と DAG での使用方法について説明します。

バックポート プロバイダ パッケージは、演算子、転送、センサー、フック、シークレットの Airflow 2 バージョンで、PyPI モジュールとしてパッケージ化されています。

このドキュメントでは、わかりやすくするために、演算子についてのみ説明します。演算子での説明と同じように、バックポート パッケージから他のすべてのコンテンツを使用できます。

バックポート プロバイダ パッケージは、Airflow 環境を新しいバージョンにアップグレードすることなく、オペレータ、転送、センサー、フックの新機能やセキュリティ更新を取得できるという重要な問題を解決します。たとえば、Google のプロバイダ パッケージは Airflow 2 で使用できます。Airflow 1.10.* 環境でこのパッケージのバックポートされたバージョンから演算子をインポートできます。

Apache Airflow でバックポート プロバイダ パッケージの概念が導入される前は、オペレーターは Airflow の重要な要素でした。新しいバージョンのオペレータを取得するには、Airflow の新しいバージョンが必要でした。そのためには、Cloud Composer 環境を新しいバージョンにアップグレードする必要があります。Airflow 2 はこのモデルから離れ、プロバイダ パッケージを導入します。Airflow 2 のプロバイダ パッケージは、特定のプロバイダ(Google など)のオペレータ、転送、センサー、フック、シークレットを含む PyPI モジュールです。Airflow 1.10.* を使用している場合、この変更のメリットを活用し、プロバイダ パッケージのバックポート バージョンを使用できます。

プリインストールされたバックポート パッケージを使用する

一部のバックポート パッケージは Cloud Composer イメージにすでにインストールされています。これらのバックポート パッケージを環境にインストールする必要はありません。後述するように、DAG コードのバックポート パッケージから演算子をインポートしてください。

使用中の環境下で利用可能なバックポート パッケージを確認するには、ご使用の環境の Cloud Composer イメージのパッケージのリストをご覧ください。

環境には、特定のバックポート パッケージに特定のバージョンがあります。別のバージョンを使用する場合は、環境を更新して必要なバージョンを指定します。プリインストールされたバックポート パッケージをダウングレードすることはおすすめしません。新しいバージョンで問題が見つかった場合に限り、以前のバージョンをインストールしてください。プリインストールされているバックポート パッケージはアンインストールできません。インストールされているバージョンのみを変更してください。

バックポート パッケージのインストールとアップグレード

以下の操作でバックポート パッケージをインストールまたはアップグレードします。

  1. 必要なバックポート パッケージを PyPI.org で探します

  2. 他の PyPI パッケージと同様に、パッケージをインストールするかアップグレードします。

  3. 必要に応じて、クロス プロバイダ パッケージの依存関係をインストールします。これらは、追加の依存関係であり、バックポート パッケージのすべての機能を使用しなければならない場合があります。

    たとえば、apache-airflow-backport-providers-google から SalesforceToGcsOperator を使用するには、[salesforce] extra が必要です。apache-airflow-backport-providers-googleインストールし、[その他とバージョン] フィールドに [salesforce] extra を指定します。

DAG のバックポート プロバイダ パッケージから演算子をインポートする

バックポート パッケージのコンテンツのリストを表示するには、PyPI.org のバックポート パッケージ ページに移動します。たとえば、apache-airflow-backport-providers-google のページには、このパッケージの演算子、転送、センサー、フック、シークレットが一覧表示されます。

バックポート プロバイダ パッケージには、新しい演算子や移行された演算子、その他のコンテンツが含まれます。この 2 種類のコンテンツには、後で説明するように違いがあります。

新しい演算子をインポートする

新しい演算子は Airflow 1.10.* には存在しない演算子です。こうした演算子をバックポート パッケージなしでインポートしようとすると、インポート エラーが発生します。

バックポート パッケージから新しい演算子を使用するには、PyPI.org のバックポート パッケージのページの説明に従って、対応する airflow.providers.* パッケージから演算子をインポートしてください。

次の例では、apache-airflow-backport-providers-google パッケージから新しい演算子をインポートします。

from airflow.providers.google.cloud.operators.bigquery_dts import (
    BigQueryCreateDataTransferOperator,
    BigQueryDeleteDataTransferConfigOperator,
    )

移動した演算子をインポート

移動した演算子は、Airflow 1.10.* にすでに存在します。バックポート演算子パッケージをインストールした後、2 つの異なるバージョンの演算子をインポートできます。1 つのバージョンは Airflow にバンドルされており、もう 1 つのバージョンは移動演算子です。移動演算子を使用するには、新しいインポートパスを使用してインポートします。

バックポート パッケージから移動した演算子を使用する場合は、PyPI.org のバックポート パッケージのページの説明に従って、対応する airflow.contrib.* パッケージから演算子をインポートしてください。

次の例では、apache-airflow-backport-providers-google パッケージから新しい演算子をインポートします。

from airflow.contrib.operators.bigquery_operator import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryOperator,
    )

次のステップ