Menggunakan Pub/Sub di aplikasi Spring

Halaman ini menjelaskan cara menggunakan Pub/Sub dalam aplikasi Java yang dibangun dengan Framework Spring.

Spring Cloud GCP memiliki beberapa modul untuk mengirim pesan ke topik Pub/Sub dan menerima pesan dari langganan Pub/Sub menggunakan Framework Spring. Anda dapat menggunakan modul-modul ini secara independen atau menggabungkannya untuk kasus penggunaan yang berbeda:

CATATAN: Library Spring Cloud GCP tidak menyediakan akses ke AckReplyConsumerWithResponse, yang merupakan modul yang diperlukan untuk mengimplementasikan fitur tepat satu kali menggunakan library klien Java.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Siapkan project Konsol Google Cloud

    Menyiapkan project

    Klik:

    • Buat atau pilih project.
    • Aktifkan API Pub/Sub untuk project tersebut.
    • Buat akun layanan.
    • Download kunci pribadi sebagai JSON.

    Anda dapat melihat dan mengelola resource ini kapan saja di Konsol Google Cloud.

  3. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke jalur file JSON yang berisi kredensial Anda. Variabel ini hanya berlaku untuk sesi shell Anda saat ini. Jadi, jika Anda membuka sesi baru, tetapkan variabel kembali.

  4. Siapkan project Konsol Google Cloud

    Menyiapkan project

    Klik:

    • Buat atau pilih project.
    • Aktifkan API Pub/Sub untuk project tersebut.
    • Buat akun layanan.
    • Download kunci pribadi sebagai JSON.

    Anda dapat melihat dan mengelola resource ini kapan saja di Konsol Google Cloud.

  5. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke jalur file JSON yang berisi kredensial Anda. Variabel ini hanya berlaku untuk sesi shell Anda saat ini. Jadi, jika Anda membuka sesi baru, tetapkan variabel kembali.

  6. Tetapkan variabel lingkungan GOOGLE_CLOUD_PROJECT ke project ID Google Cloud Anda.

Menggunakan Spring Cloud GCP Pub/Sub Starter

Modul Spring Cloud GCP Pub/Sub Starter menginstal library klien Java Pub/Sub menggunakan modul Spring Cloud GCP Pub/Sub. Anda dapat memanggil Pub/Sub API dari aplikasi Spring menggunakan class yang disediakan oleh Spring Cloud GCP Pub/Sub Starter atau library klien Pub/Sub Java. Jika menggunakan class yang disediakan oleh Spring Cloud GCP Pub/Sub Starter, Anda dapat mengganti konfigurasi Pub/Sub default.

Menginstal modul

Untuk menginstal modul Spring Cloud GCP Pub/Sub Starter, tambahkan dependensi berikut ke file pom.xml Anda:

  1. Spring Cloud Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operasi yang didukung

Modul Spring Cloud GCP Pub/Sub Starter mencakup class berikut:

  • PubSubAdmin untuk operasi administratif:
    • Membuat topik dan langganan.
    • Mendapatkan topik dan langganan.
    • Mencantumkan topik dan langganan.
    • Hapus topik dan langganan.
    • Mendapatkan dan menetapkan batas waktu konfirmasi pada langganan.
  • PubSubTemplate untuk mengirim dan menerima pesan:
    • Memublikasikan pesan ke topik.
    • Menarik pesan dari langganan secara sinkron.
    • Tarik pesan secara asinkron dari langganan.
    • Mengonfirmasi pesan.
    • Ubah batas waktu konfirmasi.
    • Mengonversi pesan Pub/Sub menjadi Objek Java Lama Biasa (POJO).

Menggunakan adaptor saluran Spring Integration

Jika aplikasi Spring menggunakan saluran pesan Spring Integration, Anda dapat merutekan pesan antara saluran pesan dan Pub/Sub menggunakan adaptor saluran.

Menginstal modul

Untuk menginstal modul adaptor saluran Integrasi Spring, tambahkan kode berikut ke file pom.xml Anda:

  1. BOM GCP Spring Cloud.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud GCP Pub/Sub Starter dan Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Menerima pesan dari Pub/Sub

