快速入门:使用客户端库

利用 Pub/Sub 服务,应用能够可靠、快速地异步交换消息。为了实现此目标,数据的生产者将消息发布到某 Pub/Sub 主题。然后,订阅者客户端创建对该主题的订阅,并处理来自订阅的消息。对于无法可靠地传送的消息,Pub/Sub 会将其最多保留七天。本页面介绍如何开始使用客户端库发布 Pub/Sub 消息。

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 设置 Cloud Console 项目。

    设置项目

    点击即可执行以下操作:

    • 创建或选择项目。
    • 为该项目启用 Pub/Sub API。
    • 创建服务帐号。
    • 下载 JSON 格式的私钥。

    您可以随时在 Cloud Console 中查看和管理这些资源。

  3. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  4. 安装并初始化 Cloud SDK

安装客户端库

以下示例向您展示了如何安装客户端库:

Python

如需详细了解如何设置 Python 开发环境,请参阅 Python 开发环境设置指南

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

如果您使用的是 Maven,请将以下代码添加到您的 pom.xml 文件中。如需详细了解 BOM,请参阅 Google Cloud Platform 库 BOM

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

如果您使用的是 Gradle,请将以下代码添加到您的依赖项中:

compile 'com.google.cloud:google-cloud-pubsub:1.107.0'

如果您使用的是 sbt,请将以下代码添加到您的依赖项中:

libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.107.0"

如果您使用的是 IntelliJ 或 Eclipse,可通过以下 IDE 插件将客户端库添加到您的项目中:

上述插件还提供其他功能,例如服务帐号密钥管理。如需了解详情,请参阅各个插件相应的文档。

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Ruby

gem install google-cloud-pubsub

创建主题和订阅

创建主题后,您可以订阅该主题或向其发布消息。

使用 gcloud pubsub topics create 命令创建主题:

gcloud pubsub topics create my-topic

使用 gcloud pubsub subscriptions create 命令创建订阅。订阅者应用只能获取创建订阅后发布到主题的消息。

gcloud pubsub subscriptions create my-sub --topic my-topic

如需详细了解如何命名您的主题和订阅,请参阅资源名称

发布消息

Python

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages.")

C#

// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(async text =>
    {
        try
        {
            string message = await publisher.PublishAsync(text);
            await Console.Out.WriteLineAsync($"Published message {message}");
        }
        catch (Exception exception)
        {
            await Console.Out.WriteLineAsync($"An error ocurred when publishing message {text}:");
            await Console.Out.WriteLineAsync(exception.Message);
        }
    });
await Task.WhenAll(publishTasks);

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publish(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);
  }

  public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage() {
  /**
   * TODO(developer): Uncomment the following lines to run the sample.
   */
  // const topicName = 'my-topic';

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
  console.log(`Message ${messageId} published.`);
}

publishMessage().catch(console.error);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish "This is a test message."

puts "Message published."

接收消息

设置订阅者以拉取您刚刚发布的消息。每个订阅者必须在可配置的时间范围内确认每条消息。未确认的消息将被再次传送。请注意,Pub/Sub 偶尔会将一条消息传送多次,这是为了确保所有消息至少发送到订阅者一次。以下示例说明了您可能如何接收和确认消息:

Python

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

C#

// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

listenForMessages();

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

结果怎么样?

清理(可选)

为避免系统因本指南中使用的资源向您的 Google Cloud Platform 帐号收取费用,您可以使用命令行删除主题和订阅。

  gcloud pubsub subscriptions delete my-sub
  gcloud pubsub topics delete my-topic

后续步骤