Nachrichten in Apache Kafka empfangen
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Nachrichten von einem Pub/Sub Lite-Abo mit einem Shim für die Kafka Consumer API empfangen
Codebeispiel
Nächste Schritte
Informationen zum Suchen und Filtern von Codebeispielen für andere Google Cloud -Produkte finden Sie im Google Cloud Beispielbrowser.
Sofern nicht anders angegeben, sind die Inhalte dieser Seite unter der Creative Commons Attribution 4.0 License und Codebeispiele unter der Apache 2.0 License lizenziert. Weitere Informationen finden Sie in den Websiterichtlinien von Google Developers. Java ist eine eingetragene Marke von Oracle und/oder seinen Partnern.
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],[],[],[],null,["# Receive messages in Apache Kafka\n\nReceive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Pub/Sub Lite, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html;\n import com.google.cloud.pubsublite.cloudpubsub.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html;\n import com.google.cloud.pubsublite.kafka.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html;\n import java.time.Duration;\n import java.util.Arrays;\n import java.util.Base64;\n import java.util.HashSet;\n import java.util.Set;\n import org.apache.kafka.clients.consumer.Consumer;\n import org.apache.kafka.clients.consumer.ConsumerRecord;\n import org.apache.kafka.clients.consumer.ConsumerRecords;\n\n public class ConsumerExample {\n\n public static void main(String... args) throws Exception {\n // TODO(developer): Replace these variables before running the sample.\n String cloudRegion = \"your-cloud-region\";\n char zoneId = 'b';\n // Use an existing Pub/Sub Lite topic and subscription.\n String topicId = \"your-topic-id\";\n String subscriptionId = \"your-subscription-id\";\n // Using the project number here is required for constructing a Pub/Sub Lite\n // topic path that the Kafka consumer can use.\n long projectNumber = Long.parseLong(\"123456789\");\n\n consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);\n }\n\n public static void consumerExample(\n String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html location = https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html.of(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html.of(cloudRegion), zoneId);\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html topicPath =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html.of(topicId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html subscription =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.SubscriptionPath.html.newBuilder()\n .setLocation(location)\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SubscriptionName.html.of(subscriptionId))\n .build();\n\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html flowControlSettings =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings.html.builder()\n // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).\n .setBytesOutstanding(50 * 1024 * 1024L)\n // 10,000 outstanding messages. Must be \u003e0.\n .setMessagesOutstanding(10000L)\n .build();\n\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html settings =\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html.newBuilder()\n .setSubscriptionPath(subscription)\n .setPerPartitionFlowControlSettings(flowControlSettings)\n .setAutocommit(true)\n .build();\n\n Set\u003cConsumerRecord\u003cbyte[], byte[]\u003e\u003e hashSet = new HashSet\u003c\u003e();\n try (Consumer\u003cbyte[], byte[]\u003e consumer = settings.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ConsumerSettings.html#com_google_cloud_pubsublite_kafka_ConsumerSettings_instantiate__()) {\n // The consumer can only subscribe to the topic that it is associated to.\n // If this is the only subscriber for this subscription, it will take up\n // to 90s for the subscriber to warm up.\n consumer.subscribe(Arrays.asList(https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.html#com_google_cloud_pubsublite_spark_PslWriteDataSourceOptions_topicPath__.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html#com_google_cloud_pubsublite_TopicPath_toString__()));\n while (true) {\n ConsumerRecords\u003cbyte[], byte[]\u003e records = consumer.poll(Duration.ofMillis(100));\n for (ConsumerRecord\u003cbyte[], byte[]\u003e record : records) {\n long offset = record.offset();\n String value = Base64.getEncoder().encodeToString(record.value());\n hashSet.add(record);\n System.out.printf(\"Received %s: %s%n\", offset, value);\n }\n // Early exit. Remove entirely to keep the consumer alive indefinitely.\n if (hashSet.size() \u003e= 10) {\n System.out.println(\"Received 10 messages.\");\n break;\n }\n }\n }\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsublite)."]]