Pub/Sub I/O

Pub/Sub 用の埋め込み ReadWrite 変換は、PubsubIO に含まれています。PubsubIO を使用して、Pub/Sub のトピックやサブスクリプションからデータを読み込む(またはデータを書き込む)ことができます。また、PubsubIOカスタム メッセージ ID または Pub/Sub によって割り当てられた ID に基づいてメッセージの重複を排除するので、Pub/Sub メッセージ ストリームを正確に 1 回だけ処理できます。

注: デフォルトでは、PubsubIO 変換により制限なし PCollection が作成されます。PubsubIO.Read を使用して PCollection を作成したら、その PCollectionウィンドウ処理方式を適用したうえで、要素をグループ化する変換、たとえば GroupByKeyCombine などを適用する必要があります。

Pub/Sub トピックまたはサブスクリプションを指定する

PubsubIO を使用するには、Pub/Sub トピックの名前か、特定のトピックに対して以前に作成したサブスクリプションを指定できます。トピック名付きの PubsubIO 変換を使用する場合、Dataflow は背後で自動的にサブスクリプションを作成し、管理します。

Pub/Sub トピックを指定する

PubsubIO を使用すると、Pub/Sub トピックを指定して、読み書きを行うことができます。トピック名を指定すると、そのトピックに対するサブスクリプションが Dataflow によって自動的に作成されます。Dataflow は、パイプラインが開始してからトピックの読み取りを開始します。実際のパイプラインの開始前にトピックに公開されたデータは、パイプラインで使用できません。

注: Dataflow は、パイプライン セットアップの中で、必要な Pub/Sub サブスクリプションを作成します。パイプライン セットアップ時間は、Compute Engine インスタンスの数や Cloud Platform に割り当てられたその他のリソースによって変化する可能性があるため、パイプラインが Pub/Sub トピックの読み取りをいつ開始したかを正確に判別することは困難です。データの読み取りと書き込みのタイミングを細かく制御する必要がある場合は、Pub/Sub トピックに独自のサブスクリプションを作成して管理し、そのサブスクリプションを PubsubIO に渡すことができます。詳しくは、Pub/Sub サブスクリプションを指定するをご覧ください。

指定するトピック名は projects/<Cloud Platform Project Name>/topics/<topic name> 形式に従う必要があります。その場合、プロジェクト名はトピックを所有するプロジェクトの名前です。また、トピック名は次の要件を満たす必要があります。

  • トピック名は 3~255 文字にする必要があります。
  • トピック名に使用できる文字は、文字、数字、ダッシュ(-)、下線(_)、ピリオド(.)です。
  • トピック名の先頭は文字にする必要があります。
  • トピック名の末尾は文字か数字にする必要があります。
  • トピック名に接頭辞「goog」を付けることはできません。

Pub/Sub サブスクリプションを指定する

PubsubIO 変換を使用すると、Pub/Sub サブスクリプションを指定して、特定の Pub/Sub トピックの読み書きを行うことができます。サブスクリプションを使用するには、サブスクリプションをすでに作成し、管理している必要があります。Pub/Sub サブスクリプションの作成について詳しくは、Pub/Sub サブスクライバ ドキュメントをご覧ください。

指定するサブスクリプション名は /projects/<Cloud Platform Project Name>/subscriptions/<subscription name> 形式に従う必要があります。その場合、プロジェクト名はサブスクリプションを所有するプロジェクトの名前です。また、指定するサブスクリプション名は次の要件を満たす必要があります。

  • サブスクリプション名は 3~255 文字にする必要があります。
  • サブスクリプション名に使用できる文字は、文字、数字、ダッシュ(-)、下線(_)、ピリオド(.)です。
  • サブスクリプション名の先頭は文字にする必要があります。
  • サブスクリプション名の末尾は文字か数字にする必要があります。
  • サブスクリプション名に接頭辞「goog」を付けることはできません。

