Receive messages in Apache Kafka
Receive messages from a Pub/Sub Lite subscription using a shim for the Kafka Consumer API.
Code sample
Java
To authenticate to Pub/Sub Lite, set up Application Default Credentials.
For more information, see Set up authentication for a local development environment.
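When setting up credentials locally, it can help to check where a credentials file would be expected. The following is a minimal sketch, not the client library's own lookup logic: it assumes the commonly documented local sources, namely the `GOOGLE_APPLICATION_CREDENTIALS` environment variable and the file written by `gcloud auth application-default login`. The class `AdcLocation` and its `resolve` method are hypothetical names used only for illustration, and the sketch ignores non-file sources such as an attached service account.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

/** Illustrative helper; not part of any Google library. */
final class AdcLocation {

  private static final String FILE_NAME = "application_default_credentials.json";

  /**
   * Returns the local file that credentials would be expected in: the path in
   * GOOGLE_APPLICATION_CREDENTIALS if it is set, otherwise the gcloud
   * well-known file for the given operating system.
   */
  static Path resolve(String envValue, String osName, String userHome, String appData) {
    if (envValue != null && !envValue.isEmpty()) {
      return Paths.get(envValue);
    }
    boolean windows = osName.toLowerCase(Locale.ROOT).startsWith("windows");
    if (windows && appData != null) {
      // Assumed Windows location: %APPDATA%\gcloud\...
      return Paths.get(appData, "gcloud", FILE_NAME);
    }
    // Assumed Linux/macOS location: $HOME/.config/gcloud/...
    return Paths.get(userHome, ".config", "gcloud", FILE_NAME);
  }

  public static void main(String[] args) {
    Path path =
        resolve(
            System.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
            System.getProperty("os.name"),
            System.getProperty("user.home"),
            System.getenv("APPDATA"));
    System.out.println("Expected credentials file: " + path);
    System.out.println("Exists: " + Files.exists(path));
  }
}
```

If the reported file does not exist, running `gcloud auth application-default login` is the usual way to create it on a development machine.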
    import com.google.cloud.pubsublite.CloudRegion;
    import com.google.cloud.pubsublite.CloudZone;
    import com.google.cloud.pubsublite.ProjectNumber;
    import com.google.cloud.pubsublite.SubscriptionName;
    import com.google.cloud.pubsublite.SubscriptionPath;
    import com.google.cloud.pubsublite.TopicName;
    import com.google.cloud.pubsublite.TopicPath;
    import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
    import com.google.cloud.pubsublite.kafka.ConsumerSettings;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Base64;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class ConsumerExample {

      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String cloudRegion = "your-cloud-region";
        char zoneId = 'b';
        // Use an existing Pub/Sub Lite topic and subscription.
        String topicId = "your-topic-id";
        String subscriptionId = "your-subscription-id";
        // Using the project number here is required for constructing a Pub/Sub Lite
        // topic path that the Kafka consumer can use.
        long projectNumber = Long.parseLong("123456789");

        consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
      }

      public static void consumerExample(
          String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

        CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

        TopicPath topicPath =
            TopicPath.newBuilder()
                .setLocation(location)
                .setProject(ProjectNumber.of(projectNumber))
                .setName(TopicName.of(topicId))
                .build();

        SubscriptionPath subscription =
            SubscriptionPath.newBuilder()
                .setLocation(location)
                .setProject(ProjectNumber.of(projectNumber))
                .setName(SubscriptionName.of(subscriptionId))
                .build();

        FlowControlSettings flowControlSettings =
            FlowControlSettings.builder()
                // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
                .setBytesOutstanding(50 * 1024 * 1024L)
                // 10,000 outstanding messages. Must be >0.
                .setMessagesOutstanding(10000L)
                .build();

        ConsumerSettings settings =
            ConsumerSettings.newBuilder()
                .setSubscriptionPath(subscription)
                .setPerPartitionFlowControlSettings(flowControlSettings)
                .setAutocommit(true)
                .build();

        Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
        try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
          // The consumer can only subscribe to the topic that it is associated to.
          // If this is the only subscriber for this subscription, it will take up
          // to 90s for the subscriber to warm up.
          consumer.subscribe(Arrays.asList(topicPath.toString()));
          while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], byte[]> record : records) {
              long offset = record.offset();
              String value = Base64.getEncoder().encodeToString(record.value());
              hashSet.add(record);
              System.out.printf("Received %s: %s%n", offset, value);
            }
            // Early exit. Remove entirely to keep the consumer alive indefinitely.
            if (hashSet.size() >= 10) {
              System.out.println("Received 10 messages.");
              break;
            }
          }
        }
      }
    }

What's next
To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=pubsublite).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
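The sample on this page prints each record value with `Base64.getEncoder().encodeToString(...)`, because the consumer is typed as `Consumer<byte[], byte[]>` and the payload is raw bytes. The sketch below uses only the Java standard library to show what that printed form looks like and how it could be turned back into text. It assumes the publisher sent UTF-8 text, which the sample itself does not guarantee; `RecordValueCodec` and its methods are hypothetical names for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Illustrative helper; not part of the Pub/Sub Lite or Kafka libraries. */
final class RecordValueCodec {

  /** Encodes raw record bytes the same way the sample does before printing. */
  static String encode(byte[] value) {
    return Base64.getEncoder().encodeToString(value);
  }

  /** Decodes a printed Base64 value, assuming the payload was UTF-8 text. */
  static String decodeToText(String printed) {
    byte[] raw = Base64.getDecoder().decode(printed);
    return new String(raw, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    String printed = encode(payload);
    System.out.println(printed); // prints "aGVsbG8="
    System.out.println(decodeToText(printed)); // prints "hello"
  }
}
```

If the payloads are known to be text, decoding `record.value()` directly with `new String(bytes, StandardCharsets.UTF_8)` avoids the Base64 round trip; Base64 is the safer choice for logging arbitrary binary data.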