使用 Dataflow 將 Pub/Sub 內容傳送至 Cloud Storage
透過集合功能整理內容
你可以依據偏好儲存及分類內容。
使用 Dataflow 將 Pub/Sub 訊息串流至 Cloud Storage。
深入探索
如需包含這個程式碼範例的詳細說明文件,請參閱下列內容:
程式碼範例
除非另有註明,否則本頁面中的內容是採用創用 CC 姓名標示 4.0 授權,程式碼範例則為阿帕契 2.0 授權。詳情請參閱《Google Developers 網站政策》。Java 是 Oracle 和/或其關聯企業的註冊商標。
[[["容易理解","easyToUnderstand","thumb-up"],["確實解決了我的問題","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["難以理解","hardToUnderstand","thumb-down"],["資訊或程式碼範例有誤","incorrectInformationOrSampleCode","thumb-down"],["缺少我需要的資訊/範例","missingTheInformationSamplesINeed","thumb-down"],["翻譯問題","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],[],[],[],null,["# Pub/Sub to Cloud Storage using Dataflow\n\nStream Pub/Sub messages to Cloud Storage using Dataflow.\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Stream messages from Pub/Sub by using Dataflow and Cloud Storage](/pubsub/docs/stream-messages-dataflow)\n\nCode sample\n-----------\n\n### Java\n\n\nBefore trying this sample, follow the Java setup instructions in the\n[Pub/Sub quickstart using\nclient libraries](/pubsub/docs/quickstart-client-libraries).\n\n\nFor more information, see the\n[Pub/Sub Java API\nreference documentation](/java/docs/reference/google-cloud-pubsub/latest/overview).\n\n\nTo authenticate to Pub/Sub, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n\n import java.io.IOException;\n import org.apache.beam.examples.common.WriteOneFilePerWindow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;\n import org.apache.beam.sdk.options.Default;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.options.Validation.Required;\n import org.apache.beam.sdk.transforms.windowing.FixedWindows;\n import org.apache.beam.sdk.transforms.windowing.Window;\n import org.joda.time.Duration;\n\n public class PubSubToGcs {\n /*\n * Define your own configuration options. Add your own arguments to be processed\n * by the command-line parser, and specify default values for them.\n */\n public interface PubSubToGcsOptions extends StreamingOptions {\n @Description(\"The Cloud Pub/Sub topic to read from.\")\n @Required\n String getInputTopic();\n\n void setInputTopic(String value);\n\n @Description(\"Output file's window size in number of minutes.\")\n @Default.Integer(1)\n Integer getWindowSize();\n\n void setWindowSize(Integer value);\n\n @Description(\"Path of the output file including its filename prefix.\")\n @Required\n String getOutput();\n\n void setOutput(String value);\n }\n\n public static void main(String[] args) throws IOException {\n // The maximum number of shards when writing output.\n int numShards = 1;\n\n PubSubToGcsOptions options =\n PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);\n\n options.setStreaming(true);\n\n Pipeline pipeline = Pipeline.create(options);\n\n pipeline\n // 1) Read string messages from a Pub/Sub topic.\n .apply(\"Read PubSub Messages\", PubsubIO.readStrings().fromTopic(options.getInputTopic()))\n // 2) Group the messages into fixed-sized minute intervals.\n .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))\n // 3) Write one file to GCS for every window of messages.\n .apply(\"Write Files to GCS\", new WriteOneFilePerWindow(options.getOutput(), numShards));\n\n // Execute the pipeline and wait until it finishes running.\n pipeline.run().waitUntilFinish();\n }\n }\n\n### Python\n\n\nBefore trying this sample, follow the Python setup instructions in the\n[Pub/Sub quickstart using\nclient libraries](/pubsub/docs/quickstart-client-libraries).\n\n\nFor more information, see the\n[Pub/Sub Python API\nreference documentation](/python/docs/reference/pubsub/latest).\n\n\nTo authenticate to Pub/Sub, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import argparse\n from datetime import datetime\n import logging\n import random\n\n from apache_beam import (\n DoFn,\n GroupByKey,\n io,\n ParDo,\n Pipeline,\n PTransform,\n WindowInto,\n WithKeys,\n )\n from apache_beam.options.pipeline_options import PipelineOptions\n from apache_beam.transforms.window import FixedWindows\n\n\n class GroupMessagesByFixedWindows(PTransform):\n \"\"\"A composite transform that groups Pub/Sub messages based on publish time\n and outputs a list of tuples, each containing a message and its publish time.\n \"\"\"\n\n def __init__(self, window_size, num_shards=5):\n # Set window size to 60 seconds.\n self.window_size = int(window_size * 60)\n self.num_shards = num_shards\n\n def expand(self, pcoll):\n return (\n pcoll\n # Bind window info to each element using element timestamp (or publish time).\n | \"Window into fixed intervals\"\n \u003e\u003e WindowInto(FixedWindows(self.window_size))\n | \"Add timestamp to windowed elements\" \u003e\u003e ParDo(AddTimestamp())\n # Assign a random key to each windowed element based on the number of shards.\n | \"Add key\" \u003e\u003e WithKeys(lambda _: random.randint(0, self.num_shards - 1))\n # Group windowed elements by key. All the elements in the same window must fit\n # memory for this. If not, you need to use `beam.util.BatchElements`.\n | \"Group by key\" \u003e\u003e GroupByKey()\n )\n\n\n class AddTimestamp(DoFn):\n def process(self, element, publish_time=DoFn.TimestampParam):\n \"\"\"Processes each windowed element by extracting the message body and its\n publish time into a tuple.\n \"\"\"\n yield (\n element.decode(\"utf-8\"),\n datetime.utcfromtimestamp(float(publish_time)).strftime(\n \"%Y-%m-%d %H:%M:%S.%f\"\n ),\n )\n\n\n class WriteToGCS(DoFn):\n def __init__(self, output_path):\n self.output_path = output_path\n\n def process(self, key_value, window=DoFn.WindowParam):\n \"\"\"Write messages in a batch to Google Cloud Storage.\"\"\"\n\n ts_format = \"%H:%M\"\n window_start = window.start.to_utc_datetime().strftime(ts_format)\n window_end = window.end.to_utc_datetime().strftime(ts_format)\n shard_id, batch = key_value\n filename = \"-\".join([self.output_path, window_start, window_end, str(shard_id)])\n\n with io.gcsio.GcsIO().open(filename=filename, mode=\"w\") as f:\n for message_body, publish_time in batch:\n f.write(f\"{message_body},{publish_time}\\n\".encode())\n\n\n def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):\n # Set `save_main_session` to True so DoFns can access globally imported modules.\n pipeline_options = PipelineOptions(\n pipeline_args, streaming=True, save_main_session=True\n )\n\n with Pipeline(options=pipeline_options) as pipeline:\n (\n pipeline\n # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam\n # binds the publish time returned by the Pub/Sub server for each message\n # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.\n # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub\n | \"Read from Pub/Sub\" \u003e\u003e io.ReadFromPubSub(topic=input_topic)\n | \"Window into\" \u003e\u003e GroupMessagesByFixedWindows(window_size, num_shards)\n | \"Write to GCS\" \u003e\u003e ParDo(WriteToGCS(output_path))\n )\n\n\n if __name__ == \"__main__\":\n logging.getLogger().setLevel(logging.INFO)\n\n parser = argparse.ArgumentParser()\n parser.add_argument(\n \"--input_topic\",\n help=\"The Cloud Pub/Sub topic to read from.\"\n '\"projects/\u003cPROJECT_ID\u003e/topics/\u003cTOPIC_ID\u003e\".',\n )\n parser.add_argument(\n \"--window_size\",\n type=float,\n default=1.0,\n help=\"Output file's window size in minutes.\",\n )\n parser.add_argument(\n \"--output_path\",\n help=\"Path of the output GCS file including the prefix.\",\n )\n parser.add_argument(\n \"--num_shards\",\n type=int,\n default=5,\n help=\"Number of shards to use when writing windowed elements to GCS.\",\n )\n known_args, pipeline_args = parser.parse_known_args()\n\n run(\n known_args.input_topic,\n known_args.output_path,\n known_args.window_size,\n known_args.num_shards,\n pipeline_args,\n )\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsub)."]]