Dataflow から Pub/Sub に書き込む

このドキュメントでは、Apache Beam PubSubIO I/O コネクタを使用して、Dataflow から Pub/Sub にテキストデータを書き込む方法について説明します。

概要

Pub/Sub にデータを書き込むには、PubSubIO コネクタを使用します。入力要素は、Pub/Sub メッセージまたは単なるメッセージ データのいずれかです。入力要素が Pub/Sub メッセージの場合、必要に応じて各メッセージに属性または順序指定キーを設定できます。

次のように、Java、Python、Go バージョンの PubSubIO コネクタを使用できます。

Java

単一のトピックに書き込むには、PubsubIO.writeMessages メソッドを呼び出します。このメソッドは、PubsubMessage オブジェクトの入力コレクションを受け取ります。コネクタは、文字列、バイナリエンコードされた Avro メッセージ、またはバイナリエンコードされた protobuf メッセージを書き込むための便利なメソッドも定義します。これらのメソッドは、入力コレクションを Pub/Sub メッセージに変換します。

入力データに基づいて動的なトピックのセットに書き込むには、writeMessagesDynamic を呼び出します。メッセージで PubsubMessage.withTopic を呼び出し、各メッセージの宛先トピックを指定します。たとえば、入力データの特定フィールドの値に基づいて、別のトピックにメッセージをルーティングできます。

詳細については、PubsubIO リファレンス ドキュメントをご覧ください。

Python

pubsub.WriteToPubSub メソッドを呼び出します。 デフォルトでは、このメソッドはメッセージ ペイロードを表す bytes 型の入力コレクションを受け取ります。with_attributes パラメータが True の場合、メソッドは PubsubMessage オブジェクトのコレクションを受け取ります。

詳細については、pubsub モジュールのリファレンス ドキュメントをご覧ください。

Go

Pub/Sub にデータを書き込むには、pubsubio.Write メソッドを呼び出します。このメソッドは、PubSubMessage オブジェクトまたはメッセージ ペイロードを含むバイトスライスの入力コレクションを受け取ります。

詳細については、pubsubio パッケージのリファレンス ドキュメントをご覧ください。

Pub/Sub メッセージの詳細については、Pub/Sub ドキュメントのメッセージの形式をご覧ください。

タイムスタンプ

Pub/Sub は各メッセージにタイムスタンプを設定します。このタイムスタンプは、Pub/Sub にメッセージが公開された時刻を表します。ストリーミング シナリオでは、イベント タイムスタンプ(メッセージ データが生成された時刻)も考慮する場合があります。Apache Beam 要素のタイムスタンプを使用して、イベント時刻を表すことができます。制限なし PCollection を作成するソースは、イベント時刻に対応するタイムスタンプを新しい要素に割り当てます。

Java と Python の場合、Pub/Sub I/O コネクタは、各要素のタイムスタンプを Pub/Sub メッセージ属性として書き込むことができます。メッセージ コンシューマは、この属性を使用してイベントのタイムスタンプを取得できます。

Java

PubsubIO.Write<T>.withTimestampAttribute を呼び出し、属性の名前を指定します。

Python

WriteToPubSub を呼び出すときに timestamp_attribute パラメータを指定します。

メッセージ配信

Dataflow は、パイプライン内のメッセージの 1 回限りの処理をサポートしています。ただし、Pub/Sub I/O コネクタでは、Pub/Sub を介した 1 回限りのメッセージ配信は保証できません。

Java と Python の場合、Pub/Sub I/O コネクタを構成して、各要素の一意の ID をメッセージ属性として書き込むようにできます。メッセージ コンシューマは、この属性を使用してメッセージの重複を除去できます。

Java

PubsubIO.Write<T>.withIdAttribute を呼び出し、属性の名前を指定します。

Python

WriteToPubSub を呼び出すときに id_label パラメータを指定します。

次の例では、Pub/Sub メッセージの PCollection を作成し、Pub/Sub トピックに書き込みます。トピックはパイプライン オプションとして指定されます。各メッセージには、ペイロード データと一連の属性が含まれています。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   ----runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(
                  topic=options.topic,
                  with_attributes=True)
        )

    print('Pipeline ran successfully.')