Guida rapida: pubblica e ricevi messaggi in Pub/Sub Lite utilizzando l'API Apache Kafka

Pubblica e ricevi messaggi in Pub/Sub Lite utilizzando l'API Apache Kafka

Questa pagina mostra come utilizzare Pub/Sub Lite Kafka Shim per pubblicare e ricevere messaggi da Pub/Sub Lite.

Pub/Sub Lite Kafka Shim è una libreria Java che consente agli utenti di Apache Kafka libreria client Java di utilizzare facilmente Pub/Sub Lite. A questo scopo, implementa l'API Producer e l'API Consumer.

Ciò è possibile perché gli argomenti Pub/Sub Lite, come gli argomenti Apache Kafka, sono log partizionati che monitorano i progressi dei consumatori con offset numerici.

Sebbene i due sistemi siano simili, esistono alcune differenze pratiche significative:

  • Un argomento Pub/Sub Lite è equivalente a un argomento Kafka. Tuttavia, un argomento Lite ha velocità effettiva e capacità di archiviazione configurabili su ciascuna partizione di argomenti, mentre la capacità dell'argomento Kafka è determinata dalle configurazioni del cluster Kafka.
  • Un abbonamento Pub/Sub Lite è equivalente a un gruppo di consumatori Kafka. Una sottoscrizione Lite è una risorsa denominata Google Cloud Platform di primo ordine che rappresenta i messaggi provenienti dalle partizioni di argomenti Lite da cui i sottoscrittori possono leggere. Analogamente, un gruppo di consumatori Kafka è composto da consumatori in grado di leggere i dati delle partizioni di un argomento Kafka.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Attiva l'API Pub/Sub Lite.

    gcloud services enable pubsublite.googleapis.com
  7. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  8. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  9. Installa Google Cloud CLI.
  10. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  11. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  12. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  13. Attiva l'API Pub/Sub Lite.

    gcloud services enable pubsublite.googleapis.com
  14. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  15. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.

Installa la libreria client

I seguenti esempi mostrano come installare le librerie client di pubsublite-kafka.

Java

If you are using Maven without a BOM, add this to your dependencies:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.1</version>
</dependency>

If you are using Gradle, add the following to your dependencies:

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

If you are using sbt, add the following to your dependencies:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

If you're using Visual Studio Code, IntelliJ, or Eclipse, you can add client libraries to your project using the following IDE plugins:

The plugins provide additional functionality, such as key management for service accounts. Refer to each plugin's documentation for details.

Creare un argomento e una sottoscrizione Lite

Crea un argomento Lite e una sottoscrizione Lite con i comandi seguenti.

gcloud pubsub lite-topics create LITE_TOPIC_NAME \
    --location=LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION_NAME \
    --location=LITE_LOCATION \
    --topic=LITE_TOPIC_NAME \
    --starting-offset=end \
    --delivery-requirement=deliver-after-stored

Sostituisci quanto segue:

  • LITE_TOPIC_NAME: il nome del nuovo argomento Lite.
  • LITE_SUBSCRIPTION_NAME: il nome del nuovo abbonamento Lite.
  • LITE_LOCATION: la località in cui stai creando l'argomento Lite e la sottoscrizione Lite. Scegli una località Pub/Sub Lite supportata. Specifica anche una zona per la regione. Ad esempio: us-central1-a.

Pubblica messaggi in Pub/Sub Lite

Crea un producer Pub/Sub Lite per inviare dati all'argomento Lite. Questo corso implementa org.apache.kafka.clients.Producer, la stessa interfaccia di KafkaProducer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Ricevi messaggi da Pub/Sub Lite

Crea un consumer Pub/Sub Lite per ricevere dati dalla sottoscrizione Lite. Questo corso implementa org.apache.kafka.clients.consumer.Consumer, la stessa interfaccia di KafkaConsumer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Cloud con le risorse.

  1. Elimina l'argomento e la sottoscrizione Lite.

    gcloud pubsub lite-topics delete LITE_TOPIC_NAME
    gcloud pubsub lite-subscriptions delete LITE_SUBSCRIPTION_NAME
    

  2. Elimina l'account di servizio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  3. Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  4. Facoltativo: revoca le credenziali dallgcloud CLI.

    gcloud auth revoke

Passaggi successivi