Menggunakan Pub/Sub di aplikasi Spring

Halaman ini menjelaskan cara menggunakan Pub/Sub di aplikasi Java yang dibuat dengan Spring Framework.

Spring Cloud GCP memiliki beberapa modul untuk mengirim pesan ke topik Pub/Sub dan menerima pesan dari langganan Pub/Sub menggunakan Spring Framework. Anda dapat menggunakan modul ini secara terpisah atau menggabungkannya untuk kasus penggunaan yang berbeda:

CATATAN: Library Spring Cloud GCP tidak memberikan akses ke AckReplyConsumerWithResponse, yang merupakan modul yang diperlukan untuk menerapkan fitur exactly-once menggunakan library klien Java.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Tetapkan variabel lingkungan GOOGLE_CLOUD_PROJECT ke project ID Google Cloud Anda.

Menggunakan Spring Cloud GCP Pub/Sub Starter

Modul Spring Cloud GCP Pub/Sub Starter menginstal library klien Java Pub/Sub menggunakan modul Spring Cloud GCP Pub/Sub. Anda dapat memanggil Pub/Sub API dari aplikasi Spring menggunakan class yang disediakan oleh Spring Cloud GCP Pub/Sub Starter atau library klien Java Pub/Sub. Jika menggunakan class yang disediakan Spring Cloud GCP Pub/Sub Starter, Anda dapat mengganti konfigurasi Pub/Sub default.

Menginstal modul

Untuk menginstal modul Spring Cloud GCP Pub/Sub Starter, tambahkan dependensi ini ke file pom.xml Anda:

  1. Bill of Materials (BOM) Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operasi yang didukung

Modul Pemicu Pub/Sub Spring Cloud GCP menyertakan class berikut:

  • PubSubAdmin untuk operasi administratif:
    • Membuat topik dan langganan.
    • Mendapatkan topik dan langganan.
    • Mencantumkan topik dan langganan.
    • Menghapus topik dan langganan.
    • Mendapatkan dan menetapkan batas waktu konfirmasi pada langganan.
  • PubSubTemplate untuk mengirim dan menerima pesan:
    • Memublikasikan pesan ke topik.
    • Menarik pesan secara sinkron dari langganan.
    • Menarik pesan secara asinkron dari langganan.
    • Mengonfirmasi pesan.
    • Ubah batas waktu konfirmasi.
    • Mengonversi pesan Pub/Sub menjadi Plain Old Java Objects (POJO).

Menggunakan adaptor saluran Spring Integration

Jika aplikasi Spring Anda menggunakan saluran pesan Spring Integration, Anda dapat merutekan pesan antara saluran pesan dan Pub/Sub menggunakan adaptor saluran.

Menginstal modul

Untuk menginstal modul bagi adaptor saluran Spring Integration, tambahkan kode berikut ke file pom.xml Anda:

  1. BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud GCP Pub/Sub Starter dan Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Menerima pesan dari Pub/Sub

Untuk menerima pesan dari langganan Pub/Sub di aplikasi Spring, gunakan adaptor saluran masuk. Adaptor saluran masuk mengonversi pesan Pub/Sub masuk menjadi POJO, lalu meneruskan POJO ke saluran pesan.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

Contoh di atas menggunakan bean Spring dan resource Pub/Sub berikut:

  • Bean saluran pesan bernama inputMessageChannel.
  • Bean adaptor saluran masuk bernama inboundChannelAdapter dari jenis PubSubInboundChannelAdapter.
  • ID langganan Pub/Sub bernama sub-one.

inboundChannelAdapter secara asinkron mengambil pesan dari sub-one menggunakan PubSubTemplate dan mengirim pesan ke inputMessageChannel.

inboundChannelAdapter menetapkan mode konfirmasi ke MANUAL sehingga aplikasi dapat mengonfirmasi pesan setelah memprosesnya. Mode konfirmasi default dari jenis PubSubInboundChannelAdapter adalah AUTO.

Bean ServiceActivator messageReceiver mencatat setiap pesan yang masuk ke inputMessageChannel ke output standar, lalu mengonfirmasi pesan tersebut.

Memublikasikan pesan ke Pub/Sub

Untuk memublikasikan pesan dari saluran pesan ke topik Pub/Sub, gunakan adaptor saluran keluar. Adaptor saluran keluar mengonversi POJO menjadi pesan Pub/Sub, lalu mengirim pesan ke topik Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

Contoh di atas menggunakan bean Spring dan resource Pub/Sub berikut:

  • Bean saluran pesan bernama inputMessageChannel.
  • Bean adaptor saluran keluar bernama messageSender dari jenis PubSubMessageHandler.
  • ID topik Pub/Sub bernama topic-two.

Bean ServiceActivator menerapkan logika di messageSender ke setiap pesan di inputMessageChannel.

PubSubMessageHandler di messageSender memublikasikan pesan di inputMessageChannel menggunakan PubSubTemplate. PubSubMessageHandler memublikasikan pesan ke topik Pub/Sub topic-two.

Menggunakan Spring Cloud Stream Binder

Untuk memanggil Pub/Sub API di aplikasi Spring Cloud Stream, gunakan modul Spring Cloud GCP Pub/Sub Stream Binder.

Menginstal modul

Untuk menginstal modul Spring Cloud Stream Binder, tambahkan kode berikut ke file pom.xml Anda:

  1. BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Artefak Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Menerima pesan dari Pub/Sub

Untuk menggunakan aplikasi sebagai sink peristiwa, konfigurasikan binder input dengan menentukan hal berikut:

  • Bean Consumer yang menentukan logika penanganan pesan. Misalnya, bean Consumer berikut diberi nama receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Kode contoh menerima pesan dari Pub/Sub. Contoh ini melakukan hal berikut:

  1. Menemukan ID topik Pub/Sub topic-two di tujuan binding input di application.properties.
  2. Membuat langganan Pub/Sub ke topic-two.
  3. Menggunakan nama binding receiveMessageFromTopicTwo-in-0 untuk menemukan bean Consumer bernama receiveMessageFromTopicTwo.
  4. Mencetak pesan masuk ke output standar dan otomatis mengonfirmasinya.

Memublikasikan pesan ke Pub/Sub

Untuk menggunakan aplikasi Anda sebagai sumber peristiwa, konfigurasikan binder output dengan menentukan hal berikut:

  • Bean Supplier yang menentukan asal pesan dalam aplikasi Anda. Misalnya, bean Supplier berikut diberi nama sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Kode contoh memublikasikan pesan ke Pub/Sub. Contoh ini melakukan hal berikut:

  1. Menemukan ID topik Pub/Sub topic-one di tujuan binding output di application.properties.
  2. Menggunakan nama binding sendMessageToTopicOne-out-0 untuk menemukan bean Supplier bernama sendMessageToTopicOne.
  3. Mengirim pesan bernomor ke topic-one setiap 10 detik.