Transmitir alterações com o Dataflow

Com o conector Bigtable Beam, você usa o Dataflow para ler registros de alteração de dados do Bigtable sem precisar rastrear ou processar alterações de partição no código, porque o conector processa essa lógica para você.

Neste documento, descrevemos como configurar e usar o conector Bigtable Beam para ler um fluxo de alterações usando um pipeline do Dataflow. Antes de ler este documento, leia a Visão geral do fluxo de alterações e conheça o Dataflow.

Alternativas para criar seu próprio pipeline

Se você não quiser criar seu próprio canal do Dataflow, use uma das opções a seguir.

Você pode usar um modelo do Dataflow fornecido pelo Google.

Você também pode usar os exemplos de código do tutorial ou do guia de início rápido do Bigtable como ponto de partida para seu código.

Verifique se o código gerado usa o google cloud libraries-bom versão 26.14.0 ou posterior.

Detalhes do conector

O método do conector Bigtable Beam, BigtableIO.readChangeStream, permite ler um fluxo de registros de alteração de dados (ChangeStreamMutation) que podem ser processados. O conector Bigtable Beam é um componente do repositório do Apache Beam no GitHub (em inglês). Para conferir uma descrição do código do conector, consulte os comentários em BigtableIO.java.

É preciso usar o conector com a versão 2.48.0 ou posterior do Beam. Verifique o suporte ao ambiente de execução do Apache Beam para garantir que você está usando uma versão compatível do Java. É possível implantar um pipeline que usa o conector no Dataflow, que lida com o provisionamento e o gerenciamento de recursos e ajuda na escalonabilidade e confiabilidade do processamento de dados do fluxo.

Para mais informações sobre o modelo de programação do Apache Beam, consulte a documentação do Beam.

Como agrupar dados sem horas de evento

Os registros de alteração de dados transmitidos usando o conector do Bigtable Beam não são compatíveis com as funções do Dataflow que dependem de horários de evento.

Conforme explicado em Replicação e marcas-d'água, uma marca-d'água baixa poderá não avançar se a replicação da partição não tiver alcançado o restante da instância. Quando uma marca-d'água baixa deixa de avançar, isso pode interromper o fluxo de alterações.

Para evitar a interrupção do fluxo, o conector do Beam do Bigtable gera todos os dados com um carimbo de data/hora de saída igual a zero. O carimbo de data/hora zerado faz o Dataflow considerar todos os registros de alteração de dados como dados atrasados. Por isso, os recursos do Dataflow que dependem das horas de evento não são compatíveis com o fluxo de alterações do Bigtable. Especificamente, não é possível usar funções de gestão de janelas, gatilhos de hora de evento ou timers de hora de evento.

Em vez disso, use GlobalWindows com gatilhos de horário sem evento para agrupar esses dados atrasados em painéis, conforme demonstrado no exemplo do tutorial. Para saber mais sobre gatilhos e painéis, consulte Gatilhos no guia de programação do Beam.

Escalonamento automático

O conector é compatível com o escalonamento automático do Dataflow, que é ativado por padrão ao usar o Runner v2 (obrigatório). O algoritmo do escalonamento automático do Dataflow considera o backlog estimado do fluxo de alterações, que pode ser monitorado na página Monitoramento do Dataflow na seção Backlog. Use a flag --maxNumWorkers ao implantar um job para limitar o número de workers.

Para escalonar manualmente o pipeline em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.

Limitações

Observe as seguintes limitações antes de usar o conector do Bigtable Beam com o Dataflow.

Dataflow Runner V2

O conector só pode ser executado usando o Dataflow Runner v2. Para ativá-lo, especifique --experiments=use_runner_v2 nos argumentos da linha de comando. A execução com o Runner v1 causa falha no pipeline com a seguinte exceção:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

O conector não é compatível com snapshots do Dataflow.

Cópias

O conector do Beam para Bigtable transmite mudanças para cada chave de linha e cada cluster na ordem do carimbo de data/hora de confirmação, mas, como às vezes ele é reiniciado em momentos anteriores no fluxo, pode produzir duplicações.

Antes de começar

Antes de usar o conector, conclua os pré-requisitos a seguir.

Configurar a autenticação

Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize o gcloud CLI e e configure o Application Default Credentials com suas credenciais de usuário.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Confira mais informações em Set up authentication for a local development environment.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.

Ativar um fluxo de alterações

