Transmitir alterações com o Dataflow

Com o conector do Bigtable Beam, é possível usar o Dataflow para ler registros de alteração de dados do Bigtable sem precisar rastrear ou processar alterações de partição no código, porque o conector processa essa lógica para você.

Neste documento, descrevemos como configurar e usar o conector do Bigtable Beam para ler um fluxo de alterações usando um pipeline do Dataflow. Antes de ler este documento, leia a Visão geral do fluxo de alterações e conheça o Dataflow.

Alternativas para criar seu próprio pipeline

Se você não quiser criar seu próprio canal do Dataflow, use uma das opções a seguir.

Você pode usar um modelo do Dataflow fornecido pelo Google.

Você também pode usar os exemplos de código do tutorial ou do guia de início rápido do Bigtable como ponto de partida para seu código.

Verifique se o código gerado usa o google cloud libraries-bom versão 26.14.0 ou posterior.

Detalhes do conector

O método do conector do Bigtable Beam, BigtableIO.readChangeStream, permite ler um fluxo de registros de alteração de dados (ChangeStreamMutation) que podem ser processados. O conector do Bigtable Beam é um componente do repositório do GitHub do Apache Beam (em inglês). Para conferir uma descrição do código do conector, consulte os comentários em BigtableIO.java.

É preciso usar o conector com a versão 2.48.0 ou posterior do Beam. Verifique o suporte ao ambiente de execução do Apache Beam para garantir que você está usando uma versão compatível do Java. É possível implantar um pipeline que usa o conector no Dataflow, que lida com o provisionamento e o gerenciamento de recursos e ajuda na escalonabilidade e confiabilidade do processamento de dados do fluxo.

Para mais informações sobre o modelo de programação do Apache Beam, consulte a documentação do Beam.

Como agrupar dados sem horas de evento

Os registros de alteração de dados transmitidos usando o conector do Bigtable Beam não são compatíveis com funções do Dataflow que dependem de horários do evento.

Conforme explicado em Replicação e marcas-d'água, uma marca-d'água baixa poderá não avançar se a replicação da partição não tiver alcançado o restante da instância. Quando uma marca-d'água baixa deixa de avançar, isso pode interromper o fluxo de alterações.

Para evitar que o stream seja interrompido, o conector do Bigtable Beam gera todos os dados com um carimbo de data/hora de saída igual a zero. O carimbo de data/hora zerado faz o Dataflow considerar todos os registros de alteração de dados como dados atrasados. Por isso, os recursos do Dataflow que dependem das horas de evento não são compatíveis com o fluxo de alterações do Bigtable. Especificamente, não é possível usar funções de gestão de janelas, gatilhos de hora de evento ou timers de hora de evento.

Em vez disso, é possível usar GlobalWindows com gatilhos de horário que não seja de evento para agrupar esses dados atrasados em painéis, conforme demonstrado no exemplo do tutorial. Para detalhes sobre gatilhos e painéis, consulte Gatilhos no guia de programação do Beam.

Escalonamento automático

O conector é compatível com o escalonamento automático do Dataflow, que é ativado por padrão ao usar o Runner v2 (obrigatório). O algoritmo de escalonamento automático do Dataflow considera o backlog estimado do fluxo de alterações, que pode ser monitorado na página Monitoramento do Dataflow na seção Backlog. Use a flag --maxNumWorkers ao implantar um job para limitar o número de workers.

Para escalonar manualmente o pipeline em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.

Limitações

Observe as seguintes limitações antes de usar o conector do Bigtable Beam com o Dataflow.

Dataflow Runner V2

O conector só pode ser executado usando o Dataflow Runner v2. Para ativá-lo, especifique --experiments=use_runner_v2 nos argumentos da linha de comando. A execução com o Runner v1 causa falha no pipeline com a seguinte exceção:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

O conector não é compatível com snapshots do Dataflow.

Cópias

O conector do Bigtable Beam transmite alterações para cada chave de linha e cada cluster na ordem do carimbo de data/hora de confirmação, mas, como às vezes reinicia a partir de momentos anteriores no stream, pode produzir duplicatas.

Antes de começar

Antes de usar o conector, cumpra os pré-requisitos a seguir.

Configurar a autenticação

Para usar as amostras de Java nesta página de um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure o Application Default Credentials com as credenciais de usuário.

  1. Instale a CLI do Google Cloud.
  2. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  3. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login

Veja mais informações em: Configurar a autenticação para um ambiente de desenvolvimento local.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Configure o Application Default Credentials para o código em execução no Google Cloud.

