Write messages with custom attributes to Pub/Sub
Use Dataflow to write messages with custom attributes to Pub/Sub.
Explore further

For detailed documentation that includes this code sample, see the following:

- Write from Dataflow to Pub/Sub (/dataflow/docs/guides/write-to-pubsub)
Code sample
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment (/docs/authentication/set-up-adc-local-dev-environment).
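```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}
```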
Python
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment (/docs/authentication/set-up-adc-local-dev-environment).
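```python
import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")
```

The Python sample defines write_to_pubsub() but does not call it. A minimal entry-point sketch is shown below; the script name and topic path are placeholders and are not part of the original sample.

```python
# Hypothetical entry point for the sample above. MyOptions() falls back to
# the process's command-line arguments, so pass the topic there, for example:
#   python write_with_attributes.py \
#       --topic=projects/MY_PROJECT/topics/MY_TOPIC --streaming
if __name__ == "__main__":
    write_to_pubsub()
```

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser (/docs/samples?product=dataflow).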
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["이해하기 어려움","hardToUnderstand","thumb-down"],["잘못된 정보 또는 샘플 코드","incorrectInformationOrSampleCode","thumb-down"],["필요한 정보/샘플이 없음","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],[],[],[],null,["# Write messages with custom attributes to Pub/Sub\n\nUse Dataflow to write messages with custom attributes to Pub/Sub\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to Pub/Sub](/dataflow/docs/guides/write-to-pubsub)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import java.nio.charset.StandardCharsets;\n import java.util.Arrays;\n import java.util.HashMap;\n import java.util.List;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.coders.DefaultCoder;\n import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptions;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.transforms.Create;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TypeDescriptor;\n\n\n\n public class PubSubWriteWithAttributes {\n public interface Options extends PipelineOptions {\n @Description(\"The Pub/Sub topic to write to. Format: projects/\u003cPROJECT\u003e/topics/\u003cTOPIC\u003e\")\n String getTopic();\n\n void setTopic(String value);\n }\n\n // A custom datatype for the source data.\n @DefaultCoder(AvroCoder.class)\n static class ExampleData {\n public String name;\n public String product;\n public Long timestamp; // Epoch time in milliseconds\n\n public ExampleData() {}\n\n public ExampleData(String name, String product, Long timestamp) {\n this.name = name;\n this.product = product;\n this.timestamp = timestamp;\n }\n }\n\n // Write messages to a Pub/Sub topic.\n public static void main(String[] args) {\n // Example source data.\n final List\u003cExampleData\u003e messages = Arrays.asList(\n new ExampleData(\"Robert\", \"TV\", 1613141590000L),\n new ExampleData(\"Maria\", \"Phone\", 1612718280000L),\n new ExampleData(\"Juan\", \"Laptop\", 1611618000000L),\n new ExampleData(\"Rebeca\", \"Videogame\", 1610000000000L)\n );\n\n // Parse the pipeline options passed into the application. 
Example:\n // --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC\"\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n var pipeline = Pipeline.create(options);\n pipeline\n // Create some data to write to Pub/Sub.\n .apply(Create.of(messages))\n // Convert the data to Pub/Sub messages.\n .apply(MapElements\n .into(TypeDescriptor.of(PubsubMessage.class))\n .via((message -\u003e {\n byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);\n // Create attributes for each message.\n HashMap\u003cString, String\u003e attributes = new HashMap\u003cString, String\u003e();\n attributes.put(\"buyer\", message.name);\n attributes.put(\"timestamp\", Long.toString(message.timestamp));\n return new PubsubMessage(payload, attributes);\n })))\n // Write the messages to Pub/Sub.\n .apply(PubsubIO.writeMessages().to(options.getTopic()));\n pipeline.run().waitUntilFinish();\n }\n }\n\n### Python\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import argparse\n from typing import Any, Dict, List\n\n import apache_beam as beam\n from apache_beam.io import PubsubMessage\n from apache_beam.io import WriteToPubSub\n from apache_beam.options.pipeline_options import PipelineOptions\n\n from typing_extensions import Self\n\n\n def item_to_message(item: Dict[str, Any]) -\u003e PubsubMessage:\n # Re-import needed types. When using the Dataflow runner, this\n # function executes on a worker, where the global namespace is not\n # available. For more information, see:\n # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error\n from apache_beam.io import PubsubMessage\n\n attributes = {\"buyer\": item[\"name\"], \"timestamp\": str(item[\"ts\"])}\n data = bytes(item[\"product\"], \"utf-8\")\n\n return PubsubMessage(data=data, attributes=attributes)\n\n\n def write_to_pubsub(argv: List[str] = None) -\u003e None:\n # Parse the pipeline options passed into the application. Example:\n # --topic=$TOPIC_PATH --streaming\n # For more information, see\n # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n class MyOptions(PipelineOptions):\n @classmethod\n # Define a custom pipeline option to specify the Pub/Sub topic.\n def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -\u003e None:\n parser.add_argument(\"--topic\", required=True)\n\n example_data = [\n {\"name\": \"Robert\", \"product\": \"TV\", \"ts\": 1613141590000},\n {\"name\": \"Maria\", \"product\": \"Phone\", \"ts\": 1612718280000},\n {\"name\": \"Juan\", \"product\": \"Laptop\", \"ts\": 1611618000000},\n {\"name\": \"Rebeca\", \"product\": \"Video game\", \"ts\": 1610000000000},\n ]\n options = MyOptions()\n\n with beam.Pipeline(options=options) as pipeline:\n (\n pipeline\n | \"Create elements\" \u003e\u003e beam.Create(example_data)\n | \"Convert to Pub/Sub messages\" \u003e\u003e beam.Map(item_to_message)\n | WriteToPubSub(topic=options.topic, with_attributes=True)\n )\n\n print(\"Pipeline ran successfully.\")\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=dataflow)."]]