パイプラインが Pub/Sub トピックのデータを確実に取得するためには、サブスクリプションを使用してください。Pub/Sub サブスクリプションは、パイプラインが読み取りを行っていなくても、データの収集を継続します。パイプラインが開始すると、パイプラインの開始前に収集されたデータも含め、サブスクリプションによって収集されたすべてのデータにアクセスできます。また、パイプラインを停止して、同じサブスクリプションを読み取る新しいパイプラインを作成しても、失われるデータはありません。サブスクリプションはパイプラインがなくてもデータの収集を継続するためです。

PubsubIO を使用して読み取る

PubsubIO.Read 変換は Pub/Sub ストリームから連続的に読み込みを行い、ストリームのデータを表す、String制限なし PCollection を返します。デフォルトでは、結果の PCollection の各要素は UTF-8 文字列としてエンコーディングされます。デフォルトのエンコーディングは、PubsubIO.Read を呼び出したとき、.withCoder を使用してオーバーライドできます。

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Pub/Sub から制限付きレコードセットを読み取る

テストを目的として、Pub/Sub から読み取ることができます。そのために使用するツールは、InProcessPipelineRunner(Dataflow SDK from Java 1.X)または DirectRunner(Dataflow SDK for Java 2.X)です。

テストを目的として制限付きコレクションを操作する必要がある場合は、読み取る入力量の制限を指定できます。.maxNumRecords オプションを使用して固定最大値レコードを読み取るか、.maxReadTime を使用して固定期間レコードを読み取るかいずれかの方法があります。

このモードでは、重複除去、再試行、および障害時のリカバリは保証されないことに注意が必要です。本番環境では、Pub/Sub を制限なしソースとして扱う必要があります。

PubsubIO を使用して書き込む

PubsubIO.Write 変換は、Pub/Sub ストリームに String オブジェクトの制限なし PCollection の書き込みを続けます。デフォルトでは、PubsubIO.Write に対する入力 PCollection は UTF-8 でエンコーディングされた文字列を含む必要があります。withCoder を使用して、予期する入力の型とエンコーディングを変更できます。

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

タイムスタンプとレコード ID

PubsubIO タイムスタンプレコード ID を使用して、読み取りまたは書き込みするレコードに 2 種類のメタデータを追加できます。

ユーザー指定のタイムスタンプを使用する

ユーザー指定のタイムスタンプを使用すると、Pub/Sub から読み取った要素を Dataflow パイプラインのウィンドウに割り当てる方法を細かく制御できます。ユーザー指定のタイムスタンプを設定するには、PubsubIO.Read または PubsubIO.Write 変換を作成するときに timestampLabel を呼び出して、選択した文字列値を渡します。

PubsubIO.Read を使用するときに、ユーザー指定のタイムスタンプ ラベルを設定した場合、この変換では Pub/Sub の要素を読み取るときに、着信メッセージごとのタイムスタンプとしてユーザーから timestampLabel に渡された文字列の名前と一緒に属性の値を使用します。タイムスタンプは、Unix エポックからのミリ秒数の形式で指定するか、RFC 3339 に準じた形式で指定する必要があります。

PubsubIO.Write を使用するときに、ユーザー指定のタイムスタンプ ラベルを設定した場合、この変換では、各要素を Pub/Sub メッセージとして、要素のタイムスタンプ値(Unix エポックからのミリ秒数)を含む所定の名前の属性と一緒に書き込みます。

レコード ID を使用する

レコード ID を使用すると、Dataflow と他のシステムの間で正確に 1 回の処理を行うことができます。レコード ID を使用するには、PubsubIO.Read または PubsubIO.Write 変換を作成するときに、idLabel を呼び出して選択した文字列値を渡します。

PubsubIO.Read 使用時にレコード ID ラベルを設定している場合、Dataflow が同じ ID(idLabel に渡された文字列が名前と一緒に属性から読み取られる)が付いた複数のメッセージを受信すると、Dataflow は 1 つを除いてすべてのメッセージを破棄します。ただし、同じレコード ID 値のメッセージが Pub/Sub に公開された間隔が 10 分を超える場合、Dataflow はこのような重複解除を実行しません。

PubsubIO.Write を使用するときにレコード ID ラベルを設定した場合、この変換では、指定の名前と一意の値を含む属性をすべての送信メッセージに書き込みます。下流のシステムはこの一意の値を使用してメッセージを重複排除できます。