Neste tutorial, mostramos como implantar um pipeline de dados no Dataflow para um fluxo em tempo real das alterações do banco de dados provenientes de um fluxo de alterações da tabela do Bigtable. A saída do pipeline é gravada em uma série de arquivos no Cloud Storage.
Um exemplo de conjunto de dados para um aplicativo de música é fornecido. Neste tutorial, você rastreia as músicas ouvidas e, em seguida, classifica as cinco principais ao longo de um período.
Este tutorial é destinado a usuários técnicos familiarizados com a escrita de código e a implantação de pipelines de dados no Google Cloud.
Objetivos
Este tutorial mostra como fazer o seguinte:
- Criar uma tabela do Bigtable com um fluxo de alterações ativado
- Implantar um pipeline no Dataflow que transforma e gera o fluxo de alterações.
- Conferir resultados do seu pipeline de dados
Custos
Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.
Antes de começar
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - Atualize e instale a
CLI
cbt
.gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket and configure it as follows:
-
Substitua
STORAGE_CLASS
pela classe de armazenamento de sua preferência. -
Substitua
LOCATION
pelo local de sua preferência (ASIA
,EU
, ouUS
) -
Substitua
BUCKET_NAME
por um nome de bucket exclusivo um nome de bucket que atenda aos requisitos de nome de bucket. - PROJECT_ID: o ID do projeto que você está usando.
- BIGTABLE_INSTANCE_ID: o ID da instância para conter a nova tabela.
- Lê o fluxo de alterações
- Obtém o nome da música
- Agrupa os eventos de música para ouvir em janelas de N segundos
- Conta as cinco músicas principais
- Gera os resultados
No Google Cloud console, acesse a página Dataflow.
Clique no job com um nome que comece com song-rank.
Na parte inferior da tela, clique em Mostrar para abrir o painel de registros.
Clique em Registros do worker para monitorar os registros de saída do fluxo de alterações.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STORAGE_CLASS --location LOCATION
Criar uma instância do Bigtable
É possível usar uma instância deste tutorial ou criar uma instância com as configurações padrão em uma região perto de você.
Criar uma tabela
O aplicativo de amostra rastreia as músicas que os usuários ouvem e armazena os eventos de detecção no Bigtable. Crie uma tabela com um fluxo de alterações ativado que tenha um grupo de colunas (cf) e uma coluna (música) e que use IDs de usuário para chaves de linha.
Crie a tabela.
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Substitua:
Iniciar o pipeline
Esse pipeline transforma o stream de alteração fazendo o seguinte:
Executar o pipeline.
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Substitua BIGTABLE_REGION pelo ID da região em que sua instância do Bigtable está, como
us-east5
.Entender o pipeline
Os snippets de código do pipeline a seguir podem ajudar a entender o código que você está executando.
Como ler o fluxo de alterações
O código nesta amostra configura o fluxo de origem com os parâmetros para a instância e a tabela específicas do Bigtable.
Como conseguir o nome da música
Quando uma música é ouvida, o nome dela é gravado no grupo de colunas
cf
e no qualificador de colunasong
. Assim, o código extrai o valor da mutação do fluxo de alterações e o envia para a próxima etapa do pipeline.Como contar as cinco músicas principais
É possível usar as funções
Count
eTop.of
integradas do Beam para encontrar facilmente as cinco músicas principais na janela atual.Como gerar os resultados
Esse pipeline grava os resultados na saída padrão e nos arquivos. Para os arquivos, ele exibe as gravações em grupos de 10 elementos ou segmentos de um minuto.
Ver o pipeline
Fazer streaming de gravações
Use a CLI
cbt
para gravar várias faixas de música para vários usuários na tabelasong-rank
. Ela é projetada para gravar durante alguns minutos e simular o streaming de detecções de músicas ao longo do tempo.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1
Veja o resultado
Leia a saída no Cloud Storage para ver as músicas mais famosas.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemplo de saída:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
-
Substitua
Exclua o bucket e os arquivos.
gcloud storage rm --recursive gs://BUCKET_NAME/
Desative o fluxo de alterações na tabela.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Exclua a tabela
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Interrompa o pipeline de fluxo de alterações.
Liste os jobs para ver o ID deles.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Cancele o job.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Substitua JOB_ID pelo ID do job exibido após o comando anterior.
Instale a CLI do Google Cloud. Após a instalação, inicialize a Google Cloud CLI executando o seguinte comando:
gcloud init
Se você estiver usando um provedor de identidade externo (IdP), primeiro faça login na CLI gcloud com sua identidade federada.
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Instale a CLI do Google Cloud. Após a instalação, inicialize a Google Cloud CLI executando o seguinte comando:
gcloud init
Se você estiver usando um provedor de identidade externo (IdP), primeiro faça login na CLI gcloud com sua identidade federada.
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Prepare o ambiente
Buscar o código
Clone o repositório GitHub que contém o código de exemplo: Se você já fez o download desse repositório, extraia a versão mais recente.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Criar um bucket
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID