Using Pub/Sub Lite with Dataflow

As an alternative to writing and running your own data processing programs, you can use Dataflow with the Pub/Sub Lite I/O connector for Apache Beam. Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It reliably executes programs developed using the Apache Beam SDK, which provides an extensible set of powerful stateful processing abstractions and I/O connectors to other streaming and batch systems.

This quickstart shows you how to write an Apache Beam pipeline that will:

  • Read messages from Pub/Sub Lite
  • Window (or group) the messages by publish timestamp
  • Write the messages to Cloud Storage

It also shows you how to:

  • Submit your pipeline to run on Dataflow
  • Create a Dataflow Flex Template from your pipeline

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Pub/Sub Lite, Dataflow, Cloud Storage, and Logging APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Cloud Console, go to the Create service account page.

      Go to Create service account
    2. Select a project.
    3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Click the Select a role field.

      Under Quick access, click Basic, then click Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Cloud Console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again. A sample export command follows this list.

  8. Install and initialize the Cloud SDK.
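
For example, in a bash shell you can point the variable at the downloaded key. KEY_PATH below is a placeholder for the path to your JSON key file:

export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"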

Set up

  1. Create variables for your project and bucket, then create a Cloud Storage bucket. Cloud Storage bucket names must be globally unique.

    export PROJECT_ID=$(gcloud config get-value project)
    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  2. Select a Pub/Sub Lite location in or near the region where you plan to run your Dataflow job, then create a Pub/Sub Lite topic and subscription.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export LITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$LITE_LOCATION \
        --partitions=1 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$LITE_LOCATION \
        --topic=$TOPIC \
        --starting-offset=beginning
    
  3. Select a Dataflow region where you plan to run your Dataflow job.

    export DATAFLOW_REGION=your-dataflow-region
    

Start streaming

Clone the quickstart repository and navigate to the sample code directory:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

This sample code uses Dataflow to:

  • Read messages from a Pub/Sub Lite subscription as an unbounded source.
  • Group messages into fixed-size intervals by their publish timestamps.
  • Write the grouped messages to files on Cloud Storage.

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.


package examples;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp. The element timestamp is the publish time associated with a message.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
                // Fire a trigger every 30 seconds after receiving the first element.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30))))
                // Ignore late elements.
                .withAllowedLateness(Duration.ZERO)
                // Accumulate elements in fired panes. This will make sure that elements collected
                // in an earlier pane by an earlier trigger will not be overwritten by those
                // arriving later due to a later trigger fired in the same window.
                .accumulatingFiredPanes())
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

To start the pipeline in Dataflow, run the following command:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner"

The preceding command launches a Dataflow job. Follow the link in the console output to access the job in the Dataflow monitoring console.

Observe job progress

Observe the job's progress in the Dataflow console.

Go to the Dataflow console

Open the job details view to see:

  • Job graph
  • Execution details
  • Job metrics

Publish some messages to your Lite topic.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

You may have to wait a few minutes to see the messages in your Worker Logs.
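
As an alternative to the monitoring console, you can pull recent Dataflow worker log entries with Cloud Logging. This is a minimal sketch that filters only by resource type, so it returns entries from any Dataflow job in the project:

gcloud logging read 'resource.type="dataflow_step"' --limit=20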

Use the command below to check which files have been written out to Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

The output should look like the following:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Use the command below to look at the content in a file:

gsutil cat "gs://$BUCKET/samples/your-filename"
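
Each line in an output file holds a message payload and its publish time in Unix seconds, separated by a tab (see the Convert messages step in the pipeline), so the content should look similar to the following, with an illustrative timestamp:

 Hello World!	1634764860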

Create a template

You can optionally create a custom Dataflow Flex Template based on your pipeline. Dataflow templates let you run jobs with different input parameters from the Cloud Console or the command line without setting up a full Java development environment.

  1. Create a fat JAR that includes all the dependencies of your pipeline. You should see target/pubsublite-streaming-bundled-1.0.jar after the command has run.

    mvn clean package -DskipTests=true
    
  2. Provide names and locations for your template file and template container image.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Build a custom Flex Template. A required metadata.json file, which contains the spec needed to run the job, is provided with the example.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Run a job using the custom Flex Template.

Console

  1. In the Dataflow console, go to the Create job from template page.

  2. Enter a Job name.

  3. Enter your Dataflow region.

  4. Choose your Custom Template.

  5. Enter your template path.

  6. Enter the required parameters.

  7. Click Run job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

  1. In the Dataflow console, stop the job. Cancel the pipeline instead of draining it. If you prefer the command line, see the sample commands after this list.

  2. Delete the topic and subscription.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Delete the files created by the pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Delete the template image and the template file if they exist.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Remove the Cloud Storage bucket.

    gsutil rb gs://$BUCKET
    

What's next