Você precisa ativar um fluxo de alterações em uma tabela antes de poder lê-la. Também é possível criar uma nova tabela com fluxo de alterações ativados.

Alterar a tabela de metadados da transmissão

Quando você transmite mudanças com o Dataflow, o conector Bigtable Beam cria uma tabela de metadados chamada __change_stream_md_table por padrão. A tabela de metadados do fluxo de alterações gerencia o estado operacional do conector e armazena metadados sobre registros de mudança de dados.

Por padrão, o conector cria a tabela na mesma instância que a tabela que está sendo transmitida. Para garantir que a tabela funcione corretamente, o perfil do app para a tabela de metadados precisa usar o roteamento de cluster único e ter transações de linha única ativadas.

Para mais informações sobre como fazer streaming de mudanças do Bigtable com o conector Bigtable Beam, consulte a documentação do BigtableIO.

Funções exigidas

Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable usando o Dataflow, peça ao administrador para conceder a você os seguintes papéis do IAM.

Para ler as alterações do Bigtable, você precisa deste papel:

  • Administrador do Bigtable (roles/bigtable.admin) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar

Para executar o job do Dataflow, você precisa destes papéis:

Para mais informações sobre como conceder papéis, consulte Gerenciar acesso.

Também é possível conseguir as permissões necessárias com papéis personalizados ou outros papéis predefinidos.

Adicionar o conector Bigtable Beam como uma dependência

Adicione um código semelhante à seguinte dependência ao arquivo pom.xml do Maven. A versão precisa ser 2.48.0 ou posterior.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Ler o fluxo de alterações

Para criar um pipeline do Dataflow para ler os registros de alteração de dados, configure o conector e adicione transformações e coletores. Em seguida, use o conector para ler objetos ChangeStreamMutation em um pipeline do Beam.

Os exemplos de código nesta seção, escritos em Java, demonstram como criar um pipeline e usá-lo para converter pares de chave-valor em uma string. Cada par consiste em uma chave de linha e um objeto ChangeStreamMutation. O pipeline converte as entradas de cada objeto em uma string separada por vírgulas.

Criar o pipeline

Este exemplo de código em Java demonstra como criar o pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Processar os registros de alteração de dados

Neste exemplo, demonstramos como percorrer em repetição todas as entradas em um registro de alteração de dados de uma linha e chamar um método de conversão em string com base no tipo de entrada.

Veja uma lista dos tipos de entrada que um registro de alteração de dados pode conter em O que há em um registro de alteração de dados.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Neste exemplo, uma entrada de gravação é convertida:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Neste exemplo, uma entrada de exclusão de células é convertida:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Neste exemplo, uma exclusão de um grupo de colunas é convertida:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitoramento

Os recursos a seguir no console do Google Cloud permitem monitorar os recursos do Google Cloud enquanto você executa um pipeline do Dataflow para ler um fluxo de alterações do Bigtable:

Verifique sobretudo as seguintes métricas:

  • Na página Monitoring do Bigtable, verifique estas métricas:
    • Dados de utilização da CPU por fluxo de alterações na métrica cpu_load_by_app_profile_by_method_by_table. Mostra o impacto do fluxo de alterações no uso da CPU do cluster.
    • Utilização do armazenamento do fluxo de alterações (bytes) (change_stream_log_used_bytes).
  • Na página de monitoramento do Dataflow, verifique a atualização de dados, que mostra a diferença entre a hora atual e a marca-d'água. Ela precisa ser por volta de dois minutos, com picos pontuais de um ou dois minutos a mais. Se a métrica de atualização de dados é consistentemente maior que esse limite, provavelmente o pipeline tem poucos recursos e é necessário adicionar mais workers do Dataflow. A atualização dos dados não indica se os registros de alteração de dados estão sendo processados lentamente.
  • A métrica processing_delay_from_commit_timestamp_MEAN do Dataflow pode informar o tempo médio de processamento dos registros de alteração de dados durante a duração do job.

A métrica server/latencies do Bigtable não é útil quando você está monitorando um pipeline do Dataflow que está lendo um fluxo de alterações do Bigtable, porque ela reflete a duração da solicitação de streaming, não a latência do processamento do registro de alteração de dados. Uma latência alta em um fluxo de mudanças não significa que as solicitações estão sendo processadas lentamente. Significa que a conexão ficou aberta por esse tempo.

A seguir