Receive messages in Apache Kafka
Stay organized with collections
Save and categorize content based on your preferences.
Receive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API
Code sample
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],[],[],[],null,["# Receive messages in Apache Kafka\n\nReceive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Pub/Sub Lite, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html;\n import com.google.cloud.pubsublite.cloudpubsub.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html;\n import com.google.cloud.pubsublite.kafka.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html;\n import java.time.Duration;\n import java.util.Arrays;\n import java.util.Base64;\n import java.util.HashSet;\n import java.util.Set;\n import org.apache.kafka.clients.consumer.Consumer;\n import org.apache.kafka.clients.consumer.ConsumerRecord;\n import org.apache.kafka.clients.consumer.ConsumerRecords;\n\n public class ConsumerExample {\n\n public static void main(String... args) throws Exception {\n // TODO(developer): Replace these variables before running the sample.\n String cloudRegion = \"your-cloud-region\";\n char zoneId = 'b';\n // Use an existing Pub/Sub Lite topic and subscription.\n String topicId = \"your-topic-id\";\n String subscriptionId = \"your-subscription-id\";\n // Using the project number here is required for constructing a Pub/Sub Lite\n // topic path that the Kafka consumer can use.\n long projectNumber = Long.parseLong(\"123456789\");\n\n consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);\n }\n\n public static void consumerExample(\n String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html location = https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html.of(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html.of(cloudRegion), zoneId);\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html topicPath =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html.of(topicId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html subscription =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html.of(subscriptionId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html flowControlSettings =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html.builder()\n // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).\n .setBytesOutstanding(50 * 1024 * 1024L)\n // 10,000 outstanding messages. Must be \u003e0.\n .setMessagesOutstanding(10000L)\n .build();\n\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html settings =\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html.newBuilder()\n .setSubscriptionPath(subscription)\n .setPerPartitionFlowControlSettings(flowControlSettings)\n .setAutocommit(true)\n .build();\n\n Set\u003cConsumerRecord\u003cbyte[], byte[]\u003e\u003e hashSet = new HashSet\u003c\u003e();\n try (Consumer\u003cbyte[], byte[]\u003e consumer = settings.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html#com_google_cloud_pubsublite_kafka_ConsumerSettings_instantiate__()) {\n // The consumer can only subscribe to the topic that it is associated to.\n // If this is the only subscriber for this subscription, it will take up\n // to 90s for the subscriber to warm up.\n consumer.subscribe(Arrays.asList(https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.html#com_google_cloud_pubsublite_spark_PslWriteDataSourceOptions_topicPath__.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html#com_google_cloud_pubsublite_TopicPath_toString__()));\n while (true) {\n ConsumerRecords\u003cbyte[], byte[]\u003e records = consumer.poll(Duration.ofMillis(100));\n for (ConsumerRecord\u003cbyte[], byte[]\u003e record : records) {\n long offset = record.offset();\n String value = Base64.getEncoder().encodeToString(record.value());\n hashSet.add(record);\n System.out.printf(\"Received %s: %s%n\", offset, value);\n }\n // Early exit. Remove entirely to keep the consumer alive indefinitely.\n if (hashSet.size() \u003e= 10) {\n System.out.println(\"Received 10 messages.\");\n break;\n }\n }\n }\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsublite)."]]