Desenvolvimiento de la carga útil para suscripciones de envío de Pub/Sub

Cuando compilas tu sistema de Pub/Sub, la separación de la carga útil puede ayudarte a conectarte a otros sistemas que no cumplen con todos los requisitos del sistema de una implementación estándar de extremos de envío de Pub/Sub.

Estos son algunos casos de uso posibles para la separación de la carga útil:

  • No quieres escribir un código de análisis de mensajes específico de Pub/Sub para tus extremos de envío HTTP.
  • Prefieres recibir metadatos de mensajes de Pub/Sub como encabezados HTTP, en lugar de metadatos en el cuerpo de HTTP POST.
  • Deseas enviar mensajes de Pub/Sub y excluir los metadatos de Pub/Sub, por ejemplo, cuando envías datos a una API de terceros.

Cómo funciona la separación de la carga útil

La separación de la carga útil es una función que quita los mensajes de Pub/Sub de todos los metadatos del mensaje, excepto los datos del mensaje. Con el envío de datos de mensajes sin procesar, los suscriptores pueden procesar el mensaje sin tener que cumplir con ningún requisito del sistema de Pub/Sub.

  • Con la separación de la carga útil, los datos del mensaje se entregan directamente como el cuerpo HTTP.
  • Sin separación de la carga útil, Pub/Sub entrega un objeto JSON que contiene varios campos de metadatos del mensaje y uno de datos del mensaje. En este caso, se debe analizar el JSON para recuperar los datos del mensaje y, luego, decodificarse en base64.

Escritura de metadatos

Después de habilitar la separación de la carga útil, puedes usar la opción de escribir metadatos, que agrega los metadatos del mensaje que se quitaron anteriormente al encabezado de la solicitud.

  • Metadatos de escritura habilitados. Agrega metadatos del mensaje de nuevo en el encabezado de la solicitud. También entrega los datos de mensajes decodificados sin procesar.
  • Los metadatos de escritura están inhabilitados. Solo entrega los datos de mensajes decodificados sin procesar.

Los metadatos de escritura se exponen a través de Pub/Sub, el argumento --push-no-wrapper-write-metadata de Google Cloud CLI y la propiedad de la API NoWrapper. De forma predeterminada, este valor es nulo.

Antes de comenzar

Ejemplo de mensajes unidos y separados

Los siguientes ejemplos ilustran la diferencia entre el envío de un mensaje HTTP unido y separado. En estos ejemplos, los datos del mensaje contienen la cadena {"status": "Hello there"}.

En este ejemplo, se crea una suscripción con la función de separación de carga útil habilitada y publica un mensaje en mytopic. Usa una clave de ordenamiento con un valor de some-key y el tipo de medio se declara como application/json.

gcloud pubsub topics publish mytopic
   --message='{"status": "Hello there"}'
   --ordering-key="some-key"
   --attribute "Content-Type=application/json"

En las siguientes secciones, se muestra la diferencia entre un mensaje unido y un mensaje separado.

Mensaje unido

En el siguiente ejemplo, se muestra un mensaje estándar unido de Pub/Sub. En este caso, no está habilitado el desenvolvimiento de la carga útil.

