E/S de Pub/Sub

Las transformaciones integradas de Read y Write para Cloud Pub/Sub se incluyen en PubsubIO. Puedes usar PubsubIO para leer o escribir datos de un tema o suscripción de Cloud Pub/Sub. Además, puedes lograr exactamente un procesamiento de las transmisiones de mensajes de Cloud Pub/Sub, ya que PubsubIO deduplica los mensajes según los identificadores de mensajes personalizados o los identificadores que asigna Cloud Pub/Sub.

Nota: De forma predeterminada, las transformaciones de PubsubIO crean PCollection no delimitadas. Después de crear una PCollection con PubsubIO.Read, deberías aplicar una estrategia de sistema de ventanas a esa PCollection antes de aplicar cualquier transformación que agrupe elementos, como GroupByKey o Combine.

Cómo especificar un tema o suscripción de Cloud Pub/Sub

Para usar PubsubIO, puedes proporcionar el nombre de un tema de Cloud Pub/Sub o de una suscripción que ya hayas creado para un determinado tema. Si usas una transformación de PubsubIO con el nombre de un tema, Dataflow automáticamente crea y administra una suscripción para ti en segundo plano.

Cómo especificar un tema de Cloud Pub/Sub

Cuando usas PubsubIO, puedes elegir especificar un tema de Cloud Pub/Sub para leer o escribir. Cuando proporcionas un nombre del tema, Dataflow automáticamente crea una suscripción a ese tema. Dataflow comienza a leer en el tema desde el momento en que se inicia la canalización; cualquier dato que se publique en el tema antes de que se inicie tu canalización real no estará disponible en esta.

Nota: Dataflow crea la suscripción necesaria de Cloud Pub/Sub como parte de la configuración de tu canalización. Debido a que los tiempos de configuración de las canalizaciones pueden variar según la cantidad de instancias de Compute Engine y otros recursos asignados en Cloud Platform, puede ser difícil saber con exactitud el momento en el que tu canalización comenzó a leer en el tema de Cloud Pub/Sub. Si necesitas un control más preciso sobre el tiempo de lectura y escritura de los datos, puedes crear y administrar tu propia suscripción a un tema de Cloud Pub/Sub y pasar esa suscripción a PubsubIO. Consulta Cómo especificar una suscripción de Cloud Pub/Sub para obtener más información.

El nombre del tema que proporciones debe seguir el formato projects/<Cloud Platform Project Name>/topics/<topic name>, en el que el nombre del proyecto es el nombre al que pertenece el tema. Además, el nombre del tema debe cumplir con los siguientes requisitos:

  • El nombre del tema debe tener entre 3 y 255 caracteres.
  • El nombre del tema solo puede contener letras en minúsculas, números, guiones ("-"), guiones bajos ("_") y puntos (".").
  • El nombre del tema debe comenzar con una letra.
  • El nombre del tema debe terminar con una letra o un número.
  • El nombre del tema no puede comenzar con el prefijo "goog".

Cómo especificar una suscripción de Cloud Pub/Sub

Cuando usas una transformación de PubsubIO, puedes elegir especificar una suscripción de Cloud Pub/Sub para usar cuando lees o escribes desde un tema de Cloud Pub/Sub específico. Para usar una suscripción, debes haberla creado y administrado. Consulta la Documentación del suscriptor de Cloud Pub/Sub para obtener más información sobre cómo crear una suscripción de Cloud Pub/Sub.

El nombre de suscripción que proporciones debería seguir el formato /projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, en el que el nombre del proyecto es el nombre al que pertenece la suscripción. Además, el nombre de suscripción que suministres debe cumplir con los siguientes requisitos:

  • El nombre de la suscripción debe tener entre 3 y 255 caracteres.
  • El nombre de la suscripción solo puede contener letras en minúsculas, números, guiones ("-"), guiones bajos ("_") y puntos (".").
  • El nombre de la suscripción debe comenzar con una letra.
  • El nombre de la suscripción debe terminar con una letra o un número.
  • El nombre de la suscripción no puede comenzar con el prefijo "goog".

