Receive messages in Apache Kafka
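Receive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API.
Code sample
Java
To authenticate to Pub/Sub Lite, set up Application Default Credentials. For more information, see Set up authentication for a local development environment (/docs/authentication/set-up-adc-local-dev-environment).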
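As a rough pre-flight check before running the sample, the sketch below reports where Application Default Credentials would most likely be picked up on a workstation. It assumes the usual lookup order (the `GOOGLE_APPLICATION_CREDENTIALS` environment variable first, then the file written by `gcloud auth application-default login`) and the standard location of that file. The class and method names (`AdcPreflight`, `describe`, `wellKnownFile`) are hypothetical and not part of any Google library.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

public class AdcPreflight {

  // Assumed default location of the file created by
  // `gcloud auth application-default login`.
  public static Path wellKnownFile() {
    String os = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
    String appData = System.getenv("APPDATA");
    if (os.contains("windows") && appData != null) {
      return Paths.get(appData, "gcloud", "application_default_credentials.json");
    }
    return Paths.get(
        System.getProperty("user.home", "."),
        ".config", "gcloud", "application_default_credentials.json");
  }

  // Describes which local credential source would be tried first.
  // envValue is the value of GOOGLE_APPLICATION_CREDENTIALS (may be null).
  public static String describe(String envValue, Path wellKnown) {
    if (envValue != null && !envValue.isEmpty()) {
      return Files.isReadable(Paths.get(envValue))
          ? "Using key file from GOOGLE_APPLICATION_CREDENTIALS: " + envValue
          : "GOOGLE_APPLICATION_CREDENTIALS is set but not readable: " + envValue;
    }
    if (Files.isReadable(wellKnown)) {
      return "Using gcloud user credentials: " + wellKnown;
    }
    return "No local credentials found; run `gcloud auth application-default login`.";
  }

  public static void main(String[] args) {
    System.out.println(
        describe(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"), wellKnownFile()));
  }
}
```

This check only inspects the local file system. It does not validate the credentials, and it does not cover environments such as Compute Engine, where credentials come from the attached service account instead of a local file.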
    import com.google.cloud.pubsublite.CloudRegion;
    import com.google.cloud.pubsublite.CloudZone;
    import com.google.cloud.pubsublite.ProjectNumber;
    import com.google.cloud.pubsublite.SubscriptionName;
    import com.google.cloud.pubsublite.SubscriptionPath;
    import com.google.cloud.pubsublite.TopicName;
    import com.google.cloud.pubsublite.TopicPath;
    import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
    import com.google.cloud.pubsublite.kafka.ConsumerSettings;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Base64;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class ConsumerExample {

      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String cloudRegion = "your-cloud-region";
        char zoneId = 'b';
        // Use an existing Pub/Sub Lite topic and subscription.
        String topicId = "your-topic-id";
        String subscriptionId = "your-subscription-id";
        // Using the project number here is required for constructing a Pub/Sub Lite
        // topic path that the Kafka consumer can use.
        long projectNumber = Long.parseLong("123456789");

        consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
      }

      public static void consumerExample(
          String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

        CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

        TopicPath topicPath =
            TopicPath.newBuilder()
                .setLocation(location)
                .setProject(ProjectNumber.of(projectNumber))
                .setName(TopicName.of(topicId))
                .build();

        SubscriptionPath subscription =
            SubscriptionPath.newBuilder()
                .setLocation(location)
                .setProject(ProjectNumber.of(projectNumber))
                .setName(SubscriptionName.of(subscriptionId))
                .build();

        FlowControlSettings flowControlSettings =
            FlowControlSettings.builder()
                // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
                .setBytesOutstanding(50 * 1024 * 1024L)
                // 10,000 outstanding messages. Must be >0.
                .setMessagesOutstanding(10000L)
                .build();

        ConsumerSettings settings =
            ConsumerSettings.newBuilder()
                .setSubscriptionPath(subscription)
                .setPerPartitionFlowControlSettings(flowControlSettings)
                .setAutocommit(true)
                .build();

        Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
        try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
          // The consumer can only subscribe to the topic that it is associated with.
          // If this is the only subscriber for this subscription, it will take up
          // to 90s for the subscriber to warm up.
          consumer.subscribe(Arrays.asList(topicPath.toString()));
          while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], byte[]> record : records) {
              long offset = record.offset();
              String value = Base64.getEncoder().encodeToString(record.value());
              hashSet.add(record);
              System.out.printf("Received %s: %s%n", offset, value);
            }
            // Early exit. Remove entirely to keep the consumer alive indefinitely.
            if (hashSet.size() >= 10) {
              System.out.println("Received 10 messages.");
              break;
            }
          }
        }
      }
    }

What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser (/docs/samples?product=pubsublite).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.