Este tutorial mostra como implementar um pipeline de dados no Dataflow para um fluxo em tempo real de alterações à base de dados provenientes de um fluxo de alterações de uma tabela do Bigtable. A saída do pipeline é escrita numa série de ficheiros no Cloud Storage.
É fornecido um conjunto de dados de exemplo para uma aplicação de audição de música. Neste tutorial, vai monitorizar as músicas que são ouvidas e, em seguida, classificar as cinco principais durante um período.
Este tutorial destina-se a utilizadores técnicos com experiência na escrita de código e na implementação de pipelines de dados no Google Cloud.
Objetivos
Este tutorial mostra como fazer o seguinte:
- Crie uma tabela do Bigtable com uma stream de alterações ativada.
- Implemente um pipeline no Dataflow que transforme e produza a stream de alterações.
- Veja os resultados do pipeline de dados.
Custos
Neste documento, usa os seguintes componentes faturáveis do Google Cloud:
Para gerar uma estimativa de custos com base na sua utilização projetada,
use a calculadora de preços.
Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.
Antes de começar
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - Atualize e instale a
CLI
cbt
.gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket:
Replacegcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name that meets the bucket naming requirements.Crie uma instância do Bigtable
Pode usar uma instância existente para este tutorial ou criar uma instância com as configurações predefinidas numa região perto de si.
Criar uma tabela
A aplicação de exemplo acompanha as músicas que os utilizadores ouvem e armazena os eventos de audição no Bigtable. Crie uma tabela com uma stream de alterações ativada que tenha uma família de colunas (cf) e uma coluna (song) e use IDs de utilizadores para chaves de linhas.
Crie a tabela.
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Substitua o seguinte:
- PROJECT_ID: o ID do projeto que está a usar
- BIGTABLE_INSTANCE_ID: o ID da instância que vai conter a nova tabela
Inicie a conduta
Este pipeline transforma o fluxo de alterações fazendo o seguinte:
- Lê a stream de alterações
- Obtém o nome da música
- Agrupa os eventos de audição de músicas em intervalos de N segundos
- Conta as cinco músicas mais ouvidas
- Produz os resultados
Execute a conduta.
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Substitua BIGTABLE_REGION pelo ID da região em que a sua instância do Bigtable se encontra, como
us-east5
.Compreenda o processo
Os seguintes fragmentos de código do pipeline podem ajudar a compreender o código que está a executar.
Ler a stream de alterações
O código neste exemplo configura a stream de origem com os parâmetros da instância e da tabela do Bigtable específicas.
Obter o nome da música
Quando uma música é ouvida, o nome da música é escrito na família de colunas
cf
e no qualificador de colunassong
, pelo que o código extrai o valor da mutação do fluxo de alterações e envia-o para o passo seguinte do pipeline.Contagem das cinco músicas mais ouvidas
Pode usar as funções integradas do Beam
Count
eTop.of
para obter as cinco músicas mais ouvidas na janela atual.Produção dos resultados
Este pipeline escreve os resultados na saída padrão, bem como em ficheiros. Para os ficheiros, divide as gravações em grupos de 10 elementos ou segmentos de 1 minuto.
Veja o pipeline
Na Google Cloud consola, aceda à página Fluxo de dados.
Clique na tarefa com um nome que comece por song-rank.
Na parte inferior do ecrã, clique em Mostrar para abrir o painel de registos.
Clique em Registos do trabalhador para monitorizar os registos de saída da stream de alterações.
Escritas de streams
Use a
cbt
CLI para escrever um número de audições de músicas para vários utilizadores na tabelasong-rank
. Esta ação foi concebida para escrever durante alguns minutos para simular a audição de músicas em streaming ao longo do tempo.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1
Veja a saída
Leia o resultado no Cloud Storage para ver as músicas mais populares.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemplo de saída:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Elimine o contentor e os ficheiros.
gcloud storage rm --recursive gs://BUCKET_NAME/
Desative a stream de alterações na tabela.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Elimine a tabela
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Parar o pipeline de streams de alterações.
Liste as tarefas para obter o ID da tarefa.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Cancele a tarefa.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Substitua JOB_ID pelo ID da tarefa apresentado após o comando anterior.
Instale a CLI Google Cloud. Após a instalação, inicialize a CLI gcloud executando o seguinte comando:
gcloud init
Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.
Create or select a Google Cloud project.
Roles required to select or create a project
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which contains the
serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Instale a CLI Google Cloud. Após a instalação, inicialize a CLI gcloud executando o seguinte comando:
gcloud init
Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.
Create or select a Google Cloud project.
Roles required to select or create a project
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin
), which contains the
serviceusage.services.enable
permission. Learn how to grant
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Prepare o ambiente
Obter o código
Clone o repositório que contém o código de exemplo. Se já transferiu este repositório anteriormente, extraia a versão mais recente.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Crie um contentor
Limpar
Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.
Elimine o projeto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID