E/S de Pub/Sub

Las transformaciones Read y Write integradas para Pub/Sub se incluyen en PubsubIO. Puedes usar PubsubIO para leer datos desde un tema o suscripción de Pub/Sub (o escribir allí). Además, puedes lograr exactamente un procesamiento de las transmisiones de mensajes de Pub/Sub, ya que PubsubIO anula la duplicación de mensajes según los identificadores de mensajes personalizados o los identificadores asignados por Pub/Sub.

Nota: De forma predeterminada, las transformaciones PubsubIO crean PCollection no delimitadas. Después de crear una PCollection mediante PubsubIO.Read, debes aplicar una estrategia de sistema de ventanas a esa PCollection antes de aplicar cualquier transformación que agrupe elementos, como GroupByKey o Combine.

Especifica un tema o suscripción de Pub/Sub

A fin de usar PubsubIO, puedes proporcionar el nombre de un tema de Pub/Sub o una suscripción que ya creaste para un tema determinado. Si usas una transformación PubsubIO con un nombre del tema, Dataflow crea y administra de forma automática una suscripción por ti en segundo plano.

Especifica un tema de Pub/Sub

Cuando usas PubsubIO, puedes elegir especificar un tema de Pub/Sub para leer o escribir. Cuando proporcionas un nombre del tema, Dataflow crea una suscripción a ese tema de manera automática. Dataflow comienza a leer en el tema desde el momento en que se inicia la canalización; cualquier dato que se publique en el tema antes de que se inicie la canalización real no estará disponible en esta.

Nota: Dataflow crea la suscripción de Pub/Sub necesaria como parte de la configuración de tu canalización. Como los tiempos de configuración de la canalización pueden variar según la cantidad de instancias de Compute Engine y otros recursos asignados en Cloud Platform, puede ser difícil saber con exactitud cuándo tu canalización comenzó a leer desde el tema de Pub/Sub. Si necesitas un control más preciso sobre el tiempo de lectura y escritura de los datos, puedes crear y administrar tu propia suscripción a un tema de Pub/Sub y pasar esa suscripción a PubsubIO. Consulta Especifica una suscripción de Pub/Sub para obtener más información.

El nombre del tema que proporciones debe seguir el formato projects/<Cloud Platform Project Name>/topics/<topic name>, en el que el nombre es el nombre del proyecto propietario del tema. Además, el nombre del tema debe cumplir con los siguientes requisitos:

  • El nombre del tema debe tener entre 3 y 255 caracteres.
  • El nombre del tema solo puede contener letras en minúsculas, números, guiones ("-"), guiones bajos ("_") y puntos (".").
  • El nombre del tema debe comenzar con una letra.
  • El nombre del tema debe terminar con una letra o un número.
  • El nombre del tema no puede comenzar con el prefijo “goog”.

Especifica una suscripción de Pub/Sub

Cuando usas una transformación PubsubIO, puedes elegir especificar una suscripción de Pub/Sub para usar cuando leas o escribas en un tema de Pub/Sub en particular. Si deseas usar una suscripción, debes haber creado y administrado la suscripción tú mismo. Consulta la documentación de suscriptores de Pub/Sub para obtener más información sobre cómo crear una suscripción de Pub/Sub.

El nombre de la suscripción que proporciones debe seguir el formato /projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, en el que el nombre es el nombre del proyecto propietario de la suscripción. Además, el nombre de la suscripción que suministres debe cumplir con los siguientes requisitos:

  • El nombre de la suscripción debe tener entre 3 y 255 caracteres.
  • El nombre de la suscripción solo puede contener letras en minúsculas, números, guiones ("-"), guiones bajos ("_") y puntos (".").
  • El nombre de la suscripción debe comenzar con una letra.
  • El nombre de la suscripción debe terminar con una letra o un número.
  • El nombre de la suscripción no puede comenzar con el prefijo “goog”.

