マネージド I/O コネクタは、ソースとシンクを作成するための共通の API を提供する Apache Beam 変換です。
概要
他の I/O コネクタと同様に、Apache Beam コードを使用してマネージド I/O コネクタを作成します。インスタンス化するソースまたはシンクを指定し、一連の構成パラメータを渡します。構成パラメータを YAML ファイルに配置し、ファイルの URL を指定することもできます。
バックエンドでは、Dataflow はマネージド I/O コネクタをサービスとして扱います。これにより、Dataflow はコネクタのランタイム オペレーションを管理できます。これにより、これらの詳細を管理することではなく、パイプラインのビジネス ロジックに注力できます。
マネージド I/O API の詳細については、Apache Beam Java SDK ドキュメントの Managed
をご覧ください。
Apache Iceberg
Apache Iceberg の場合、マネージド I/O は次の構成パラメータを使用します。名前 | データ型 | 説明 |
---|---|---|
table |
文字列 | Apache Iceberg テーブルの識別子。例:
"db.table1" 。 |
catalog_name |
文字列 | カタログ名です。例: "local" 。 |
catalog_properties |
マップ | Apache Iceberg カタログの構成プロパティのマップ。必要なプロパティはカタログによって異なります。詳細については、Apache Iceberg ドキュメントの CatalogUtil をご覧ください。 |
config_properties |
マップ | オプションの Hadoop の構成プロパティのセット。詳細については、Apache Iceberg ドキュメントの CatalogUtil をご覧ください。 |
triggering_frequency_seconds |
整数 | ストリーミング書き込みパイプラインの場合、シンクがスナップショットの生成を試みる頻度(秒単位)。 |
コードの例などの詳細については、次のトピックをご覧ください。
Apache Kafka
Apache Kafka の場合、マネージド I/O は次の構成パラメータを使用します。
構成の読み取りと書き込み | データ型 | 説明 |
---|---|---|
bootstrap_servers |
文字列 | 必須。Kafka ブートストラップ サーバーのカンマ区切りのリスト。例: localhost:9092 |
topic |
文字列 | 必須。読み取る Kafka トピック。 |
file_descriptor_path |
文字列 | プロトコル バッファ ファイル記述子セットのパス。data_format が "PROTO" の場合にのみ適用されます。 |
data_format |
文字列 | メッセージの形式。サポートされる値: "AVRO" 、"JSON" 、"PROTO" 、"RAW" 。デフォルト値は "RAW" で、メッセージ ペイロードの未加工バイトの読み取りまたは書き込みを行います。 |
message_name |
文字列 | プロトコル バッファ メッセージの名前。data_format が "PROTO" の場合は必須です。 |
schema |
文字列 | Kafka メッセージ スキーマ。想定されるスキーマタイプはデータ形式によって異なります。
読み取りパイプラインの場合、 |
構成を読み取る | ||
auto_offset_reset_config |
文字列 | 初期オフセットがない場合や、Kafka サーバーに現在のオフセットが存在しなくなった場合の動作を指定します。次の値を使用できます。
デフォルト値は |
confluent_schema_registry_subject |
文字列 | Confluent スキーマ レジストリのサブジェクト。confluent_schema_registry_url が指定されている場合は必須。 |
confluent_schema_registry_url |
文字列 | Confluent スキーマ レジストリの URL。指定すると、schema パラメータは無視されます。 |
consumer_config_updates |
マップ | Kafka コンシューマーの構成パラメータを設定します。詳細については、Kafka ドキュメントのコンシューマー構成をご覧ください。このパラメータを使用して、Kafka コンシューマーをカスタマイズできます。 |
max_read_time_seconds |
整数 | 最大読み取り時間(秒)。このオプションは、境界付きの PCollection を生成します。主にテストやその他の非本番環境のシナリオを対象としています。 |
構成を書き込む | ||
producer_config_updates |
マップ | Kafka プロデューサーの構成パラメータを設定します。詳細については、Kafka ドキュメントのプロデューサー構成をご覧ください。このパラメータを使用して、Kafka プロデューサーをカスタマイズできます。 |
Avro または JSON メッセージを読み取るには、メッセージ スキーマを指定する必要があります。スキーマを直接設定するには、schema
パラメータを使用します。Confluent スキーマ レジストリを介してスキーマを指定するには、confluent_schema_registry_url
パラメータと confluent_schema_registry_subject
パラメータを設定します。
プロトコル バッファ メッセージを読み書きするには、メッセージ スキーマを指定するか、file_descriptor_path
パラメータを設定します。