Stream Pub/Sub Lite messages by using Dataflow
As an alternative to writing and running your own data processing programs, you can use Dataflow with the Pub/Sub Lite I/O connector for Apache Beam. Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It executes programs developed with the Apache Beam SDK, which provides an extensible set of stateful processing abstractions and I/O connectors to other streaming and batch systems.
This quickstart shows you how to write an Apache Beam pipeline that will:
- Read messages from Pub/Sub Lite
- Window (or group) the messages by publish timestamp
- Write the messages to Cloud Storage
It also shows you how to:
- Submit your pipeline to run on Dataflow
- Create a Dataflow Flex Template from your pipeline
This tutorial requires Maven, but it's also possible to convert the example project from Maven to Gradle. To learn more, see Optional: Convert from Maven to Gradle.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
  gcloud services enable pubsublite.googleapis.com dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com
- Set up authentication:
  - Create the service account:
    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
    Replace SERVICE_ACCOUNT_NAME with a name for the service account.
  - Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - ROLE: the role to grant
  - Grant the required role to the principal that will attach the service account to other resources.
    gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - USER_EMAIL: the email address for a Google Account
- Create local authentication credentials for your user account:
  gcloud auth application-default login
Set up your Pub/Sub Lite project
Create variables for your Cloud Storage bucket, project, and Dataflow region. Cloud Storage bucket names must be globally unique. The Dataflow region must be a valid region where you can run your job. For more information about regions and locations, see Dataflow locations.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Create a Cloud Storage bucket owned by this project:
gcloud storage buckets create gs://$BUCKET
Create a zonal Lite topic and Lite subscription
Create a zonal Lite topic and a Lite subscription in Pub/Sub Lite.
For the Lite location, choose a supported Pub/Sub Lite location. You must also specify a zone for the region, for example, us-central1-a.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
--location=$LITE_LOCATION \
--partitions=1 \
--per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
--location=$LITE_LOCATION \
--topic=$TOPIC \
--starting-offset=beginning
Stream messages to Dataflow
Download the quickstart sample code
Clone the quickstart repository and navigate to the sample code directory.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Sample code
This sample code uses Dataflow to:
- Read messages from a Pub/Sub Lite subscription as an unbounded source.
- Group messages based on their publish timestamps, using fixed time windows and the default trigger.
- Write the grouped messages to files on Cloud Storage.
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.
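To make the windowing step concrete, the following is a minimal sketch of what grouping by publish timestamp into fixed windows means. It uses only the Java standard library and is not the quickstart's pipeline code, which relies on the Apache Beam SDK. The class name FixedWindowSketch, the Message type, and the sample timestamps are hypothetical, and the sketch assumes windows aligned to the epoch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {

    /** Hypothetical stand-in for a message that carries a publish timestamp. */
    static final class Message {
        final Instant publishTime;
        final String payload;

        Message(Instant publishTime, String payload) {
            this.publishTime = publishTime;
            this.payload = payload;
        }
    }

    /** Returns the start of the epoch-aligned fixed window that contains the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long millis = timestamp.toEpochMilli();
        // floorMod rounds down correctly even for timestamps before the epoch.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    /** Groups payloads by window start; each window covers [start, start + size). */
    static Map<Instant, List<String>> groupByWindow(List<Message> messages, Duration size) {
        Map<Instant, List<String>> windows = new TreeMap<>();
        for (Message m : messages) {
            windows.computeIfAbsent(windowStart(m.publishTime, size), k -> new ArrayList<>())
                   .add(m.payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
            new Message(Instant.parse("2024-01-01T19:41:05Z"), "a"),
            new Message(Instant.parse("2024-01-01T19:41:50Z"), "b"),
            new Message(Instant.parse("2024-01-01T19:42:10Z"), "c"));
        // With a one-minute window, "a" and "b" share a window and "c" falls in the next one.
        groupByWindow(messages, Duration.ofMinutes(1))
            .forEach((start, payloads) -> System.out.println(start + " -> " + payloads));
    }
}
```

In a real streaming pipeline the input is unbounded, so a window's contents are emitted when a trigger fires rather than after all input has been read. The sketch only shows how a timestamp maps to a window.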
Start the Dataflow pipeline
To start the pipeline in Dataflow, run the following command:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
The preceding command launches a Dataflow job. Follow the link in the console output to access the job in the Dataflow monitoring console.
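The --subscription argument in the command above is a full resource path of the form projects/PROJECT/locations/LOCATION/subscriptions/SUBSCRIPTION. As an illustration only, the sketch below assembles such a path with the Java standard library. The class name, the method name, and the example values are hypothetical, and the validation is a simplifying assumption rather than the service's actual naming rules.

```java
public class LiteSubscriptionPathSketch {

    /** Builds a path of the form projects/P/locations/L/subscriptions/S. */
    static String subscriptionPath(String project, String location, String subscription) {
        for (String part : new String[] {project, location, subscription}) {
            // Assumed minimal check: each component must be non-empty and contain no slash.
            if (part == null || part.isEmpty() || part.contains("/")) {
                throw new IllegalArgumentException("Invalid path component: " + part);
            }
        }
        return String.format(
            "projects/%s/locations/%s/subscriptions/%s", project, location, subscription);
    }

    public static void main(String[] args) {
        // Hypothetical values; substitute your own project, Lite location, and subscription ID.
        System.out.println(subscriptionPath("my-project", "us-central1-a", "my-lite-subscription"));
    }
}
```

Building the path in one place makes it easier to spot a missing zone suffix or a swapped component before the job is submitted.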
Observe job progress
Observe the job's progress in the Dataflow console.
Open the job details view to see:
- Job graph
- Execution details
- Job metrics
Publish some messages to your Lite topic.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
You may have to wait a few minutes to see the messages in your Worker Logs.
Use the command below to check which files have been written out to Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
The output should look like the following:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
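The file names in this listing appear to encode the output prefix, the window start and end times, and a shard index and count. The sketch below shows one way a name of that shape could be assembled with the Java standard library. It is an assumption about the format, not the sample's actual naming code; the class and method names are hypothetical, and times are assumed to be formatted in UTC.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowFileNameSketch {

    // Assumption: window bounds are rendered as hours and minutes in UTC.
    private static final DateTimeFormatter HH_MM =
        DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    /** Builds a name such as prefix-HH:mm-HH:mm-SHARD-of-NUM_SHARDS. */
    static String fileName(String prefix, Instant start, Instant end, int shard, int numShards) {
        return String.format(
            "%s-%s-%s-%d-of-%d", prefix, HH_MM.format(start), HH_MM.format(end), shard, numShards);
    }

    public static void main(String[] args) {
        // Hypothetical one-minute window.
        Instant start = Instant.parse("2024-01-01T19:41:00Z");
        Instant end = Instant.parse("2024-01-01T19:42:00Z");
        System.out.println(fileName("output", start, end, 0, 1));
    }
}
```

Read this way, each file in the listing corresponds to one one-minute window, which matches the --windowSize=1 argument passed when the pipeline was started.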
Use the command below to look at the content in a file:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Optional: Create a Dataflow template
You can optionally create a custom Dataflow Flex Template based on your pipeline. Dataflow templates let you run jobs with different input parameters from Google Cloud console or the command line without the need to set up a full Java development environment.
Create a fat JAR that includes all the dependencies of your pipeline. You should see target/pubsublite-streaming-bundled-1.0.jar after the command has run.
mvn clean package -DskipTests=true
Provide names and locations for your template file and template container image.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Build a custom flex template. A required metadata.json file, which contains the necessary spec to run the job, has been provided with the example.
gcloud dataflow flex-template build $TEMPLATE_PATH \
--image-gcr-path $TEMPLATE_IMAGE \
--sdk-language "JAVA" \
--flex-template-base-image "JAVA11" \
--metadata-file "metadata.json" \
--jar "target/pubsublite-streaming-bundled-1.0.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Run a job using the custom flex template.
Console
Enter a Job name.
Enter your Dataflow region.
Choose your Custom Template.
Enter your template path.
Enter the required parameters.
Click Run job.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--service-account-email=$SERVICE_ACCOUNT
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
In the Dataflow console, stop the job. Cancel the pipeline instead of draining it.
Delete the topic and subscription.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Delete the files created by the pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Delete the template image and the template file if they exist.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Remove the Cloud Storage bucket.
gcloud storage rm gs://$BUCKET --recursive
- Delete the service account:
  gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
- Optional: Revoke the authentication credentials that you created, and delete the local credential file.
  gcloud auth application-default revoke
- Optional: Revoke credentials from the gcloud CLI.
  gcloud auth revoke
What's next
Read more about Configuring Dataflow Flex Templates.
Understand Dataflow streaming pipelines.