Stream Pub/Sub Lite to Cloud Storage using Dataflow
This sample shows how to create an Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, groups the messages using a fixed-size windowing function, and writes them to Cloud Storage.
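Fixed-size windowing assigns each message to exactly one window based on its timestamp. In this sample, that timestamp is the message's publish time. The JDK-only sketch below illustrates the assignment. It assumes windows aligned to the Unix epoch (zero offset) and a one-minute size, which matches the sample's default `--windowSize`. `FixedWindowSketch` and its helpers are hypothetical illustrations of the idea, not Beam's `FixedWindows` implementation.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {
  // Hypothetical helper: start of the fixed window that contains `timestamp`,
  // assuming windows are aligned to the Unix epoch (zero offset).
  static Instant windowStart(Instant timestamp, Duration size) {
    long t = timestamp.toEpochMilli();
    // Math.floorMod keeps the result correct for timestamps before the epoch.
    return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
  }

  // Windows are half-open: [start, start + size).
  static Instant windowEnd(Instant timestamp, Duration size) {
    return windowStart(timestamp, size).plus(size);
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(1); // the sample's default --windowSize is 1 minute
    Instant publishTime = Instant.parse("2024-01-01T12:00:42Z");
    // prints 2024-01-01T12:00:00Z .. 2024-01-01T12:01:00Z
    System.out.println(windowStart(publishTime, size) + " .. " + windowEnd(publishTime, size));
  }
}
```

Because windows are half-open, a message published at exactly 12:01:00 starts the next window. Under the default event-time trigger, the 12:00 window is emitted only after the watermark passes its end at 12:01:00. That is why a subscription that goes idle can leave its last window open.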
Explore further
For detailed documentation that includes this code sample, see the following:
- Stream Pub/Sub Lite messages by using Dataflow (/pubsub/lite/docs/stream-messages-dataflow)
Code sample
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Java

To authenticate to Pub/Sub Lite, set up Application Default Credentials. For more information, see Set up authentication for a local development environment (/docs/authentication/set-up-adc-local-dev-environment).

```java
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close because the
                // watermark will not advance. If this is a possibility with your pipeline, you
                // should add an additional processing time trigger to force window closure
                // after enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}
```
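The sample passes `--subscription` to `SubscriptionPath.parse`. The value must therefore be a full Pub/Sub Lite resource path of the form projects/PROJECT/locations/LOCATION/subscriptions/SUBSCRIPTION, not a bare subscription name. The following JDK-only sketch builds such a string and runs a shape check on it before it reaches the pipeline. `SubscriptionPathSketch` and its methods are hypothetical helpers. The project number, zone, and subscription ID are placeholders.

```java
import java.util.regex.Pattern;

public class SubscriptionPathSketch {
  // Shape of a Pub/Sub Lite subscription path: three name/value segment pairs.
  private static final Pattern PATH =
      Pattern.compile("projects/[^/]+/locations/[^/]+/subscriptions/[^/]+");

  // Hypothetical helper: assembles the path from its components.
  static String subscriptionPath(String project, String location, String subscription) {
    return String.format(
        "projects/%s/locations/%s/subscriptions/%s", project, location, subscription);
  }

  // Hypothetical helper: a shape check only; SubscriptionPath.parse does the real validation.
  static boolean looksLikeSubscriptionPath(String value) {
    return PATH.matcher(value).matches();
  }

  public static void main(String[] args) {
    // Placeholder project number, zone, and subscription ID.
    String path = subscriptionPath("123456789", "us-central1-a", "my-lite-subscription");
    System.out.println(path);
    System.out.println(looksLikeSubscriptionPath(path)); // prints true
    System.out.println(looksLikeSubscriptionPath("my-lite-subscription")); // prints false
  }
}
```

A value built this way would then be passed as `--subscription=projects/123456789/locations/us-central1-a/subscriptions/my-lite-subscription`, together with `--output` for the file prefix and, optionally, `--windowSize`. The regex only tests the overall shape. `SubscriptionPath.parse` remains the authoritative check.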
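The "Convert messages" step turns each `SequencedMessage` into one line of text. That line holds the UTF-8 payload, a tab, and the publish time truncated to whole epoch seconds, so the sub-second part of the timestamp is dropped. The sketch below reproduces that format with plain JDK types. It also adds a hypothetical reader-side parser that downstream consumers of the output files could use. `OutputLineSketch` and its methods are illustrative only. The byte array and seconds value stand in for the protobuf fields used in the sample.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class OutputLineSketch {
  // Same format as the sample's "Convert messages" step:
  // UTF-8 payload, a tab, then the publish time in whole epoch seconds.
  static String formatLine(byte[] payload, long publishTimeSeconds) {
    return new String(payload, StandardCharsets.UTF_8) + "\t" + publishTimeSeconds;
  }

  // Hypothetical reader-side helpers. They split on the LAST tab so that
  // payloads containing tabs are recovered intact.
  static String payloadOf(String line) {
    return line.substring(0, lastTab(line));
  }

  static Instant publishTimeOf(String line) {
    return Instant.ofEpochSecond(Long.parseLong(line.substring(lastTab(line) + 1)));
  }

  private static int lastTab(String line) {
    int tab = line.lastIndexOf('\t');
    if (tab < 0) {
      throw new IllegalArgumentException("No tab separator in line: " + line);
    }
    return tab;
  }

  public static void main(String[] args) {
    byte[] payload = "hello\tworld".getBytes(StandardCharsets.UTF_8);
    String line = formatLine(payload, 1704110442L);
    System.out.println(publishTimeOf(line)); // prints 2024-01-01T12:00:42Z
    System.out.println(payloadOf(line).equals("hello\tworld")); // prints true
  }
}
```

Splitting on the last tab lets payloads that contain tabs round-trip. Payloads that contain newlines do not, assuming the writer emits one element per line as a TextIO-based writer does. Such data would need escaping or a different encoding before it is written.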