Using Pub/Sub Lite with the Apache Kafka API

This page shows how to use the Pub/Sub Lite Kafka Shim to publish messages to and receive messages from Pub/Sub Lite.

The Pub/Sub Lite Kafka Shim is a Java library that makes it easy for users of the Apache Kafka Java client library to work with Pub/Sub Lite. It achieves this by implementing the Producer API and Consumer API.

This is possible because Pub/Sub Lite topics, like Apache Kafka topics, are partitioned logs that track consumer progress with numeric offsets.

While the two systems are similar, there are some notable practical differences:

  • A Pub/Sub Lite topic is equivalent to a Kafka topic. However, a Lite topic has configurable throughput and storage capacity on each topic partition, while Kafka topic capacity is determined by Kafka cluster configurations.
  • A Pub/Sub Lite subscription is equivalent to a Kafka consumer group. A Lite subscription is a named Google Cloud resource that represents the messages from Lite topic partitions that subscribers can read. Similarly, a Kafka consumer group consists of consumers that read data from the partitions of a Kafka topic.
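
To make the shared model concrete, the following is a toy, in-memory sketch of a partitioned log whose readers track progress with numeric offsets. It is not how Pub/Sub Lite or Kafka is implemented. The class name PartitionedLogSketch, its methods, and the group names are hypothetical and exist only for illustration. Each named group plays the role of a Lite subscription or a Kafka consumer group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model for illustration only (hypothetical names throughout).
class PartitionedLogSketch {
  // One append-only list of messages per partition.
  private final List<List<String>> partitions = new ArrayList<>();
  // Per group: the next offset to read in each partition.
  private final Map<String, long[]> committed = new HashMap<>();

  PartitionedLogSketch(int partitionCount) {
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
  }

  // Appends a message and returns its offset within the partition.
  long append(int partition, String message) {
    List<String> log = partitions.get(partition);
    log.add(message);
    return log.size() - 1;
  }

  // Returns the messages the group has not read yet and advances its offset.
  List<String> poll(String group, int partition) {
    long[] offsets = committed.computeIfAbsent(group, g -> new long[partitions.size()]);
    List<String> log = partitions.get(partition);
    List<String> unread = new ArrayList<>(log.subList((int) offsets[partition], log.size()));
    offsets[partition] = log.size();
    return unread;
  }

  public static void main(String[] args) {
    PartitionedLogSketch log = new PartitionedLogSketch(2);
    log.append(0, "a");
    log.append(0, "b");
    System.out.println(log.poll("group-1", 0)); // both messages
    System.out.println(log.poll("group-1", 0)); // nothing new for this group
    System.out.println(log.poll("group-2", 0)); // a second group starts at offset 0
  }
}
```

Because each group keeps its own offsets, two groups can read the same partition independently, which is the behavior that lets a Lite subscription stand in for a Kafka consumer group.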

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Pub/Sub Lite API.

  5. Create a service account:

    1. In the Cloud Console, go to the Create service account page.

    2. Select a project.
    3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create.
    5. Click the Select a role field.

      Under Quick access, click Basic, then click Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Cloud Console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
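
Before running the samples, it can help to confirm that the variable is visible to the JVM. The sketch below uses only the JDK. The class name CredentialsCheck and the helper isUsableKeyPath are hypothetical, and the check only confirms that the path names a readable file, not that the key itself is valid.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of any Google Cloud library.
class CredentialsCheck {
  // Returns true if the value is non-empty and names a readable regular file.
  static boolean isUsableKeyPath(String value) {
    if (value == null || value.isEmpty()) {
      return false;
    }
    Path path = Paths.get(value);
    return Files.isRegularFile(path) && Files.isReadable(path);
  }

  public static void main(String[] args) {
    String value = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
    if (isUsableKeyPath(value)) {
      System.out.println("Using service account key at " + value);
    } else {
      System.out.println(
          "GOOGLE_APPLICATION_CREDENTIALS is unset or does not point to a readable file.");
    }
  }
}
```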

Installing the module

Java

If you are using Maven without a BOM, add this to your dependencies:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>0.4.0</version>
</dependency>

If you are using Gradle, add the following to your dependencies:

implementation 'com.google.cloud:pubsublite-kafka:0.4.0'

If you are using sbt, add the following to your dependencies:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "0.4.0"

If you're using IntelliJ or Eclipse, you can add client libraries to your project using the Cloud Code for IntelliJ or Cloud Tools for Eclipse IDE plugins.

The plugins provide additional functionality, such as key management for service accounts. Refer to each plugin's documentation for details.

Creating a Pub/Sub Lite topic

Create a Lite topic using the following steps:

  1. In the Cloud Console, go to the Lite Topics page.

  2. Click Create Lite topic.

  3. Select a region and a zone.

  4. In the Name section, enter your-lite-topic as the Lite topic ID. The Lite topic name includes the Lite topic ID, the zone, and the project number.

  5. Click Create.
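
As an illustration of how those three parts combine, the sketch below assembles a Lite topic path string in the projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID layout. The class LiteTopicPathSketch and its helper are hypothetical, and the zone us-central1-b is an assumed placeholder. In the samples later on this page, the TopicPath builder produces this path for you.

```java
// Hypothetical helper for illustration; the client library's TopicPath
// builder is what the samples on this page actually use.
class LiteTopicPathSketch {
  // Joins the project number, zone, and topic ID into a Lite topic path.
  static String topicPath(long projectNumber, String zone, String topicId) {
    return "projects/" + projectNumber + "/locations/" + zone + "/topics/" + topicId;
  }

  public static void main(String[] args) {
    // Placeholder values; replace with your own project number and zone.
    System.out.println(topicPath(123456789L, "us-central1-b", "your-lite-topic"));
  }
}
```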

Creating a Pub/Sub Lite subscription

Create a Lite subscription using the following steps:

  1. In the Cloud Console, go to the Lite Subscriptions page.

  2. Click Create Lite subscription.

  3. In the Lite subscription ID field, enter your-lite-subscription.

  4. Select a Lite topic to receive messages from.

  5. In the Delivery requirement section, select Deliver messages after stored.

  6. Click Create.

The Lite subscription is in the same zone as the Lite topic.

Publishing messages to Pub/Sub Lite

Construct a Pub/Sub Lite producer to send data to your Lite topic. The producer implements org.apache.kafka.clients.producer.Producer, the same interface that KafkaProducer implements.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord<>(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Receiving messages from Pub/Sub Lite

Construct a Pub/Sub Lite consumer to receive data from your Lite subscription. The consumer implements org.apache.kafka.clients.consumer.Consumer, the same interface that KafkaConsumer implements.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that its subscription is
      // attached to. If this is the only subscriber for this subscription, the
      // subscriber can take up to 90 seconds to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}
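
The consumer sample prints each message value Base64-encoded. The following is a minimal sketch for turning a printed value back into text, assuming the payload is UTF-8 text such as the "message-0" style values the producer sample sends. The class name DecodeValueSketch and its helper are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only.
class DecodeValueSketch {
  // Decodes a Base64 string and interprets the bytes as UTF-8 text.
  static String decode(String base64Value) {
    byte[] raw = Base64.getDecoder().decode(base64Value);
    return new String(raw, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Encode a sample payload the same way the consumer sample prints it.
    String printed =
        Base64.getEncoder().encodeToString("message-0".getBytes(StandardCharsets.UTF_8));
    System.out.println(printed + " -> " + decode(printed));
  }
}
```

If your messages carry binary data rather than text, keep the raw bytes from record.value() instead of converting them to a string.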

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.

  1. In the Cloud Console, go to the Lite Topics page.

  2. Click your-lite-topic.

  3. In the Lite topic details page, click Delete.

  4. In the field that appears, enter delete to confirm that you want to delete the Lite topic.

  5. Click Delete.

  6. Repeat these steps for your Lite subscription.

What's next