在 Apache Kafka 中接收消息
使用集合让一切井井有条
根据您的偏好保存内容并对其进行分类。
使用用于 Kafka Consumer API 的 shim,从 Pub/Sub Lite 主题接收消息
代码示例
如未另行说明,那么本页面中的内容已根据知识共享署名 4.0 许可获得了许可,并且代码示例已根据 Apache 2.0 许可获得了许可。有关详情,请参阅 Google 开发者网站政策。Java 是 Oracle 和/或其关联公司的注册商标。
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],[],[],[],null,["# Receive messages in Apache Kafka\n\nReceive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Pub/Sub Lite, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html;\n import com.google.cloud.pubsublite.cloudpubsub.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html;\n import com.google.cloud.pubsublite.kafka.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html;\n import java.time.Duration;\n import java.util.Arrays;\n import java.util.Base64;\n import java.util.HashSet;\n import java.util.Set;\n import org.apache.kafka.clients.consumer.Consumer;\n import org.apache.kafka.clients.consumer.ConsumerRecord;\n import org.apache.kafka.clients.consumer.ConsumerRecords;\n\n public class ConsumerExample {\n\n public static void main(String... args) throws Exception {\n // TODO(developer): Replace these variables before running the sample.\n String cloudRegion = \"your-cloud-region\";\n char zoneId = 'b';\n // Use an existing Pub/Sub Lite topic and subscription.\n String topicId = \"your-topic-id\";\n String subscriptionId = \"your-subscription-id\";\n // Using the project number here is required for constructing a Pub/Sub Lite\n // topic path that the Kafka consumer can use.\n long projectNumber = Long.parseLong(\"123456789\");\n\n consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);\n }\n\n public static void consumerExample(\n String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html location = https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html.of(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html.of(cloudRegion), zoneId);\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html topicPath =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html.of(topicId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html subscription =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html.of(subscriptionId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html flowControlSettings =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html.builder()\n // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).\n .setBytesOutstanding(50 * 1024 * 1024L)\n // 10,000 outstanding messages. Must be \u003e0.\n .setMessagesOutstanding(10000L)\n .build();\n\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html settings =\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html.newBuilder()\n .setSubscriptionPath(subscription)\n .setPerPartitionFlowControlSettings(flowControlSettings)\n .setAutocommit(true)\n .build();\n\n Set\u003cConsumerRecord\u003cbyte[], byte[]\u003e\u003e hashSet = new HashSet\u003c\u003e();\n try (Consumer\u003cbyte[], byte[]\u003e consumer = settings.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html#com_google_cloud_pubsublite_kafka_ConsumerSettings_instantiate__()) {\n // The consumer can only subscribe to the topic that it is associated to.\n // If this is the only subscriber for this subscription, it will take up\n // to 90s for the subscriber to warm up.\n consumer.subscribe(Arrays.asList(https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.html#com_google_cloud_pubsublite_spark_PslWriteDataSourceOptions_topicPath__.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html#com_google_cloud_pubsublite_TopicPath_toString__()));\n while (true) {\n ConsumerRecords\u003cbyte[], byte[]\u003e records = consumer.poll(Duration.ofMillis(100));\n for (ConsumerRecord\u003cbyte[], byte[]\u003e record : records) {\n long offset = record.offset();\n String value = Base64.getEncoder().encodeToString(record.value());\n hashSet.add(record);\n System.out.printf(\"Received %s: %s%n\", offset, value);\n }\n // Early exit. Remove entirely to keep the consumer alive indefinitely.\n if (hashSet.size() \u003e= 10) {\n System.out.println(\"Received 10 messages.\");\n break;\n }\n }\n }\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsublite)."]]