Si lo deseas, puedes usar una suscripción para asegurarte de que tu canalización no pierda ningún dato del tema de Cloud Pub/Sub. Una suscripción a Cloud Pub/Sub sigue recolectando datos, incluso si no hay lecturas; cuando se inicie tu canalización, tendrás acceso a todos los datos recopilados por la suscripción, incluidos los datos que llegaron antes del inicio de la canalización. Además, si detienes la canalización y creas una nueva para leer desde la misma suscripción no se perderán datos, ya que la suscripción seguirá recopilando datos sin una canalización.

Cómo leer con PubsubIO

La transformación de PubsubIO.Read lee de forma continua desde una transmisión de Cloud Pub/Sub y muestra una PCollection no delimitada de Strings que representan los datos de la transmisión. De forma predeterminada, cada elemento de la PCollection resultante se codifica como una string UTF-8. Puedes anular la codificación predeterminada con .withCoder cuando llamas a PubsubIO.Read.

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Cómo leer un conjunto delimitado de registros desde Cloud Pub/Sub

Con el objetivo realizar pruebas, puedes leer desde Cloud Pub/Sub con InProcessPipelineRunner (SDK 1.X de Dataflow para Java) o DirectRunner (SDK 2.X de Dataflow para Java).

Si necesitas operar en una colección delimitada a fin de realizar pruebas, puedes proporcionar un límite en la cantidad de entradas para leer. Puedes usar la opción .maxNumRecords para leer una cantidad máxima de registros o puedes usar .maxReadTime si deseas leer registros durante un tiempo determinado.

Ten en cuenta que en este modo no se garantizan la deduplicación, los reintentos y la recuperación durante una falla. En el caso de la producción, deberías tratar a Cloud Pub/Sub como una fuente no delimitada.

Cómo escribir con PubsubIO

La transformación PubsubIO.Write escribe de manera continua una PCollection no delimitada de objetos String en una transmisión de Cloud Pub/Sub. De forma predeterminada, la PCollection de entrada a PubsubIO.Write debe contener strings codificadas en UTF-8. Puedes cambiar el tipo de entrada esperado y la codificación con withCoder.

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

ID de registros y marcas de tiempo

Puedes agregar dos tipos de metadatos a los registros que lees o escribes con PubsubIO: ID de registro y marcas de tiempo.

Cómo usar marcas de tiempo especificadas por el usuario

Puedes usar marcas de tiempo especificadas por el usuario para tener un control preciso sobre cómo los elementos que leen desde Cloud Pub/Sub se asignan a ventanas en la canalización de Dataflow. Para establecer marcas de tiempo especificadas por el usuario, invoca timestampLabel cuando crees PubsubIO.Read o las transformaciones de PubsubIO.Write y pasa el valor de string que elijas.

Si estableces una etiqueta de marca de tiempo especificada por el usuario con PubsubIO.Read, cuando se lean elementos desde Cloud Pub/Sub, la transformación usará el valor del atributo con el nombre de la string que pasaste a timestampLabel como la marca de tiempo de cada mensaje entrante. Las marcas de tiempo deben estar en milisegundos desde el tiempo Unix o con un formato según RFC 3339.

Si estableces una etiqueta de marca de tiempo especificada por el usuario con PubsubIO.Write, la transformación escribirá cada elemento como un mensaje de Cloud Pub/Sub con un atributo del nombre específico con el valor de la marca de tiempo del elemento en milisegundos desde el tiempo Unix.

Cómo usar los ID de registro

Los ID de registro permiten un procesamiento único y exacto en el límite entre Dataflow y otros sistemas. Para usar el ID de registro, invoca idLabel cuando crees transformaciones de PubsubIO.Read o PubsubIO.Write, y pasa el valor de string que elijas.

Si estableciste una etiqueta de ID de registro con PubsubIO.Read, cuando Dataflow reciba varios mensajes con el mismo ID (los cuales se leerán desde el atributo con el nombre de la string que pasaste a idLabel), Dataflow descartará todos los mensajes, excepto uno. Sin embargo, Dataflow no realiza esta deduplicación de los mensajes con el mismo valor de ID de registro que se publican en Cloud Pub/Sub con más de 10 minutos de diferencia.

Si estableces una etiqueta de ID de registro cuando usas PubsubIO.Write, la transformación escribirá un atributo en todos los mensajes salientes con el nombre especificado y un valor único. Los sistemas descendentes pueden usar este valor único para deduplicar mensajes.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.