Ativar um fluxo de alterações

É necessário ativar um fluxo de alterações em uma tabela para poder lê-lo. Também é possível criar uma nova tabela com os fluxo de alterações ativados.

Tabela de metadados do fluxo de alterações

Quando você transmite alterações com o Dataflow, o conector do Bigtable Beam cria uma tabela de metadados que é denominada __change_stream_md_table por padrão. A tabela de metadados do fluxo de alterações gerencia o estado operacional do conector e armazena metadados sobre os registros de alteração de dados.

Por padrão, o conector cria a tabela na mesma instância que a tabela que está sendo transmitida. Para garantir que a tabela funcione corretamente, o perfil de aplicativo da tabela de metadados precisa usar roteamento de cluster único e ter transações de linha única ativadas.

Para mais informações sobre alterações de streaming do Bigtable com o conector do Bigtable Beam, consulte a documentação do BigtableIO.

Funções exigidas

Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable usando o Dataflow, peça ao administrador para conceder a você os seguintes papéis do IAM.

Para ler as alterações do Bigtable, você precisa deste papel:

  • Administrador do Bigtable (roles/bigtable.admin) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar

Para executar o job do Dataflow, você precisa destes papéis:

Para mais informações sobre como conceder papéis, consulte Gerenciar acesso.

Também é possível receber as permissões necessárias por meio de papéis personalizados ou outros papéis predefinidos.

Adicionar o conector do Bigtable Beam como uma dependência

Adicione um código semelhante à seguinte dependência ao arquivo pom.xml do Maven. A versão precisa ser 2.48.0 ou posterior.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Ler o fluxo de alterações

Para criar um pipeline do Dataflow para ler os registros de alteração de dados, configure o conector e adicione transformações e coletores. Em seguida, use o conector para ler objetos ChangeStreamMutation em um pipeline do Beam.

Os exemplos de código nesta seção, escritos em Java, demonstram como criar um pipeline e usá-lo para converter pares de chave-valor em uma string. Cada par consiste em uma chave de linha e um objeto ChangeStreamMutation. O pipeline converte as entradas de cada objeto em uma string separada por vírgulas.

Criar o pipeline

Este exemplo de código em Java demonstra como criar o pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Processar os registros de alteração de dados

Neste exemplo, demonstramos como percorrer em repetição todas as entradas em um registro de alteração de dados de uma linha e chamar um método de conversão em string com base no tipo de entrada.

Veja uma lista dos tipos de entrada que um registro de alteração de dados pode conter em O que há em um registro de alteração de dados.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Neste exemplo, uma entrada de gravação é convertida:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Neste exemplo, uma entrada de exclusão de células é convertida:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Neste exemplo, uma exclusão de um grupo de colunas é convertida:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitorar

Os recursos a seguir no console do Google Cloud permitem monitorar os recursos do Google Cloud enquanto você executa um pipeline do Dataflow para ler um fluxo de alterações do Bigtable:

Verifique sobretudo as seguintes métricas:

  • Na página Monitoring do Bigtable, verifique estas metrics:
    • Dados de utilização da CPU por fluxo de alterações na métrica cpu_load_by_app_profile_by_method_by_table. Mostra o impacto do fluxo de alterações no uso da CPU do cluster.
    • Utilização do armazenamento do fluxo de alterações (bytes) (change_stream_log_used_bytes).
  • Na página de monitoramento do Dataflow, verifique a atualização de dados, que mostra a diferença entre a hora atual e a marca-d'água. Ela precisa ser por volta de dois minutos, com picos pontuais de um ou dois minutos a mais. Se a métrica de atualização de dados é consistentemente maior que esse limite, provavelmente o pipeline tem poucos recursos e é necessário adicionar mais workers do Dataflow. A atualização de dados não indica se os registros de alteração de dados estão sendo processados lentamente.
  • A métrica processing_delay_from_commit_timestamp_MEAN do Dataflow pode informar o tempo médio de processamento dos registros de alteração de dados durante a vida útil do job.

A métrica server/latencies do Bigtable não é útil quando você está monitorando um pipeline do Dataflow que está lendo um fluxo de alterações do Bigtable, porque reflete a duração da solicitação de streaming, não a latência de processamento do registro de alteração de dados. A alta latência em um fluxo de alterações não significa que as solicitações estão sendo processadas lentamente, mas significa que a conexão ficou aberta por tanto tempo.

A seguir