Dataflow マネージド I/O

マネージド I/O を使用すると、Dataflow は Apache Beam パイプラインで使用される特定の I/O コネクタを管理できます。マネージド I/O は、サポートされているソースとシンクを統合するパイプラインの管理を簡素化します。

マネージド I/O は、連携して動作する次の 2 つのコンポーネントで構成されています。

  • I/O コネクタ(ソースとシンク)を作成するための共通の API を提供する Apache Beam 変換。

  • これらの I/O コネクタをユーザーに代わって管理する Dataflow サービス。Apache Beam のバージョンとは別にアップグレードする機能も含まれます。

マネージド I/O の利点は次のとおりです。

  • 自動アップグレード。Dataflow は、パイプライン内のマネージド I/O コネクタを自動的にアップグレードします。つまり、コードを変更することなく、これらのコネクタのセキュリティ修正、パフォーマンスの改善、バグの修正をパイプラインで受け取ることができます。詳細については、自動アップグレードをご覧ください。

  • 一貫性のある API。これまで、Apache Beam の I/O コネクタには個別の API があり、各コネクタは異なる方法で構成されていました。マネージド I/O は、Key-Value プロパティを使用する単一構成の API を提供するため、パイプライン コードがよりシンプルで一貫性のあるものになります。詳細については、構成 API をご覧ください。

要件

  • 次の SDK は、マネージド I/O をサポートしています。

    • Apache Beam SDK for Java バージョン 2.58.0 以降
    • Apache Beam SDK for Python バージョン 2.61.0 以降
  • バックエンド サービスには Dataflow Runner v2 が必要です。Runner v2 が有効になっていない場合でも、パイプラインは実行されますが、マネージド I/O サービスのメリットは得られません。

自動アップグレード

マネージド I/O コネクタを使用する Dataflow パイプラインでは、信頼できるコネクタの最新バージョンが自動的に使用されます。

  • ジョブを送信すると、Dataflow はテスト済みで正常に動作する最新バージョンのコネクタを使用します。

  • ストリーミング ジョブの場合、Dataflow は置換ジョブを起動するたびに更新を確認し、正常であることが確認されている最新バージョンを自動的に使用します。Dataflow は、置換ジョブのコードを変更しない場合でも、このチェックを実行します。

コネクタやパイプラインの Apache Beam バージョンを手動で更新する必要はありません。

次の図は、アップグレード プロセスを示しています。ユーザーが SDK バージョン X を使用して Apache Beam パイプラインを作成します。ユーザーがジョブを送信すると、Dataflow はマネージド I/O のバージョンを確認し、バージョン Y にアップグレードします。

マネージド I/O のアップグレード プロセスを示す図。

アップグレード プロセスにより、ジョブの起動時間が 2 分ほど長くなります。マネージド I/O オペレーションのステータスを確認するには、文字列「Managed Transform(s)」を含むログエントリを探します。

Configuration API

マネージド I/O は、ソースとシンクを構成するための一貫した API を提供する、すぐに使用可能な Apache Beam 変換です。

Java

マネージド I/O でサポートされているソースまたはシンクを作成するには、Managed クラスを使用します。インスタンス化するソースまたはシンクを指定し、次のように一連の構成パラメータを渡します。

Map config = ImmutableMap.<String, Object>builder()
    .put("config1", "abc")
    .put("config2", 1);

pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
    .getSinglePCollection();

構成パラメータを YAML ファイルとして渡すこともできます。完全なコード例については、Apache Iceberg から読み取るをご覧ください。

Python

apache_beam.transforms.managed モジュールをインポートし、managed.Read または managed.Write メソッドを呼び出します。インスタンス化するソースまたはシンクを指定し、次のように一連の構成パラメータを渡します。

pipeline
| beam.managed.Read(
    beam.managed.SOURCE, # Example: beam.managed.KAFKA
    config={
      "config1": "abc",
      "config2": 1
    }
)

構成パラメータを YAML ファイルとして渡すこともできます。完全なコード例については、Apache Kafka から読み取るをご覧ください。

ダイナミック デスティネーション

一部のシンクでは、マネージド I/O コネクタは、受信レコードのフィールド値に基づいて宛先を動的に選択できます。

ダイナミック デスティネーションを使用するには、宛先のテンプレート文字列を指定します。テンプレート文字列には、"tables.{field1}" などのフィールド名を中かっこで囲んで含めることができます。実行時に、コネクタは受信レコードごとにフィールドの値を置き換えて、そのレコードの宛先を特定します。

たとえば、データに airport という名前のフィールドがあるとします。宛先を "flights.{airport}" に設定できます。airport=SFO の場合、レコードは flights.SFO に書き込まれます。ネストされたフィールドにはドット表記を使用します。例: {top.middle.nested}

ダイナミック デスティネーションの使用方法を示すコード例については、ダイナミック デスティネーションで書き込むをご覧ください。

フィルタリング

特定のフィールドを宛先テーブルに書き込む前にフィルタリングが必要になる場合があります。ダイナミック デスティネーションをサポートするシンクでは、この目的で dropkeeponly パラメータを使用できます。これらのパラメータを使用すると、メタデータを宛先に書き込むことなく、入力レコードに宛先メタデータを含めることができます。

シンクごとに設定できるパラメータは 1 つだけです。

構成パラメータ データ型 説明
drop 文字列のリスト 宛先に書き込む前に削除するフィールド名のリスト。
keep 文字列のリスト 宛先に書き込むときに保持するフィールド名のリスト。他のフィールドは削除されます。
only 文字列 宛先に書き込むときに最上位レコードとして使用するフィールドの名前(1 つのみ)。他のフィールドはすべて削除されます。このフィールドは行型にする必要があります。

サポートされているソースとシンク

マネージド I/O は、次のソースとシンクをサポートしています。