Kurzanleitung: Nachrichten mit der Apache Kafka API in Pub/Sub Lite veröffentlichen und empfangen

Nachrichten mit der Apache Kafka API in Pub/Sub Lite veröffentlichen und empfangen

Auf dieser Seite wird beschrieben, wie Sie mit dem Pub/Sub Lite Kafka Shim Nachrichten in Pub/Sub Lite veröffentlichen und von Pub/Sub Lite empfangen können.

Pub/Sub Lite Kafka Shim ist eine Java-Bibliothek, die Nutzern der Apache Kafka Java-Clientbibliothek die Arbeit mit Pub/Sub Lite erleichtert. Dies wird durch die Implementierung der Producer API und der Consumer API erreicht.

Dies ist möglich, da Pub/Sub Lite-Themen wie Apache Kafka-Themen partitionierte Logs sind, mit denen der Fortschritt des Nutzers mit numerischen Offsets verfolgt wird.

Obwohl die beiden Systeme ähnlich sind, gibt es einige praktische Unterschiede:

  • Ein Pub/Sub Lite-Thema entspricht einem Kafka-Thema. Ein Lite-Thema hat jedoch konfigurierbare Durchsatz- und Speicherkapazität für jede Themenpartition, während die Kafka-Themenkapazität von Kafka-Clusterkonfigurationen bestimmt wird.
  • Ein Pub/Sub Lite-Abo entspricht einer Kafka-Nutzergruppe. Ein Lite-Abo ist eine Google Cloud Platform-Ressource mit benannten Namen, die Nachrichten aus Partitionen mit Lite-Themen darstellt, aus denen Abonnenten lesen können. Ebenso umfasst eine Kafka-Nutzergruppe Nutzer, die Daten aus den Partitionen eines Kafka-Themas lesen können.

Hinweis

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Aktivieren Sie die Pub/Sub Lite API:

    gcloud services enable pubsublite.googleapis.com
  7. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login
  8. Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie EMAIL_ADDRESS durch Ihre E-Mail-Adresse.
    • Ersetzen Sie ROLE durch jede einzelne Rolle.
  9. Installieren Sie die Google Cloud CLI.
  10. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  11. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  12. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  13. Aktivieren Sie die Pub/Sub Lite API:

    gcloud services enable pubsublite.googleapis.com
  14. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login
  15. Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie EMAIL_ADDRESS durch Ihre E-Mail-Adresse.
    • Ersetzen Sie ROLE durch jede einzelne Rolle.

Clientbibliothek installieren

Die folgenden Beispiele zeigen, wie Sie die pubsublite-kafka-Clientbibliotheken installieren.

Java

Wenn Sie Maven ohne BOM verwenden, fügen Sie Ihren Abhängigkeiten Folgendes hinzu:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.1</version>
</dependency>

Wenn Sie Gradle verwenden, fügen Sie den Abhängigkeiten Folgendes hinzu:

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

Wenn Sie sbt nutzen, fügen Sie den Abhängigkeiten Folgendes hinzu:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

Wenn Sie Visual Studio Code, IntelliJ oder Eclipse verwenden, können Sie Ihrem Projekt mithilfe der folgenden IDE-Plug-ins Clientbibliotheken hinzufügen:

Diese Plug-ins bieten zusätzliche Funktionen wie die Schlüsselverwaltung für Dienstkonten. Einzelheiten finden Sie in der Dokumentation der einzelnen Plug-ins.

Lite-Thema und Lite-Abo erstellen

Erstellen Sie mit den folgenden Befehlen ein Lite-Thema und ein Lite-Abo.

gcloud pubsub lite-topics create LITE_TOPIC_NAME \
    --location=LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION_NAME \
    --location=LITE_LOCATION \
    --topic=LITE_TOPIC_NAME \
    --starting-offset=end \
    --delivery-requirement=deliver-after-stored

Dabei gilt:

  • LITE_TOPIC_NAME ist der Name Ihres neuen Lite-Themas.
  • LITE_SUBSCRIPTION_NAME ist der Name Ihres neuen Lite-Abos.
  • LITE_LOCATION: der Standort, an dem Sie Ihr Lite-Thema und Lite-Abo erstellen. Wählen Sie einen unterstützten Pub/Sub Lite-Speicherort aus. Geben Sie auch eine Zone für die Region an. Beispiel: us-central1-a

Nachrichten in Pub/Sub Lite veröffentlichen

Erstellen Sie einen Pub/Sub Lite-Ersteller, um Daten an Ihr Lite-Thema zu senden. Diese Klasse implementiert org.apache.kafka.clients.Producer, die gleiche Schnittstelle wie KafkaProducer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Nachrichten von Pub/Sub Lite empfangen

Erstellen Sie einen Pub/Sub Lite-Nutzer, um Daten von Ihrem Lite-Abo zu erhalten. Diese Klasse implementiert org.apache.kafka.clients.consumer.Consumer, die gleiche Schnittstelle wie KafkaConsumer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Bereinigen

Löschen Sie das Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

  1. Löschen Sie das Lite-Thema und -Abo.

    gcloud pubsub lite-topics delete LITE_TOPIC_NAME
    gcloud pubsub lite-subscriptions delete LITE_SUBSCRIPTION_NAME
    

  2. Löschen Sie das Dienstkonto:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  3. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  4. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte