Processe uma stream de alterações do Bigtable


Este tutorial mostra como implementar um pipeline de dados no Dataflow para um fluxo em tempo real de alterações à base de dados provenientes de um fluxo de alterações de uma tabela do Bigtable. A saída do pipeline é escrita numa série de ficheiros no Cloud Storage.

É fornecido um conjunto de dados de exemplo para uma aplicação de audição de música. Neste tutorial, vai monitorizar as músicas que são ouvidas e, em seguida, classificar as cinco principais durante um período.

Este tutorial destina-se a utilizadores técnicos com experiência na escrita de código e na implementação de pipelines de dados no Google Cloud.

Objetivos

Este tutorial mostra como fazer o seguinte:

  • Crie uma tabela do Bigtable com uma stream de alterações ativada.
  • Implemente um pipeline no Dataflow que transforme e produza a stream de alterações.
  • Veja os resultados do pipeline de dados.

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

    Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.

    Instale a CLI Google Cloud. Após a instalação, inicialize a CLI gcloud executando o seguinte comando:

    gcloud init

    Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.

    Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Instale a CLI Google Cloud. Após a instalação, inicialize a CLI gcloud executando o seguinte comando:

    gcloud init

    Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.

    Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Atualize e instale a CLI cbt .
    gcloud components update
    gcloud components install cbt
  2. Prepare o ambiente

    Obter o código

    Clone o repositório que contém o código de exemplo. Se já transferiu este repositório anteriormente, extraia a versão mais recente.

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/bigtable/beam/change-streams
    

    Crie um contentor

  3. Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Crie uma instância do Bigtable

    Pode usar uma instância existente para este tutorial ou criar uma instância com as configurações predefinidas numa região perto de si.

    Criar uma tabela

    A aplicação de exemplo acompanha as músicas que os utilizadores ouvem e armazena os eventos de audição no Bigtable. Crie uma tabela com uma stream de alterações ativada que tenha uma família de colunas (cf) e uma coluna (song) e use IDs de utilizadores para chaves de linhas.

    Crie a tabela.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto que está a usar
    • BIGTABLE_INSTANCE_ID: o ID da instância que vai conter a nova tabela

    Inicie a conduta

    Este pipeline transforma o fluxo de alterações fazendo o seguinte:

    1. Lê a stream de alterações
    2. Obtém o nome da música
    3. Agrupa os eventos de audição de músicas em intervalos de N segundos
    4. Conta as cinco músicas mais ouvidas
    5. Produz os resultados

    Execute a conduta.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Substitua BIGTABLE_REGION pelo ID da região em que a sua instância do Bigtable se encontra, como us-east5.

    Compreenda o processo

    Os seguintes fragmentos de código do pipeline podem ajudar a compreender o código que está a executar.

    Ler a stream de alterações

    O código neste exemplo configura a stream de origem com os parâmetros da instância e da tabela do Bigtable específicas.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obter o nome da música

    Quando uma música é ouvida, o nome da música é escrito na família de colunas cf e no qualificador de colunas song, pelo que o código extrai o valor da mutação do fluxo de alterações e envia-o para o passo seguinte do pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Contagem das cinco músicas mais ouvidas

    Pode usar as funções integradas do Beam Count e Top.of para obter as cinco músicas mais ouvidas na janela atual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Produção dos resultados

    Este pipeline escreve os resultados na saída padrão, bem como em ficheiros. Para os ficheiros, divide as gravações em grupos de 10 elementos ou segmentos de 1 minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Veja o pipeline

    1. Na Google Cloud consola, aceda à página Fluxo de dados.

      Aceda ao Dataflow

    2. Clique na tarefa com um nome que comece por song-rank.

    3. Na parte inferior do ecrã, clique em Mostrar para abrir o painel de registos.

    4. Clique em Registos do trabalhador para monitorizar os registos de saída da stream de alterações.

    Escritas de streams

    Use a cbtCLI para escrever um número de audições de músicas para vários utilizadores na tabela song-rank. Esta ação foi concebida para escrever durante alguns minutos para simular a audição de músicas em streaming ao longo do tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Veja a saída

    Leia o resultado no Cloud Storage para ver as músicas mais populares.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemplo de saída:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    
  4. Limpar

    Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

    Elimine o projeto

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Elimine recursos individuais

    1. Elimine o contentor e os ficheiros.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Desative a stream de alterações na tabela.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Elimine a tabela song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Parar o pipeline de streams de alterações.

      1. Liste as tarefas para obter o ID da tarefa.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Cancele a tarefa.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Substitua JOB_ID pelo ID da tarefa apresentado após o comando anterior.

    O que se segue?