从订阅中拉取 Pub/Sub 消息

从现有 Pub/Sub 订阅持续拉取消息。

代码示例

Python

如需向 GKE 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google import auth
from google.cloud import pubsub_v1

def main():
    """Continuously pull messages from subsciption"""

    # read default project ID
    _, project_id = auth.default()
    subscription_id = 'echo-read'

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_id)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        """Process received message"""
        print(f"Received message: ID={message.message_id} Data={message.data}")
        print(f"[{datetime.datetime.now()}] Processing: {message.message_id}")
        time.sleep(3)
        print(f"[{datetime.datetime.now()}] Processed: {message.message_id}")
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f"Pulling messages from {subscription_path}...")

    with subscriber:
        try:
            streaming_pull_future.result()
        except Exception as e:
            print(e)

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器