このドキュメントでは、Apache Beam PubSubIO
I/O コネクタを使用して、Dataflow から Pub/Sub にテキストデータを書き込む方法について説明します。
概要
Pub/Sub にデータを書き込むには、PubSubIO
コネクタを使用します。入力要素は、Pub/Sub メッセージまたは単なるメッセージ データのいずれかです。入力要素が Pub/Sub メッセージの場合、必要に応じて各メッセージに属性または順序指定キーを設定できます。
次のように、Java、Python、Go バージョンの PubSubIO
コネクタを使用できます。
Java
単一のトピックに書き込むには、PubsubIO.writeMessages
メソッドを呼び出します。このメソッドは、PubsubMessage
オブジェクトの入力コレクションを受け取ります。コネクタは、文字列、バイナリエンコードされた Avro メッセージ、またはバイナリエンコードされた protobuf メッセージを書き込むための便利なメソッドも定義します。これらのメソッドは、入力コレクションを Pub/Sub メッセージに変換します。
入力データに基づいて動的なトピックのセットに書き込むには、writeMessagesDynamic
を呼び出します。メッセージで PubsubMessage.withTopic
を呼び出し、各メッセージの宛先トピックを指定します。たとえば、入力データの特定フィールドの値に基づいて、別のトピックにメッセージをルーティングできます。
詳細については、PubsubIO
リファレンス ドキュメントをご覧ください。
Python
pubsub.WriteToPubSub
メソッドを呼び出します。
デフォルトでは、このメソッドはメッセージ ペイロードを表す bytes
型の入力コレクションを受け取ります。with_attributes
パラメータが True
の場合、メソッドは PubsubMessage
オブジェクトのコレクションを受け取ります。
詳細については、pubsub
モジュールのリファレンス ドキュメントをご覧ください。
Go
Pub/Sub にデータを書き込むには、pubsubio.Write
メソッドを呼び出します。このメソッドは、PubSubMessage
オブジェクトまたはメッセージ ペイロードを含むバイトスライスの入力コレクションを受け取ります。
詳細については、pubsubio
パッケージのリファレンス ドキュメントをご覧ください。
Pub/Sub メッセージの詳細については、Pub/Sub ドキュメントのメッセージの形式をご覧ください。
タイムスタンプ
Pub/Sub は各メッセージにタイムスタンプを設定します。このタイムスタンプは、Pub/Sub にメッセージが公開された時刻を表します。ストリーミング シナリオでは、イベント タイムスタンプ(メッセージ データが生成された時刻)も考慮する場合があります。Apache Beam 要素のタイムスタンプを使用して、イベント時刻を表すことができます。制限なし PCollection
を作成するソースは、イベント時刻に対応するタイムスタンプを新しい要素に割り当てます。
Java と Python の場合、Pub/Sub I/O コネクタは、各要素のタイムスタンプを Pub/Sub メッセージ属性として書き込むことができます。メッセージ コンシューマは、この属性を使用してイベントのタイムスタンプを取得できます。
Java
PubsubIO.Write<T>.withTimestampAttribute
を呼び出し、属性の名前を指定します。
Python
WriteToPubSub
を呼び出すときに timestamp_attribute
パラメータを指定します。
メッセージ配信
Dataflow は、パイプライン内のメッセージの 1 回限りの処理をサポートしています。ただし、Pub/Sub I/O コネクタでは、Pub/Sub を介した 1 回限りのメッセージ配信は保証できません。
Java と Python の場合、Pub/Sub I/O コネクタを構成して、各要素の一意の ID をメッセージ属性として書き込むようにできます。メッセージ コンシューマは、この属性を使用してメッセージの重複を除去できます。
Java
PubsubIO.Write<T>.withIdAttribute
を呼び出し、属性の名前を指定します。
Python
WriteToPubSub
を呼び出すときに id_label
パラメータを指定します。
例
次の例では、Pub/Sub メッセージの PCollection
を作成し、Pub/Sub トピックに書き込みます。トピックはパイプライン オプションとして指定されます。各メッセージには、ペイロード データと一連の属性が含まれています。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Python
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。