Stream Pub/Sub Lite messages by using Dataflow
As an alternative to writing and running your own data processing programs, you can use Dataflow with the Pub/Sub Lite I/O connector for Apache Beam. Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It reliably executes programs developed using the Apache Beam SDK, which provides an extensible set of powerful stateful processing abstractions and I/O connectors to other streaming and batch systems.
This quickstart shows you how to write an Apache Beam pipeline that will:
- Read messages from Pub/Sub Lite
- Window (or group) the messages by publish timestamp
- Write the messages to Cloud Storage
It also shows you how to:
- Submit your pipeline to run on Dataflow
- Create a Dataflow Flex Template from your pipeline
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Pub/Sub Lite, Dataflow, Cloud Storage, and Logging APIs.
- Create a service account:
  - In the console, go to the Create service account page and select your project.
  - In the Service account name field, enter a name. The console fills in the Service account ID field based on this name.
  - In the Service account description field, enter a description. For example, "Service account for quickstart".
  - Click Create and continue.
  - To provide access to your project, grant the following roles to your service account: Dataflow Admin, Pub/Sub Lite Admin, and Cloud Storage Object Admin. In the Select a role list, select a role. For additional roles, click Add another role and add each additional role.
  - Click Continue.
  - Click Done to finish creating the service account. Do not close your browser window. You will use it in the next step.
- Create a service account key:
  - In the console, click the email address for the service account that you created.
  - Click Keys.
  - Click Add key, and then click Create new key.
  - Click Create. A JSON key file is downloaded to your computer.
  - Click Close.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Install and initialize the Google Cloud CLI.
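If you prefer to do this setup from the command line, the console steps above map to standard gcloud commands. The following is a minimal sketch, assuming a placeholder service account name quickstart-sa and key file key.json (both are illustrative, not part of the quickstart):

# Enable the required APIs.
gcloud services enable pubsublite.googleapis.com dataflow.googleapis.com \
    storage.googleapis.com logging.googleapis.com

# Create the service account (quickstart-sa is a placeholder name).
gcloud iam service-accounts create quickstart-sa

# Grant one of the quickstart roles; repeat for roles/pubsublite.admin and
# roles/storage.objectAdmin.
gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
    --member="serviceAccount:quickstart-sa@$(gcloud config get-value project).iam.gserviceaccount.com" \
    --role="roles/dataflow.admin"

# Create a key and point GOOGLE_APPLICATION_CREDENTIALS at it.
gcloud iam service-accounts keys create key.json \
    --iam-account="quickstart-sa@$(gcloud config get-value project).iam.gserviceaccount.com"
export GOOGLE_APPLICATION_CREDENTIALS="$PWD/key.json"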
Set up
Create variables for your project and bucket, and then create a Cloud Storage bucket. Cloud Storage bucket names must be globally unique.
export PROJECT_ID=$(gcloud config get-value project)
export BUCKET=your-bucket-name
gsutil mb gs://$BUCKET
Select a Pub/Sub Lite location where you plan to run your Dataflow job. Create a Pub/Sub Lite topic and subscription.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export LITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \
    --location=$LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
    --location=$LITE_LOCATION \
    --topic=$TOPIC \
    --starting-offset=beginning
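To verify that the topic and subscription were created, you can describe them (standard gcloud Pub/Sub Lite commands, not part of the sample):
gcloud pubsub lite-topics describe $TOPIC \
    --location=$LITE_LOCATION
gcloud pubsub lite-subscriptions describe $SUBSCRIPTION \
    --location=$LITE_LOCATION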
Select a Dataflow region where you plan to run your Dataflow job.
export DATAFLOW_REGION=your-dataflow-region
Start streaming
Clone the quickstart repository and navigate to the sample code directory:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
This sample code uses Dataflow to:
- Read messages from a Pub/Sub Lite subscription as an unbounded source.
- Group messages based on their publish timestamps, using fixed time windows and the default trigger.
- Write the grouped messages to files on Cloud Storage.
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.
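The pipeline in the sample follows the three steps listed above. The following is a minimal sketch of what such a pipeline can look like, not the exact source in the repository; the class name PubsubliteToGcsSketch and the hard-coded subscription path, window size, and output location are illustrative placeholders. In the actual sample, these values are supplied through the --subscription, --output, and --windowSize options shown in the command below.

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class PubsubliteToGcsSketch {
  public static void main(String[] args) {
    // In the real sample, the subscription, output path, and window size come
    // from pipeline options on the command line; they are hard-coded here only
    // to keep the sketch short.
    Pipeline pipeline = Pipeline.create();

    pipeline
        // Read messages from a Pub/Sub Lite subscription as an unbounded source.
        .apply("Read From Pub/Sub Lite",
            PubsubLiteIO.read(
                SubscriberOptions.newBuilder()
                    .setSubscriptionPath(SubscriptionPath.parse(
                        "projects/PROJECT_ID/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID"))
                    .build()))
        // Convert each message payload to a string.
        .apply("Convert To String",
            MapElements.into(TypeDescriptors.strings())
                .via((SequencedMessage message) ->
                    message.getMessage().getData().toStringUtf8()))
        // Group messages by publish timestamp into fixed one-minute windows,
        // using the default trigger.
        .apply("Apply Fixed Windows",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Write one file per window to Cloud Storage.
        .apply("Write To GCS",
            TextIO.write()
                .to("gs://BUCKET/samples/output")
                .withWindowedWrites()
                .withNumShards(1));

    // For an unbounded source, the job runs until it is cancelled.
    pipeline.run();
  }
}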
To start the pipeline in Dataflow, run the following command:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner"
The preceding command launches a Dataflow job. Follow the link in the console output to access the job in the Dataflow monitoring console.
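Optionally, if the sample's pom.xml includes Beam's Direct Runner (an assumption; check the project before relying on it), you can smoke-test the same pipeline locally before submitting it to Dataflow by swapping the runner and dropping the Dataflow-specific flags:
mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --runner=DirectRunner"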
Observe job progress
Observe the job's progress in the Dataflow console.
Open the job details view to see:
- Job graph
- Execution details
- Job metrics
Publish some messages to your Lite topic.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
You may have to wait a few minutes to see the messages in your Worker Logs.
Use the command below to check which files have been written out to Cloud Storage.
gsutil ls "gs://$BUCKET/samples/"
The output should look like the following:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Use the command below to look at the content in a file:
gsutil cat "gs://$BUCKET/samples/your-filename"
Create a template
You can optionally create a custom Dataflow Flex Template based on your pipeline. Dataflow templates let you run jobs with different input parameters from the Google Cloud console or the command line without setting up a full Java development environment.
Create a fat JAR that includes all the dependencies of your pipeline.
mvn clean package -DskipTests=true
You should see target/pubsublite-streaming-bundled-1.0.jar after the command has run.
Provide names and locations for your template file and template container image.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Build a custom Flex Template. A required metadata.json file, which contains the necessary spec to run the job, has been provided with the example.
gcloud dataflow flex-template build $TEMPLATE_PATH \
    --image-gcr-path $TEMPLATE_IMAGE \
    --sdk-language "JAVA" \
    --flex-template-base-image "JAVA11" \
    --metadata-file "metadata.json" \
    --jar "target/pubsublite-streaming-bundled-1.0.jar" \
    --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Run a job using the custom flex template.
Console
- In the Google Cloud console, go to the Dataflow Create job from template page.
- Enter a Job name.
- Enter your Dataflow region.
- Choose your Custom Template.
- Enter your template path.
- Enter the required parameters.
- Click Run job.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION
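To confirm that the templated job started, you can list active Dataflow jobs in the region (a standard gcloud command, not specific to this sample):
gcloud dataflow jobs list \
    --region=$DATAFLOW_REGION \
    --status=active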
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
In the Dataflow console, stop the job. Cancel the pipeline instead of draining it.
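If you prefer the command line, you can cancel the job with gcloud instead; JOB_ID below is a placeholder for the ID shown in the job list or the monitoring console:
gcloud dataflow jobs cancel JOB_ID --region=$DATAFLOW_REGION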
Delete the topic and subscription.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Delete the files created by the pipeline.
gsutil -m rm -rf "gs://$BUCKET/samples/*"
gsutil -m rm -rf "gs://$BUCKET/temp/*"
Delete the template image and the template file if they exist.
gcloud container images delete $TEMPLATE_IMAGE
gsutil rm $TEMPLATE_PATH
Remove the Cloud Storage bucket.
gsutil rb gs://$BUCKET
What's next
Read more about Configuring Dataflow Flex Templates.
Understand Dataflow streaming pipelines.