Sottoscrizione con listener di errore

Gestisce gli errori quando la sottoscrizione genera un errore.

Per saperne di più

Per la documentazione dettagliata che include questo esempio di codice, vedi quanto segue:

Esempio di codice

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber
      .Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      })
      // Setup an error handler for the subscription session
      .then([](future<google::cloud::Status> f) {
        std::cout << "Subscription session result: " << f.get() << "\n";
      });
};

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub C#.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class SubscriberErrorListenerSample
{
    public async Task<int> SubscriberErrorListener(string projectId, string subscriptionId)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        int messageCount = 0;
        try
        {
            Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            {
                string text = message.Data.ToStringUtf8();
                Console.WriteLine($"Message {message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
                return Task.FromResult(SubscriberClient.Reply.Ack);
            });
            // Run for 5 seconds.
            await Task.Delay(5000);
            await subscriber.StopAsync(CancellationToken.None);
            // Wait for the StartAsync call to finish
            await startTask;
        }
        // Catch unrecoverable failures.
        catch (Exception e)
        {
            Console.WriteLine($"Message: {e.Message}");
            Console.WriteLine($"StackTrace: {e.StackTrace}");
        }
        return messageCount;
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);
  }

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setExecutorProvider(executorProvider)
              .build();

      // Listen for unrecoverable failures.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println("Unrecoverable subscriber failure:" + failure.getStackTrace());
            }
          },
          MoreExecutors.directExecutor());

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = error => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = (message: Message) => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = (error: Error) => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub PHP.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribes with an error listener
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionId  The ID of the subscription.
 */
function subscriber_error_listener(
    string $projectId,
    string $topicName,
    string $subscriptionId
): void {

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionId, $topicName);

    try {
        $messages = $subscription->pull();
        foreach ($messages as $message) {
            printf('PubSub Message: %s' . PHP_EOL, $message->data());
            $subscription->acknowledge($message);
        }
    } catch (\Exception $e) { // Handle unrecoverable exceptions
        printf('Exception Message: %s' . PHP_EOL, $e->getMessage());
        printf('StackTrace: %s' . PHP_EOL, $e->getTraceAsString());
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except Exception as e:
        print(
            f"Listening for messages on {subscription_path} threw an exception: {e}."
        )
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Ruby.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

# Propagate exception from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  listener.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  listener.stop.wait!
rescue StandardError => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

Passaggi successivi

Per cercare e filtrare gli esempi di codice per altri prodotti Google Cloud , consulta il browser degli esempi diGoogle Cloud .