Debes usar una suscripción si deseas asegurarte de que tu canalización no pierda ningún dato de tu tema de Pub/Sub. Una suscripción de Pub/Sub continúa recopilando datos incluso si no hay una canalización que realice operaciones de lectura en ella. Cuando se inicia la canalización, tendrá acceso a todos los datos recopilados por la suscripción, incluso los datos que llegaron antes del inicio de la canalización. Además, si detienes la canalización y creas una nueva para leer desde la misma suscripción, no se perderán datos, ya que la suscripción seguirá recopilando datos sin una canalización.

Lee con PubsubIO

La transformación PubsubIO.Read lee de forma continua desde una transmisión de Pub/Sub y muestra una PCollection no delimitada de String que representan los datos de la transmisión. De forma predeterminada, cada elemento en la PCollection resultante se codifica como una string UTF-8. Puedes anular la codificación predeterminada mediante .withCoder cuando llamas a PubsubIO.Read.

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Lee un conjunto delimitado de registros de Pub/Sub

A fin de realizar pruebas, puedes leer desde Pub/Sub con el InProcessPipelineRunner (SDK de Dataflow para Java 1.X) o el DirectRunner (SDK de Dataflow para Java 2.X).

Si necesitas operar en una colección delimitada a fin de realizar pruebas, puedes proporcionar un límite en la cantidad de entradas para leer. Puedes usar la opción .maxNumRecords a fin de leer una cantidad máxima fija de registros, o puedes usar .maxReadTime para leer registros durante un tiempo determinado.

Ten en cuenta que, en este modo, no se garantizan la anulación de la duplicación, los reintentos y la recuperación durante una falla. Para la producción, debes tratar Pub/Sub como una fuente no delimitada.

Escribe con PubsubIO

La transformación PubsubIO.Write escribe de forma continua una PCollection no delimitada de objetos String en una transmisión de Pub/Sub. De forma predeterminada, la PCollection de entrada a PubsubIO.Write debe contener strings codificadas en UTF-8. Puedes cambiar la codificación y el tipo de entrada esperados mediante withCoder.

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

ID de registros y marcas de tiempo

Puedes agregar dos tipos de metadatos a los registros que lees o escribes mediante PubsubIO: ID de registro y marcas de tiempo.

Usa marcas de tiempo especificadas por el usuario

Puedes usar marcas de tiempo especificadas por el usuario para un control preciso sobre cómo se asignan los elementos leídos de Pub/Sub a las ventanas en una canalización de Dataflow. Para establecer marcas de tiempo especificadas por el usuario, invoca timestampLabel cuando construyas transformaciones PubsubIO.Read o PubsubIO.Write, y pasa un valor de string de tu elección.

Si configuras una etiqueta de marca de tiempo especificada por el usuario cuando usas PubsubIO.Read, cuando leas elementos de Pub/Sub, la transformación usará el valor del atributo con el nombre de la string que pasaste a timestampLabel como la marca de tiempo de cada mensaje entrante. Las marcas de tiempo deben estar en milisegundos desde el tiempo Unix, o con un formato según RFC 3339.

Si configuras una etiqueta de marca de tiempo especificada por el usuario cuando usas PubsubIO.Write, la transformación escribirá cada elemento como un mensaje de Pub/Sub con un atributo del nombre específico con el valor de la marca de tiempo del elemento en milisegundos desde el tiempo Unix.

Usa los ID de registro

Los ID de registro permiten un procesamiento único y exacto en el límite entre Dataflow y otros sistemas. Para usar ID de registro, invoca idLabel cuando construyas transformaciones PubsubIO.Read o PubsubIO.Write, y pasa un valor de string de tu elección.

Si configuras una etiqueta de ID de registro cuando usas PubsubIO.Read, cuando Dataflow reciba varios mensajes con el mismo ID (que se leerá desde el atributo con el nombre de la string que pasaste a idLabel), Dataflow descartará todos los mensajes, excepto uno. Sin embargo, Dataflow no realiza esta anulación de duplicación para los mensajes con el mismo valor de ID de registro que se publican en Pub/Sub con más de 10 minutos de diferencia.

Si configuras una etiqueta de ID de registro cuando usas PubsubIO.Write, la transformación escribirá un atributo en todos los mensajes salientes con el nombre especificado y un valor único. Los sistemas descendentes pueden usar este valor único para anular la duplicación de mensajes.