E/S do Pub/Sub

As transformações integradas Read e Write do Cloud Pub/Sub estão incluídas no PubsubIO. É possível usar o PubsubIO para ler e gravar dados em um tópico ou assinatura do Cloud Pub/Sub. Além disso, é possível conseguir um processamento único de streams de mensagens do Cloud Pub/Sub, uma vez que o PubsubIO remove mensagens duplicadas com base em identificadores de mensagens personalizados ou atribuídos pelo Cloud Pub/Sub.

Observação: por padrão, transformações PubsubIO criam PCollections ilimitadas. Depois de criar uma PCollection usando PubsubIO.Read, aplique uma estratégia de gestão de janelas nessa PCollection antes de qualquer transformação que agrupe elementos como GroupByKey ou Combine.

Como especificar um tópico ou assinatura do Cloud Pub/Sub

Para usar o PubsubIO, forneça o nome de um tópico do Cloud Pub/Sub ou uma assinatura que você já tenha criado para um determinado tópico. Se você usa uma transformação PubsubIO com um nome de tópico, uma assinatura é criada e gerenciada automaticamente nos bastidores pelo Dataflow.

Como especificar um tópico do Cloud Pub/Sub

Ao usar o PubsubIO, opte por especificar um tópico do Cloud Pub/Sub para ler ou gravar. Quando você fornece um nome de tópico, uma assinatura é criada automaticamente para ele. A leitura desse tópico pelo Dataflow começa no mesmo momento que o canal. Dados publicados no tópico antes do início do canal real não estão disponíveis para seu canal.

Observação: o Dataflow cria a assinatura necessária do Cloud Pub/Sub como parte da configuração do canal. Como os tempos de configuração do canal podem variar de acordo com o número de instâncias do Compute Engine e de outros recursos alocados no Cloud Platform, é difícil saber exatamente quando o canal iniciou a leitura do tópico do Cloud Pub / Sub. Se for necessário um controle mais rígido do tempo de leitura e gravação, crie e gerencie a própria assinatura em um tópico do Cloud Pub/Sub e a transmita para o PubsubIO. Consulte Como especificar uma assinatura do Cloud Pub/Sub para mais informações.

O nome do tópico que você fornecer precisa seguir o formato projects/<Cloud Platform Project Name>/topics/<topic name>, em que o nome do projeto é o nome do tópico. Além disso, é necessário que o nome do tópico atenda aos seguintes requisitos:

  • O nome do tópico precisa ter entre 3 e 255 caracteres.
  • No nome do tópico, só é possível usar letras minúsculas, números, traços ("-"), sublinhados ("_") e pontos finais (".").
  • O nome do tópico precisa começar com uma letra.
  • O nome do tópico precisa terminar com uma letra ou um número.
  • Não é possível iniciar o nome do tópico com o prefixo "goog".

Como especificar uma assinatura do Cloud Pub/Sub

Ao usar uma transformação PubsubIO, opte por especificar uma assinatura do Cloud Pub/Sub para ler ou gravar em um determinado tópico do Cloud Pub/Sub. Para usar uma assinatura, é preciso ter criado e gerenciado sua própria. Consulte a documentação do assinante do Cloud Pub/Sub para conseguir mais informações sobre como criar uma assinatura do Cloud Pub/Sub.

O nome de assinatura fornecido deve seguir o formato /projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, em que o nome do projeto é o nome do projeto proprietário da assinatura. Além disso, o nome da assinatura deve atender aos seguintes requisitos:

  • O nome da assinatura precisa ter entre 3 e 255 caracteres.
  • No nome da assinatura, só é possível usar letras minúsculas, números, traços ("-"), sublinhados ("_") e pontos finais (".").
  • O nome da assinatura precisa começar com uma letra.
  • O nome da assinatura precisa terminar com uma letra ou um número.
  • Não é possível iniciar o nome da assinatura com o prefixo "goog".

