メッセージを圧縮する

Pub/Sub を使用して大量のデータをパブリッシュする場合は、gRPC を使用してデータを圧縮することで、パブリッシャーのクライアントがパブリッシュ リクエストを送信する前にネットワーク費用を削減できます。 gRPC 用の Pub/Sub 圧縮は Gzip アルゴリズムを使用します。

このドキュメントでは、トピックにパブリッシュされたメッセージの圧縮について説明します。

メッセージの圧縮について

gRPC クライアント側の圧縮機能を使用するための圧縮率は、パブリッシャー クライアントおよび次の要因によって異なります。

  • データの量。圧縮率は、ペイロードのサイズが数百バイトから数キロバイトのデータに増加すると改善されます。パブリッシュ リクエストのバッチ設定によって、各パブリッシュ リクエストに含まれるデータの量が決まります。最良の結果を得るために、バッチ設定を gRPC 圧縮と組み合わせて使用することをおすすめします。

  • データの種類。JSON や XML などのテキストベースのデータは、画像などのバイナリデータよりも圧縮率を高めることが可能です。

パブリッシャー クライアントが Google Cloud 上に存在する場合は、送信バイト数instance/network/sent_bytes_count指標を使用して、パブリッシュ スループットをバイト単位で測定できます。パブリッシャー クライアントが別のアプリケーション上に存在する場合は、測定を行うためにクライアント固有のツールを使用する必要があります。

このセクションのコードサンプルは、Java クライアント ライブラリのコード スニペットを示しており、gRPC 圧縮も含まれています。

準備

パブリッシュ ワークフローを構成する前に、次のタスクを完了していることを確認してください。

必要なロール

メッセージを圧縮するために必要な権限を取得するには、トピックに対する Pub/Sub パブリッシャーroles/pubsub.publisher)の IAM ロールを付与するよう管理者に依頼してください。 ロールの付与の詳細については、アクセスの管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

トピックとサブスクリプションを作成または更新するには、追加の権限が必要です。

メッセージを圧縮する

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace g = ::google::cloud;
namespace pubsub = ::google::cloud::pubsub;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      g::Options{}
          // Compress any batch of messages over 10 bytes. By default, no
          // messages are compressed, set this to 0 to compress all batches,
          // regardless of their size.
          .set<pubsub::CompressionThresholdOption>(10)
          // Compress using the GZIP algorithm. By default, the library uses
          // GRPC_COMPRESS_DEFLATE.
          .set<pubsub::CompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](g::future<g::StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithGrpcCompressionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithGrpcCompressionExample(projectId, topicId);
  }

  public static void publishWithGrpcCompressionExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // Create a publisher and set enable compression to true.
    Publisher publisher = null;
    try {
      // Enable compression and configure the compression threshold to 10 bytes (default to 240 B).
      // Publish requests of sizes > 10 B (excluding the request headers) will get compressed.
      // The number of messages in a publish request is determined by publisher batch settings.
      // Batching is turned off by default, i.e. each publish request contains only one message.
      publisher =
          Publisher.newBuilder(topicName)
              .setEnableCompression(true)
              .setCompressionBytesThreshold(10L)
              .build();

      byte[] bytes = new byte[1024];
      ByteString data = ByteString.copyFrom(bytes);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic).
      // You can look up the actual size of the outbound data using the Java Logging API.
      // Configure logging properties as shown in
      // https://github.com/googleapis/java-pubsub/tree/main/samples/snippets/src/main/resources/logging.properties
      // and look for "OUTBOUND DATA" with "length=" in the output log.
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a compressed message of message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

次のステップ

詳細なパブリッシュのオプションを構成する方法については、以下をご覧ください。