Publicación El extremo de envío recibe
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 361
Content-Type: application/json
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{
  "message": {
      "attributes": {
          "Content-Type": "application/json"
      },
      "data": "eyJzdGF0dXMiOiAiSGVsbG8gdGhlcmUifQ==", //  Base64 - {"status": "Hello there"}
      "messageId": "2070443601311540",
      "message_id": "2070443601311540",
      "publishTime": "2021-02-26T19:13:55.749Z",
      "publish_time": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/..."
}

Mensaje separado con metadatos de escritura inhabilitados

En el siguiente ejemplo, se muestra un mensaje separado con la opción de escritura de metadatos inhabilitada. En este caso, no se incluyen los encabezados x-goog-pubsub-* ni los atributos del mensaje.

Publicación El extremo de envío recibe
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 25
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

Se desencapsuló el mensaje con la escritura de metadatos habilitada

En el siguiente ejemplo, se muestra un mensaje separado con la opción de escritura de metadatos habilitada. En este caso, se incluyen los encabezados x-goog-pubsub-* y los atributos del mensaje.

Publicación El extremo de envío recibe
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
x-goog-pubsub-subscription-name: "projects/myproject/..."
x-goog-pubsub-message-id: "2070443601311540"
x-goog-pubsub-publish-time: "2021-02-26T19:13:55.749Z"
x-goog-pubsub-ordering-key: "some-key"
Content-Type: application/json
Content-Length: 12
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

Configura la separación de la carga útil

Puedes habilitar la carga útil para separar la entrega de envío de una suscripción mediante la página Detalles de suscripción de la consola de Google Cloud, Google Cloud CLI o las bibliotecas cliente.

Console

  1. En la consola de Google Cloud, ve a la página Suscripciones.

    Abre las suscripciones de Pub/Sub

  2. Haz clic en Crear suscripción.

  3. En el campo ID de la suscripción, ingresa un nombre.

    Si quieres obtener información para asignarle un nombre a una suscripción, consulta Lineamientos para asignar el nombre de un tema o una suscripción.

  4. Selecciona un tema en el menú desplegable. La suscripción recibe mensajes del tema.

  5. En Tipo de entrega, selecciona Envío.

  6. Para habilitar la separación de la carga útil, selecciona Habilitar la separación de la carga útil.

  7. Para conservar los metadatos de los mensajes en el encabezado de la solicitud, selecciona Escribir metadatos (opcional). Debes habilitar esta opción para configurar un encabezado Content-Type para los mensajes.

  8. Especifica una URL de extremo.

  9. Conserva todos los demás valores predeterminados.

  10. Haz clic en Crear.

gcloud

Para configurar una suscripción con desenvolvimiento de la carga útil que incluya encabezados HTTP estándar, ejecuta el siguiente comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper

Reemplaza lo siguiente:

  • SUBSCRIPTION: Es el nombre o el ID de la suscripción de extracción.
  • TOPIC: Es el ID del tema.
  • PUSH_ENDPOINT: Es la URL que se usará como extremo de esta suscripción. Por ejemplo, https://myproject.appspot.com/myhandler
  • --push-no-wrapper: Entrega los datos del mensaje directamente como el cuerpo de HTTP.

Para configurar una suscripción con separación de carga útil y controlar el uso de encabezados x-goog-pubsub-*, ejecuta el siguiente comando:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper \
  --push-no-wrapper-write-metadata
  • --push-no-wrapper-write-metadata: Cuando es verdadero, escribe los metadatos del mensaje de Pub/Sub en los encabezados x-goog-pubsub-<KEY>:<VAL> de la solicitud HTTP. Escribe los atributos del mensaje de Pub/Sub en los encabezados <KEY>:<VAL> de la solicitud HTTP.

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# endpoint = "https://my-test-project.appspot.com/push"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

no_wrapper = pubsub_v1.types.PushConfig.NoWrapper(write_metadata=True)
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint, no_wrapper=no_wrapper
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
    )

print(f"Push no wrapper subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")
print(f"No wrapper configuration for subscription is: {no_wrapper}")

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

/*
 * Copyright 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package pubsub;


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.PushConfig.NoWrapper;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateUnwrappedPushSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";
    String pushEndpoint = "https://my-test-project.appspot.com/push";

    createPushSubscriptionExample(projectId, subscriptionId, topicId, pushEndpoint);
  }

  public static void createPushSubscriptionExample(
      String projectId, String subscriptionId, String topicId, String pushEndpoint)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
      NoWrapper noWrapper =
          NoWrapper.newBuilder()
              // Determines if message metadata is added to the HTTP headers of
              // the delivered message.
              .setWriteMetadata(true)
              .build();
      PushConfig pushConfig =
          PushConfig.newBuilder().setPushEndpoint(pushEndpoint).setNoWrapper(noWrapper).build();

      // Create a push subscription with default acknowledgement deadline of 10 seconds.
      // Messages not successfully acknowledged within 10 seconds will get resent by the server.
      Subscription subscription =
          subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
      System.out.println("Created push subscription: " + subscription.getName());
    }
  }
}

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& endpoint) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_push_config()->set_push_endpoint(endpoint);
  request.mutable_push_config()->mutable_no_wrapper()->set_write_metadata(
      true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createPushNoWrapperSubscription creates a push subscription where messages are delivered in the HTTP body.
func createPushNoWrapperSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, endpoint string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// endpoint := "https://my-test-project.appspot.com/push"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
			Wrapper: &pubsub.NoWrapper{
				// Determines if message metadata is added to the HTTP headers of
				// the delivered message.
				WriteMetadata: true,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created push no wrapper subscription: %v\n", sub)
	return nil
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint,
  topicNameOrId,
  subscriptionNameOrId
) {
  const options = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint: string,
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  const options: CreateSubscriptionOptions = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Establece un encabezado de tipo de contenido en tu mensaje

Después de habilitar la separación de la carga útil, Pub/Sub no establece automáticamente un campo de encabezado de tipo de medio en tu solicitud. Si no configuras de forma explícita un campo de encabezado Content-Type, el servidor web que procesa la solicitud podría establecer un valor predeterminado de application/octet-stream o interpretar la solicitud de manera inesperada.

Si necesitas un encabezado Content-Type, asegúrate de declararlo de manera explícita en el momento de la publicación en cada mensaje publicado individual. Para ello, primero debes habilitar la opción Escribir metadatos. Este resultado de habilitar la función Escribir metadatos se muestra en los ejemplos proporcionados.

¿Qué sigue?