从 Apache Kafka 发布消息
使用集合让一切井井有条
根据您的偏好保存内容并对其进行分类。
使用用于 Kafka Producer API 的 shim,将消息发布到 Pub/Sub Lite 主题
代码示例
如未另行说明,那么本页面中的内容已根据知识共享署名 4.0 许可获得了许可,并且代码示例已根据 Apache 2.0 许可获得了许可。有关详情,请参阅 Google 开发者网站政策。Java 是 Oracle 和/或其关联公司的注册商标。
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],[],[],[],null,["# Publish messages from Apache Kafka\n\nPublish messages to a Pub/Sub Lite topic using a shim for the Kafka Producer API\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Pub/Sub Lite, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html;\n import com.google.cloud.pubsublite.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html;\n import com.google.cloud.pubsublite.kafka.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ProducerSettings.html;\n import java.util.ArrayList;\n import java.util.List;\n import java.util.concurrent.ExecutionException;\n import java.util.concurrent.Future;\n import org.apache.kafka.clients.producer.Producer;\n import org.apache.kafka.clients.producer.ProducerRecord;\n import org.apache.kafka.clients.producer.RecordMetadata;\n\n public class ProducerExample {\n\n public static void main(String... args) throws Exception {\n // TODO(developer): Replace these variables before running the sample.\n String cloudRegion = \"your-cloud-region\";\n char zoneId = 'b';\n // Use an existing Pub/Sub Lite topic.\n String topicId = \"your-topic-id\";\n // Using the project number is required for constructing a Pub/Sub Lite\n // topic path that the Kafka producer can use.\n long projectNumber = Long.parseLong(\"123456789\");\n\n producerExample(cloudRegion, zoneId, projectNumber, topicId);\n }\n\n public static void producerExample(\n String cloudRegion, char zoneId, long projectNumber, String topicId)\n throws InterruptedException, ExecutionException {\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html topicPath =\n https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html.newBuilder()\n .setLocation(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudZone.html.of(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.CloudRegion.html.of(cloudRegion), zoneId))\n .setProject(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.ProjectNumber.html.of(projectNumber))\n .setName(https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.TopicName.html.of(topicId))\n .build();\n\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ProducerSettings.html producerSettings =\n https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ProducerSettings.html.newBuilder().setTopicPath(topicPath).build();\n\n List\u003cFuture\u003cRecordMetadata\u003e\u003e futures = new ArrayList\u003c\u003e();\n try (Producer\u003cbyte[], byte[]\u003e producer = producerSettings.https://cloud.google.com/java/docs/reference/pubsublite-kafka/latest/com.google.cloud.pubsublite.kafka.ProducerSettings.html#com_google_cloud_pubsublite_kafka_ProducerSettings_instantiate__()) {\n for (long i = 0L; i \u003c 10L; i++) {\n String key = \"demo\";\n Future\u003cRecordMetadata\u003e future =\n producer.send(\n new ProducerRecord(\n https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.html#com_google_cloud_pubsublite_spark_PslWriteDataSourceOptions_topicPath__.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html#com_google_cloud_pubsublite_TopicPath_toString__(), https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.Message.html#com_google_cloud_pubsublite_Message_key__.getBytes(), (\"message-\" + i).getBytes()));\n futures.add(future);\n }\n for (Future\u003cRecordMetadata\u003e future : futures) {\n RecordMetadata meta = future.get();\n System.out.println(meta.offset());\n }\n }\n System.out.printf(\"Published 10 messages to %s%n\", https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/com.google.cloud.pubsublite.spark.PslWriteDataSourceOptions.html#com_google_cloud_pubsublite_spark_PslWriteDataSourceOptions_topicPath__.https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.TopicPath.html#com_google_cloud_pubsublite_TopicPath_toString__());\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=pubsublite)."]]