Pub/Sub I/O

Cloud Pub/Sub 用の組み込み Read 変換と Write 変換は PubsubIO に含まれています。PubsubIO を使用すると、Cloud Pub/Sub トピックまたはサブスクリプションに対するデータの読み取りまたは書き込みを行うことができます。また、PubsubIOカスタム メッセージ ID または Cloud Pub/Sub によって割り当てられた ID に基づいてメッセージの重複を排除するので、Cloud Pub/Sub メッセージ ストリームを正確に 1 回だけ処理できます。

注: デフォルトでは PubsubIO 変換により制限なし PCollection が作成されます。PubsubIO.Read を使用して PCollection を作成したら、その PCollectionウィンドウ処理方式を適用したうえで、要素をグループ化する変換(GroupByKeyCombine など)を適用する必要があります。

Cloud Pub/Sub トピックまたはサブスクリプションの指定

PubsubIO を使用するには、Cloud Pub/Sub トピックの名前か、特定のトピックに対して作成されているサブスクリプションの名前を指定します。トピック名を指定して PubsubIO 変換を使用すると、サブスクリプションが Dataflow によって自動的に作成されて管理されます。

Cloud Pub/Sub トピックの指定

PubsubIO を使用する場合は、読み取り元または書き込み先の Cloud Pub/Sub トピックを指定できます。トピック名を指定すると、そのトピックに対するサブスクリプションが Dataflow によって自動的に作成されます。Dataflow は、パイプラインが開始してからトピックの読み取りを開始します。実際のパイプラインの開始前にトピックに公開されたデータは、パイプラインで使用できません。

注: Dataflow では、必要な Cloud Pub/Sub サブスクリプションがパイプラインの設定中に作成されます。パイプラインの設定時間は Cloud Platform で割り当てられている Compute Engine インスタンスなどのリソースの数によって変化するため、パイプラインで Cloud Pub/Sub トピックからの読み取りが開始された時点を正確に把握できません。データの読み取りと書き込みのタイミングを細かく制御する必要がある場合は、Cloud Pub/Sub トピックの独自のサブスクリプションを作成して管理し、そのサブスクリプションを PubsubIO に渡してください。詳細については、Cloud Pub/Sub サブスクリプションの指定をご覧ください。

トピック名は projects/<Cloud Platform Project Name>/topics/<topic name> の形式で指定します。このプロジェクト名はトピックを所有するプロジェクトの名前です。また、トピック名は次の要件を満たす必要があります。

  • トピック名は 3~255 文字にする必要があります。
  • トピック名に使用できる文字は、文字、数字、ダッシュ(-)、下線(_)、ピリオド(.)です。
  • トピック名の先頭は文字にする必要があります。
  • トピック名の末尾は文字か数字にする必要があります。
  • トピック名に接頭辞「goog」を付けることはできません。

Cloud Pub/Sub サブスクリプションの指定

PubsubIO 変換を使用する場合は、特定の Cloud Pub/Sub トピックに対して読み取りまたは書き込みを行うときに使用する Cloud Pub/Sub サブスクリプションを指定できます。サブスクリプションを使用するには、まず自分でサブスクリプションを作成し、管理している必要があります。Cloud Pub/Sub サブスクリプションの作成について詳しくは、Cloud Pub/Sub サブスクライバー ドキュメントをご覧ください。

サブスクリプション名は /projects/<Cloud Platform Project Name>/subscriptions/<subscription name> の形式で指定します。このプロジェクト名はサブスクリプションを所有するプロジェクトの名前です。また、指定するサブスクリプション名は次の要件を満たす必要があります。

  • サブスクリプション名は 3~255 文字にする必要があります。
  • サブスクリプション名に使用できる文字は、文字、数字、ダッシュ(-)、下線(_)、ピリオド(.)です。
  • サブスクリプション名の先頭は文字にする必要があります。
  • サブスクリプション名の末尾は文字か数字にする必要があります。
  • サブスクリプション名に接頭辞「goog」を付けることはできません。

