このドキュメントでは、Apache Beam PubSubIO
I/O コネクタを使用して、Dataflow から Pub/Sub にテキストデータを書き込む方法について説明します。
概要
Pub/Sub にデータを書き込むには、PubSubIO
コネクタを使用します。入力要素は、Pub/Sub メッセージまたは単なるメッセージ データのいずれかです。入力要素が Pub/Sub メッセージの場合、必要に応じて各メッセージに属性または順序指定キーを設定できます。
次のように、Java、Python、Go バージョンの PubSubIO
コネクタを使用できます。
単一のトピックに書き込むには、PubsubIO.writeMessages
メソッドを呼び出します。このメソッドは、PubsubMessage
オブジェクトの入力コレクションを受け取ります。コネクタは、文字列、バイナリエンコードされた Avro メッセージ、またはバイナリエンコードされた protobuf メッセージを書き込むための便利なメソッドも定義します。これらのメソッドは、入力コレクションを Pub/Sub メッセージに変換します。
入力データに基づいて動的なトピックのセットに書き込むには、writeMessagesDynamic
を呼び出します。メッセージで PubsubMessage.withTopic
を呼び出し、各メッセージの宛先トピックを指定します。たとえば、入力データの特定フィールドの値に基づいて、別のトピックにメッセージをルーティングできます。
詳細については、PubsubIO
リファレンス ドキュメントをご覧ください。
pubsub.WriteToPubSub
メソッドを呼び出します。
デフォルトでは、このメソッドはメッセージ ペイロードを表す bytes
型の入力コレクションを受け取ります。with_attributes
パラメータが True
の場合、メソッドは PubsubMessage
オブジェクトのコレクションを受け取ります。
詳細については、pubsub
モジュールのリファレンス ドキュメントをご覧ください。
Pub/Sub にデータを書き込むには、pubsubio.Write
メソッドを呼び出します。このメソッドは、PubSubMessage
オブジェクトまたはメッセージ ペイロードを含むバイトスライスの入力コレクションを受け取ります。
詳細については、pubsubio
パッケージのリファレンス ドキュメントをご覧ください。
Pub/Sub メッセージの詳細については、Pub/Sub ドキュメントのメッセージの形式をご覧ください。
タイムスタンプ
Pub/Sub は各メッセージにタイムスタンプを設定します。このタイムスタンプは、Pub/Sub にメッセージが公開された時刻を表します。ストリーミング シナリオでは、イベント タイムスタンプ(メッセージ データが生成された時刻)も考慮する場合があります。Apache Beam 要素のタイムスタンプを使用して、イベント時刻を表すことができます。制限なし PCollection
を作成するソースは、イベント時刻に対応するタイムスタンプを新しい要素に割り当てます。
Java と Python の場合、Pub/Sub I/O コネクタは、各要素のタイムスタンプを Pub/Sub メッセージ属性として書き込むことができます。メッセージ コンシューマは、この属性を使用してイベントのタイムスタンプを取得できます。
PubsubIO.Write<T>.withTimestampAttribute
を呼び出し、属性の名前を指定します。
WriteToPubSub
を呼び出すときに timestamp_attribute
パラメータを指定します。
メッセージ配信
Dataflow は、パイプライン内のメッセージの 1 回限りの処理をサポートしています。ただし、Pub/Sub I/O コネクタでは、Pub/Sub を介した 1 回限りのメッセージ配信は保証できません。
Java と Python の場合、Pub/Sub I/O コネクタを構成して、各要素の一意の ID をメッセージ属性として書き込むようにできます。メッセージ コンシューマは、この属性を使用してメッセージの重複を除去できます。
PubsubIO.Write<T>.withIdAttribute
を呼び出し、属性の名前を指定します。
WriteToPubSub
を呼び出すときに id_label
パラメータを指定します。
直接出力
パイプラインで1 回以上のストリーミング モードを有効にすると、I/O コネクタは直接出力を使用します。このモードでは、コネクタはメッセージをチェックポイントに保存しないため、書き込みが高速化されます。ただし、このモードで再試行すると、メッセージ ID が異なる重複メッセージが発生し、メッセージ消費者がメッセージを重複除去しにくくなる可能性があります。
exactly-once モードを使用するパイプラインの場合、streaming_enable_pubsub_direct_output
サービス オプションを設定することで、直接出力を有効にできます。直接出力により、書き込みレイテンシが短縮され、処理が効率化されます。メッセージ消費者が一意ではないメッセージ ID を持つ重複メッセージを処理できる場合は、このオプションを検討してください。
例
次の例では、Pub/Sub メッセージの PCollection
を作成し、Pub/Sub トピックに書き込みます。トピックはパイプライン オプションとして指定されます。各メッセージには、ペイロード データと一連の属性が含まれています。
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Dataflow で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。