使用 Kafka Producer API 的 shim 将消息发布到 Pub/Sub Lite 主题
代码示例
Java
如需向 Pub/Sub Lite 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅“为本地开发环境设置身份验证”。
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Sample that publishes a fixed number of messages to an existing Pub/Sub Lite topic through the
 * Kafka {@code Producer} interface returned by {@code ProducerSettings.instantiate()}.
 */
public class ProducerExample {

  /** Number of messages published by {@link #producerExample}. */
  private static final int MESSAGE_COUNT = 10;

  /**
   * Entry point: fills in placeholder identifiers and runs {@link #producerExample}.
   *
   * @param args unused
   * @throws Exception if publishing fails or the thread is interrupted while waiting
   */
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  /**
   * Publishes {@code MESSAGE_COUNT} messages (key {@code "demo"}, payloads {@code "message-0"},
   * {@code "message-1"}, ...) to the given topic, waits for every send to complete, and prints
   * the offset reported for each message followed by a summary line.
   *
   * @param cloudRegion region part of the topic location
   * @param zoneId zone letter within {@code cloudRegion}
   * @param projectNumber numeric project identifier used in the topic path
   * @param topicId ID of an existing Pub/Sub Lite topic
   * @throws InterruptedException if the thread is interrupted while waiting for a send result
   * @throws ExecutionException if waiting on a send result reports a failure
   */
  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();
    // The string form of the topic path doubles as the Kafka topic name for every record.
    String topic = topicPath.toString();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>(MESSAGE_COUNT);
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      String key = "demo";
      for (long i = 0L; i < MESSAGE_COUNT; i++) {
        // Encode explicitly as UTF-8 so the published bytes do not depend on the JVM's
        // default charset, and use the diamond operator instead of a raw ProducerRecord.
        ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<>(
                topic,
                key.getBytes(StandardCharsets.UTF_8),
                ("message-" + i).getBytes(StandardCharsets.UTF_8));
        futures.add(producer.send(record));
      }
      // Wait for all sends while the producer is still open, in submission order.
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published %d messages to %s%n", MESSAGE_COUNT, topic);
  }
}
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。