Pub/Sub to Cloud Storage using Dataflow
Stay organized with collections
Save and categorize content based on your preferences.
Stream Pub/Sub messages to Cloud Storage using Dataflow.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],[],[],[],null,["# Pub/Sub to Cloud Storage using Dataflow\n\nStream Pub/Sub messages to Cloud Storage using Dataflow.\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Stream messages from Pub/Sub by using Dataflow and Cloud Storage](/pubsub/docs/stream-messages-dataflow)\n\nCode sample\n-----------\n\n### Java\n\n\nBefore trying this sample, follow the Java setup instructions in the\n[Pub/Sub quickstart using\nclient libraries](/pubsub/docs/quickstart-client-libraries).\n\n\nFor more information, see the\n[Pub/Sub Java API\nreference documentation](/java/docs/reference/google-cloud-pubsub/latest/overview).\n\n\nTo authenticate to Pub/Sub, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n\n import java.io.IOException;\n import org.apache.beam.examples.common.WriteOneFilePerWindow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;\n import org.apache.beam.sdk.options.Default;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.options.Validation.Required;\n import org.apache.beam.sdk.transforms.windowing.FixedWindows;\n import org.apache.beam.sdk.transforms.windowing.Window;\n import org.joda.time.Duration;\n\n public class PubSubToGcs {\n /*\n * Define your own configuration options. Add your own arguments to be processed\n * by the command-line parser, and specify default values for them.\n */\n public interface PubSubToGcsOptions extends StreamingOptions {\n @Description(\"The Cloud Pub/Sub topic to read from.\")\n @Required\n String getInputTopic();\n\n void setInputTopic(String value);\n\n @Description(\"Output file's window size in number of minutes.\")\n @Default.Integer(1)\n Integer getWindowSize();\n\n void setWindowSize(Integer value);\n\n @Description(\"Path of the output file including its filename prefix.\")\n @Required\n String getOutput();\n\n void setOutput(String value);\n }\n\n public static void main(String[] args) throws IOException {\n // The maximum number of shards when writing output.\n int numShards = 1;\n\n PubSubToGcsOptions options =\n PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);\n\n options.setStreaming(true);\n\n Pipeline pipeline = Pipeline.create(options);\n\n pipeline\n // 1) Read string messages from a Pub/Sub topic.\n .apply(\"Read PubSub Messages\", PubsubIO.readStrings().fromTopic(options.getInputTopic()))\n // 2) Group the messages into fixed-sized minute intervals.\n .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))\n // 3) Write one file to GCS for every window of messages.\n .apply(\"Write Files to GCS\", new WriteOneFilePerWindow(options.getOutput(), numShards));\n\n // Execute the pipeline and wait until it finishes running.\n pipeline.run().waitUntilFinish();\n }\n }\n\n### Python\n\n\nBefore trying this sample, follow the Python setup instructions in the\n[Pub/Sub quickstart using\nclient libraries](/pubsub/docs/quickstart-client-libraries).\n\n\nFor more information, see the\n[Pub/Sub Python API\nreference documentation](/python/docs/reference/pubsub/latest).\n\n\nTo authenticate to Pub/Sub, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import argparse\n from datetime import datetime\n import logging\n import random\n\n from apache_beam import (\n DoFn,\n GroupByKey,\n io,\n ParDo,\n Pipeline,\n PTransform,\n WindowInto,\n WithKeys,\n )\n from apache_beam.options.pipeline_options import PipelineOptions\n from apache_beam.transforms.window import FixedWindows\n\n\n class GroupMessagesByFixedWindows(PTransform):\n \"\"\"A composite transform that groups Pub/Sub messages based on publish time\n and outputs a list of tuples, each containing a message and its publish time.\n \"\"\"\n\n def __init__(self, window_size, num_shards=5):\n # Set window size to 60 seconds.\n self.window_size = int(window_size * 60)\n self.num_shards = num_shards\n\n def expand(self, pcoll):\n return (\n pcoll\n # Bind window info to each element using element timestamp (or publish time).\n | \"Window into fixed intervals\"\n \u003e\u003e WindowInto(FixedWindows(self.window_size))\n | \"Add timestamp to windowed elements\" \u003e\u003e ParDo(AddTimestamp())\n # Assign a random key to each windowed element based on the number of shards.\n | \"Add key\" \u003e\u003e WithKeys(lambda _: random.randint(0, self.num_shards - 1))\n # Group windowed elements by key. All the elements in the same window must fit\n # memory for this. If not, you need to use `beam.util.BatchElements`.\n | \"Group by key\" \u003e\u003e GroupByKey()\n )\n\n\n class AddTimestamp(DoFn):\n def process(self, element, publish_time=DoFn.TimestampParam):\n \"\"\"Processes each windowed element by extracting the message body and its\n publish time into a tuple.\n \"\"\"\n yield (\n element.decode(\"utf-8\"),\n datetime.utcfromtimestamp(float(publish_time)).strftime(\n \"%Y-%m-%d %H:%M:%S.%f\"\n ),\n )\n\n\n class WriteToGCS(DoFn):\n def __init__(self, output_path):\n self.output_path = output_path\n\n def process(self, key_value, window=DoFn.WindowParam):\n \"\"\"Write messages in a batch to Google Cloud Storage.\"\"\"\n\n ts_format = \"%H:%M\"\n window_start = window.start.to_utc_datetime().strftime(ts_format)\n window_end = window.end.to_utc_datetime().strftime(ts_format)\n shard_id, batch = key_value\n filename = \"-\".join([self.output_path, window_start, window_end, str(shard_id)])\n\n with io.gcsio.GcsIO().open(filename=filename, mode=\"w\") as f:\n for message_body, publish_time in batch:\n f.write(f\"{message_body},{publish_time}\\n\".encode())\n\n\n def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):\n # Set `save_main_session` to True so DoFns can access globally imported modules.\n pipeline_options = PipelineOptions(\n pipeline_args, streaming=True, save_main_session=True\n )\n\n with Pipeline(options=pipeline_options) as pipeline:\n (\n pipeline\n # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam\n # binds the publish time returned by the Pub/Sub server for each message\n # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.\n # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub\n | \"Read from Pub/Sub\" \u003e\u003e io.ReadFromPubSub(topic=input_topic)\n | \"Window into\" \u003e\u003e GroupMessagesByFixedWindows(window_size, num_shards)\n | \"Write to GCS\" \u003e\u003e ParDo(WriteToGCS(output_path))\n )\n\n\n if __name__ == \"__main__\":\n logging.getLogger().setLevel(logging.INFO)\n\n parser = argparse.ArgumentParser()\n parser.add_argument(\n \"--input_topic\",\n help=\"The Cloud Pub/Sub topic to read from.\"\n '\"projects/\u003cPROJECT_ID\u003e/topics/\u003cTOPIC_ID\u003e\".',\n )\n parser.add_argument(\n \"--window_size\",\n type=float,\n default=1.0,\n help=\"Output file's window size in minutes.\",\n )\n parser.add_argument(\n \"--output_path\",\n help=\"Path of the output GCS file including the prefix.\",\n )\n parser.add_argument(\n \"--num_shards\",\n type=int,\n default=5,\n help=\"Number of shards to use when writing windowed elements to GCS.\",\n )\n known_args, pipeline_args = parser.parse_known_args()\n\n run(\n known_args.input_topic,\n known_args.output_path,\n known_args.window_size,\n known_args.num_shards,\n pipeline_args,\n )\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsub)."]]