Untuk menerima pesan dari langganan Pub/Sub di aplikasi Spring, gunakan adaptor saluran masuk. Adaptor saluran masuk mengonversi pesan Pub/Sub yang masuk menjadi POJO, lalu meneruskan POJO ke saluran pesan.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

Contoh di atas menggunakan resource beans Spring dan Pub/Sub berikut:

  • Kacang saluran pesan bernama inputMessageChannel.
  • Bean adaptor saluran masuk bernama inboundChannelAdapter dari jenis PubSubInboundChannelAdapter.
  • ID langganan Pub/Sub yang bernama sub-one.

inboundChannelAdapter secara asinkron menarik pesan dari sub-one menggunakan PubSubTemplate dan mengirim pesan ke inputMessageChannel.

inboundChannelAdapter menetapkan mode konfirmasi ke MANUAL sehingga aplikasi dapat mengonfirmasi pesan setelah memprosesnya. Mode konfirmasi default dari jenis PubSubInboundChannelAdapter adalah AUTO.

Kacang ServiceActivator messageReceiver mencatat setiap pesan yang masuk di inputMessageChannel ke output standar, lalu mengonfirmasi pesan.

Memublikasikan pesan ke Pub/Sub

Untuk memublikasikan pesan dari saluran pesan ke topik Pub/Sub, gunakan adaptor saluran keluar. Adaptor saluran keluar mengonversi POJO menjadi pesan Pub/Sub, lalu mengirimkan pesan ke topik Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

Contoh di atas menggunakan resource Spring bean dan Pub/Sub berikut:

  • Kacang saluran pesan bernama inputMessageChannel.
  • Bean adaptor saluran keluar bernama messageSender dari jenis PubSubMessageHandler.
  • ID topik Pub/Sub bernama topic-two.

Kacang ServiceActivator menerapkan logika di messageSender ke setiap pesan di inputMessageChannel.

PubSubMessageHandler di messageSender memublikasikan pesan di inputMessageChannel menggunakan PubSubTemplate. PubSubMessageHandler memublikasikan pesan ke topik Pub/Sub topic-two.

Menggunakan Binder Aliran Spring Cloud

Untuk memanggil Pub/Sub API di aplikasi Spring Cloud Stream, gunakan modul Spring Cloud GCP Pub/Sub Stream Binder.

Menginstal modul

Untuk menginstal modul Binder Spring Cloud Stream, tambahkan kode berikut ke file pom.xml Anda:

  1. BOM GCP Spring Cloud.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Menerima pesan dari Pub/Sub

Untuk menggunakan aplikasi Anda sebagai sink peristiwa, konfigurasikan binder input dengan menetapkan hal berikut:

  • Bean Consumer yang menentukan logika penanganan pesan. Misalnya, biji Consumer berikut diberi nama receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Kode contoh ini menerima pesan dari Pub/Sub. Contoh tersebut melakukan hal berikut:

  1. Menemukan ID topik Pub/Sub topic-two di tujuan binding input di application.properties.
  2. Membuat langganan Pub/Sub ke topic-two.
  3. Menggunakan nama binding receiveMessageFromTopicTwo-in-0 untuk menemukan bean Consumer yang bernama receiveMessageFromTopicTwo.
  4. Mencetak pesan masuk ke output standar dan otomatis mengonfirmasinya.

Memublikasikan pesan ke Pub/Sub

Untuk menggunakan aplikasi Anda sebagai sumber peristiwa, konfigurasikan binder output dengan menentukan hal berikut:

  • Kacang Supplier yang menentukan asal pesan dalam aplikasi Anda. Misalnya, biji Supplier berikut diberi nama sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Kode contoh memublikasikan pesan ke Pub/Sub. Contoh tersebut melakukan hal berikut:

  1. Menemukan ID topik Pub/Sub topic-one di tujuan binding output di application.properties.
  2. Menggunakan nama binding sendMessageToTopicOne-out-0 untuk menemukan bean Supplier yang bernama sendMessageToTopicOne.
  3. Mengirim pesan bernomor ke topic-one setiap 10 detik.