パイプラインで Pub/Sub トピックのデータを確実に取得するには、サブスクリプションを使用してください。Cloud Pub/Sub サブスクリプションでは、パイプラインによってサブスクリプションからデータが読み取られていない場合でもデータの収集は続けられます。パイプラインを起動すると、起動前に到着しているデータを含め、サブスクリプションによって収集されたすべてのデータにパイプラインからアクセスできます。また、パイプラインを停止して、同じサブスクリプションを読み取る新しいパイプラインを作成しても、失われるデータはありません。サブスクリプションはパイプラインがなくてもデータの収集を継続するためです。

PubsubIO を使用して読み取る

PubsubIO.Read 変換では Cloud Pub/Sub ストリームから連続的に読み取りが行われ、ストリームのデータを表す String制限なし PCollection が返されます。デフォルトでは、結果の PCollection の各要素は UTF-8 文字列としてエンコーディングされます。デフォルト エンコーディングをオーバーライドするには、PubsubIO.Read を呼び出すときに .withCoder を使用します。

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Cloud Pub/Sub から制限付きレコードセットを読み取る

テストを目的として Cloud Pub/Sub から読み取ることができます。そのために使用するツールは、InProcessPipelineRunner(Dataflow SDK for Java 1.X)または DirectRunner(Dataflow SDK for Java 2.X)です。

テストを目的として制限付きコレクションを操作する必要がある場合は、読み取る入力量の制限を指定することができます。.maxNumRecords オプションを使用し最大数を固定してレコードを読み取るか、.maxReadTime を使用して時間を固定してレコードを読み取ることができます。

このモードでは、重複除去、再試行、および障害時のリカバリは保証されません。本番環境では Cloud Pub/Sub を制限なしのソースとして扱う必要があります。

PubsubIO を使用して書き込む

PubsubIO.Write 変換では、String オブジェクトの制限なし PCollection が連続的に Cloud Pub/Sub ストリームに書き込まれます。デフォルトでは、PubsubIO.Write に対する入力 PCollection は UTF-8 でエンコーディングされた文字列を含む必要があります。withCoder を使用して、予期する入力の型とエンコーディングを変更できます。

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

タイムスタンプとレコード ID

PubsubIO を使用して読み取りまたは書き込みができる 2 種類のメタデータとして、タイムスタンプレコード ID をレコードに追加できます。

ユーザー指定のタイムスタンプを使用する

ユーザー指定のタイムスタンプを使用することで、Cloud Pub/Sub から読み取った要素を Dataflow パイプライン内のウィンドウに割り当てる方法を細かく制御できます。ユーザー指定のタイムスタンプを設定するには、PubsubIO.Read または PubsubIO.Write 変換を作成するときに timestampLabel を呼び出して、選択した文字列値を渡します。

PubsubIO.Read を使用するときに、ユーザー指定のタイムスタンプ ラベルを設定した場合、この変換では Cloud Pub/Sub の要素を読み取るときに、着信メッセージごとのタイムスタンプとしてユーザーから timestampLabel に渡された文字列が名前として付けられた属性の値を使用します。タイムスタンプは、Unix エポックからのミリ秒数で指定するか、RFC 3339 形式で指定する必要があります。

PubsubIO.Write を使用するときに、ユーザー指定のタイムスタンプ ラベルを設定した場合、この変換では、各要素を Cloud Pub/Sub メッセージとして、要素のタイムスタンプ値(Unix エポックからのミリ秒数)を含む所定の名前の属性と一緒に書き込みます。

レコード ID を使用する

レコード ID を使用すると、Dataflow と他のシステムの間で正確に 1 回の処理を行うことができます。レコード ID を使用するには、PubsubIO.Read または PubsubIO.Write 変換を作成するときに、選択した文字列値を渡して idLabel を呼び出します。

PubsubIO.Read を使用するときにレコード ID ラベルを設定した場合、Dataflow が受信したメッセージの中に(ユーザーから idLabel に渡された文字列が名前として付けられた属性から読み取られる)ID が重複しているものがあると、Dataflow は 1 つのメッセージを除き、そのすべてを破棄します。ただし、Cloud Pub/Sub に公開されたメッセージのレコード ID 値が重複する場合でも、メッセージ公開の間隔が 10 分を超えていれば重複解除は行われません。

PubsubIO.Write を使用するときにレコード ID ラベルを設定した場合、この変換では、指定の名前と一意の値を含む属性をすべての送信メッセージに書き込みます。下流のシステムはこの一意の値を使用してメッセージを重複排除できます。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。