Use uma assinatura se quiser garantir que seu canal não perca nenhum dado do tópico do Cloud Pub/Sub. Uma assinatura do Cloud Pub/Sub continua coletando dados, mesmo sem nenhuma leitura de canal. Quando o canal for iniciado, ele terá acesso a todos os dados coletados pela assinatura, até mesmo dados anteriores à inicialização do canal. Além disso, se você parar o canal e criar um novo para fazer a leitura na mesma assinatura, nenhum dado será perdido, uma vez que a assinatura continuará coletando dados na ausência dele.

Como ler com PubsubIO

A transformação PubsubIO.Read faz leituras contínuas de um stream do Cloud Pub/Sub e retorna uma PCollection ilimitada de Strings que representa os dados do stream. Por padrão, cada elemento na PCollection resultante é codificado como uma string UTF-8. Substitua a codificação padrão usando .withCoder ao chamar PubsubIO.Read.

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Como ler um conjunto limitado de registros do Cloud Pub/Sub

Para fins de teste, leia dados do Cloud Pub/Sub com o InProcessPipelineRunner (SDK 1.x para Java do Cloud Dataflow)ou o DirectRunner (SDK do Dataflow para Java 2.X).

Para operar em uma coleção limitada para fins de teste, forneça um limite na quantidade de entrada para leitura. Opte por .maxNumRecords para ler um número máximo fixo de registros ou .maxReadTime para ler registros por um período de tempo fixo.

Observe que a eliminação da duplicação, as tentativas e a recuperação durante a falha não são garantidas neste modo. Para produção, trate o Cloud Pub/Sub como uma fonte ilimitada.

Gravar com PubsubIO

A transformação PubsubIO.Write grava continuamente uma PCollection ilimitada de objetos String para um stream do Cloud Pub/Sub. Por padrão, a entrada PCollection para PubsubIO.Write precisa conter strings codificadas em UTF-8. Altere o tipo de entrada e a codificação esperada usando withCoder.

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

Carimbos de data/hora e códigos de registro

Adicione dois tipos de metadados aos registros lidos ou gravados por você usando PubsubIO: carimbos de data/hora e códigos de registro.

Como usar carimbos de data/hora especificados pelo usuário

Use carimbos de data/hora especificados pelo usuário para um controle preciso sobre como os elementos lidos do Cloud Pub/Sub são atribuídos a janelas em um canal do Dataflow. Para criar esses carimbos de data/hora, chame timestampLabel ao construir transformações PubsubIO.Read ou PubsubIO.Write passando um valor de string de sua escolha.

Se você tiver configurado um rótulo de carimbo de data/hora especificado pelo usuário ao utilizar PubsubIO.Read durante a leitura de elementos do Cloud Pub/Sub, o valor do atributo com o nome da string transmitida para timestampLabel será usado pela transformação como carimbo de data/hora de cada mensagem de entrada. Carimbos de data/hora precisam estar em forma de milissegundos desde a era Unix ou formatados de acordo com RFC 3339.

Se você tiver configurado um rótulo de carimbo de data/hora especificado pelo usuário ao utilizar PubsubIO.Write, cada elemento será gravado pela transformação como uma mensagem do Cloud Pub/Sub com um atributo do nome fornecido com o valor do carimbo de data/hora em milésimos de segundo, desde o período Unix.

Como usar códigos de registro

Códigos de registro permitem exatamente um único processamento no limite entre o Dataflow e outros sistemas. Para usar códigos de registro, invoque idLabel ao construir transformações PubsubIO.Read ou PubsubIO.Write passando um valor de string de sua escolha.

Se você tiver configurado um rótulo de código de registro durante o uso de PubsubIO.Read, quando o Dataflow receber diversas mensagens com o mesmo código, ele descartará todas as mensagens, exceto uma. As mensagens são lidas com base no atributo com o nome da string transmitida para idLabel. No entanto, o Dataflow não realiza essa eliminação de duplicação para mensagens com o mesmo valor do código de registro publicado no Cloud Pub/Sub com mais de 10 minutos de intervalo.

Se você configurar um rótulo de código de registro ao usar PubsubIO.Write, um atributo será gravado pela transformação em todas as mensagens enviadas com o nome especificado e um valor exclusivo. Sistemas downstream usam esse valor exclusivo para eliminar a duplicação de mensagens.

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Precisa de ajuda? Acesse nossa página de suporte.