将具有自定义属性的消息写入 Pub/Sub
使用集合让一切井井有条
根据您的偏好保存内容并对其进行分类。
使用 Dataflow 将具有自定义属性的消息写入 Pub/Sub
深入探索
如需查看包含此代码示例的详细文档，请参阅以下内容：
- 从 Dataflow 写入 Pub/Sub（/dataflow/docs/guides/write-to-pubsub）
代码示例
如未另行说明，本页面中的内容已根据知识共享署名 4.0 许可获得授权，代码示例已根据 Apache 2.0 许可获得授权。有关详情，请参阅 Google 开发者网站政策。Java 是 Oracle 和/或其关联公司的注册商标。
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],[],[],[],null,["Use Dataflow to write messages with custom attributes to Pub/Sub\n\nExplore further\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to Pub/Sub](/dataflow/docs/guides/write-to-pubsub)\n\nCode sample \n\nJava\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import java.nio.charset.StandardCharsets;\n import java.util.Arrays;\n import java.util.HashMap;\n import java.util.List;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.coders.DefaultCoder;\n import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptions;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.transforms.Create;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TypeDescriptor;\n\n\n\n public class PubSubWriteWithAttributes {\n public interface Options extends PipelineOptions {\n @Description(\"The Pub/Sub topic to write to. 
Format: projects/\u003cPROJECT\u003e/topics/\u003cTOPIC\u003e\")\n String getTopic();\n\n void setTopic(String value);\n }\n\n // A custom datatype for the source data.\n @DefaultCoder(AvroCoder.class)\n static class ExampleData {\n public String name;\n public String product;\n public Long timestamp; // Epoch time in milliseconds\n\n public ExampleData() {}\n\n public ExampleData(String name, String product, Long timestamp) {\n this.name = name;\n this.product = product;\n this.timestamp = timestamp;\n }\n }\n\n // Write messages to a Pub/Sub topic.\n public static void main(String[] args) {\n // Example source data.\n final List\u003cExampleData\u003e messages = Arrays.asList(\n new ExampleData(\"Robert\", \"TV\", 1613141590000L),\n new ExampleData(\"Maria\", \"Phone\", 1612718280000L),\n new ExampleData(\"Juan\", \"Laptop\", 1611618000000L),\n new ExampleData(\"Rebeca\", \"Videogame\", 1610000000000L)\n );\n\n // Parse the pipeline options passed into the application. Example:\n // --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC\"\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n var pipeline = Pipeline.create(options);\n pipeline\n // Create some data to write to Pub/Sub.\n .apply(Create.of(messages))\n // Convert the data to Pub/Sub messages.\n .apply(MapElements\n .into(TypeDescriptor.of(PubsubMessage.class))\n .via((message -\u003e {\n byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);\n // Create attributes for each message.\n HashMap\u003cString, String\u003e attributes = new HashMap\u003cString, String\u003e();\n attributes.put(\"buyer\", message.name);\n attributes.put(\"timestamp\", Long.toString(message.timestamp));\n return new PubsubMessage(payload, attributes);\n })))\n // Write the messages to Pub/Sub.\n 
.apply(PubsubIO.writeMessages().to(options.getTopic()));\n pipeline.run().waitUntilFinish();\n }\n }\n\nPython\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import argparse\n from typing import Any, Dict, List\n\n import apache_beam as beam\n from apache_beam.io import PubsubMessage\n from apache_beam.io import WriteToPubSub\n from apache_beam.options.pipeline_options import PipelineOptions\n\n from typing_extensions import Self\n\n\n def item_to_message(item: Dict[str, Any]) -\u003e PubsubMessage:\n # Re-import needed types. When using the Dataflow runner, this\n # function executes on a worker, where the global namespace is not\n # available. For more information, see:\n # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error\n from apache_beam.io import PubsubMessage\n\n attributes = {\"buyer\": item[\"name\"], \"timestamp\": str(item[\"ts\"])}\n data = bytes(item[\"product\"], \"utf-8\")\n\n return PubsubMessage(data=data, attributes=attributes)\n\n\n def write_to_pubsub(argv: List[str] = None) -\u003e None:\n # Parse the pipeline options passed into the application. 
Example:\n # --topic=$TOPIC_PATH --streaming\n # For more information, see\n # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n class MyOptions(PipelineOptions):\n @classmethod\n # Define a custom pipeline option to specify the Pub/Sub topic.\n def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -\u003e None:\n parser.add_argument(\"--topic\", required=True)\n\n example_data = [\n {\"name\": \"Robert\", \"product\": \"TV\", \"ts\": 1613141590000},\n {\"name\": \"Maria\", \"product\": \"Phone\", \"ts\": 1612718280000},\n {\"name\": \"Juan\", \"product\": \"Laptop\", \"ts\": 1611618000000},\n {\"name\": \"Rebeca\", \"product\": \"Video game\", \"ts\": 1610000000000},\n ]\n options = MyOptions()\n\n with beam.Pipeline(options=options) as pipeline:\n (\n pipeline\n | \"Create elements\" \u003e\u003e beam.Create(example_data)\n | \"Convert to Pub/Sub messages\" \u003e\u003e beam.Map(item_to_message)\n | WriteToPubSub(topic=options.topic, with_attributes=True)\n )\n\n print(\"Pipeline ran successfully.\")\n\nWhat's next\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=dataflow)."]]