Dataflow에서 Pub/Sub로 쓰기

이 문서에서는 Apache Beam PubSubIO I/O 커넥터를 사용하여 Dataflow의 텍스트 데이터를 Pub/Sub로 쓰는 방법을 설명합니다.

개요

Pub/Sub에 데이터를 쓰려면 PubSubIO 커넥터를 사용합니다. 입력 요소는 Pub/Sub 메시지나 단지 메시지 데이터일 수 있습니다. 입력 요소가 Pub/Sub 메시지이면 선택적으로 메시지마다 속성이나 순서 키를 설정할 수 있습니다.

다음과 같이 PubSubIO 커넥터의 Java, Python 또는 Go 버전을 사용할 수 있습니다.

Java

단일 주제에 쓰려면 PubsubIO.writeMessages 메서드를 호출합니다. 이 메서드는 PubsubMessage 객체의 입력 컬렉션을 사용합니다. 커넥터는 문자열, 바이너리로 인코딩된 Avro 메시지 또는 바이너리로 인코딩된 protobuf 메시지를 작성하기 위한 편의 메서드도 정의합니다. 이러한 메서드는 입력 컬렉션을 Pub/Sub 메시지로 변환합니다.

입력 데이터를 기반으로 동적 주제 세트에 쓰려면 writeMessagesDynamic을 호출합니다. 메시지에서 PubsubMessage.withTopic을 호출하여 각 메시지의 대상 주제를 지정합니다. 예를 들어 입력 데이터의 특정 필드 값에 따라 메시지를 여러 주제로 라우팅할 수 있습니다.

자세한 내용은 PubsubIO 참조 문서를 확인하세요.

Python

pubsub.WriteToPubSub 메서드를 호출합니다. 기본적으로 이 메서드는 메시지 페이로드를 나타내는 bytes 유형의 입력 컬렉션을 사용합니다. with_attributes 매개변수가 True이면 메서드는 PubsubMessage 객체의 컬렉션을 사용합니다.

자세한 내용은 pubsub 모듈 참고 문서를 참조하세요.

Go

Pub/Sub에 데이터를 쓰려면 pubsubio.Write 메서드를 호출합니다. 이 메서드는 PubSubMessage 객체 또는 메시지 페이로드가 포함된 바이트 슬라이스의 입력 컬렉션을 사용합니다.

자세한 내용은 pubsubio 패키지 참고 문서를 참조하세요.

Pub/Sub 메시지에 대한 자세한 내용은 Pub/Sub 문서의 메시지 형식을 참조하세요.

타임스탬프

Pub/Sub는 메시지마다 타임스탬프를 설정합니다. 이 타임스탬프는 메시지가 Pub/Sub에 게시된 시간을 나타냅니다. 스트리밍 시나리오에서는 메시지 데이터가 생성된 시간인 이벤트 타임스탬프도 고려할 수 있습니다. Apache Beam 요소 타임스탬프를 사용하여 이벤트 시간을 나타낼 수 있습니다. 바인딩되지 않은 PCollection을 만드는 소스에서 새 요소 각각에 이벤트 시간에 해당하는 타임스탬프를 할당하는 경우가 많습니다.

Java 및 Python의 경우 Pub/Sub I/O 커넥터는 각 요소의 타임스탬프를 Pub/Sub 메시지 속성으로 쓸 수 있습니다. 메시지 소비자는 이 속성을 사용하여 이벤트 타임스탬프를 가져올 수 있습니다.

Java

PubsubIO.Write<T>.withTimestampAttribute를 호출하고 속성 이름을 지정합니다.

Python

WriteToPubSub을 호출할 때 timestamp_attribute 매개변수를 지정합니다.

메일 전송

Dataflow는 한 파이프라인 내에서 메시지를 단 한 번 처리할 수 있습니다. 그러나 Pub/Sub I/O 커넥터는 Pub/Sub을 통해 단 한 번 메시지 전송을 보장할 수 없습니다.

Java 및 Python의 경우 각 요소의 고유 ID를 메시지 속성으로 쓰도록 Pub/Sub I/O 커넥터를 구성할 수 있습니다. 그런 다음 메시지 소비자가 이 속성을 사용하여 메시지를 중복 삭제할 수 있습니다.

Java

PubsubIO.Write<T>.withIdAttribute를 호출하고 속성 이름을 지정합니다.

Python

WriteToPubSub을 호출할 때 id_label 매개변수를 지정합니다.

예시

다음 예시에서는 Pub/Sub 메시지의 PCollection을 만들어 Pub/Sub 주제에 씁니다. 주제는 파이프라인 옵션으로 지정됩니다. 각 메시지에는 페이로드 데이터와 속성 집합이 포함됩니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   ----runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self

def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)

def write_to_pubsub(argv: List[str] = None) -> None:

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(
                  topic=options.topic,
                  with_attributes=True)
        )

    print('Pipeline ran successfully.')