Criar um pipeline do Dataflow usando Go
Nesta página, mostramos como usar o SDK do Apache Beam para Go a fim de criar um programa que define um pipeline. Em seguida, você executa o pipeline localmente e no serviço do Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.
Antes de começar
- Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Verifique se a cobrança está ativada para o seu projeto do Google Cloud.
-
Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Verifique se a cobrança está ativada para o seu projeto do Google Cloud.
-
Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
PROJECT_NUMBER
pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comandogcloud projects describe
. - Substitua
SERVICE_ACCOUNT_ROLE
por cada papel individual.
-
Crie um bucket do Cloud Storage e configure-o da seguinte maneira:
-
Defina a classe de armazenamento como
S
(Standard). -
Defina o local de armazenamento como o seguinte:
US
(Estados Unidos). -
Substitua
BUCKET_NAME
por um nome de bucket exclusivo. Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Defina a classe de armazenamento como
- Copie o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste guia de início rápido.
Configurar o ambiente de desenvolvimento
O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.
Recomendamos que você use a versão mais recente do Go ao trabalhar com o SDK do Apache Beam para Go. Se você não tiver a versão mais recente do Go instalada, use o Guia de download e instalação para fazer o download e instalar o Go no seu sistema operacional específico.
Para verificar a versão do Go instalada, execute o seguinte comando no seu terminal local:
go version
Executar o exemplo de contagem de palavras do Beam
O SDK do Apache Beam para Go inclui um
exemplo de pipeline wordcount
(em inglês).
O exemplo wordcount
faz o seguinte:
- Lê um arquivo de texto como entrada. Por padrão, ele lê um arquivo de texto localizado em um
bucket do Cloud Storage com o nome de recurso
gs://dataflow-samples/shakespeare/kinglear.txt
. - Analisa cada linha na forma de palavras.
- Realiza uma contagem de frequência com base nas palavras tokenizadas.
Para executar a versão mais recente do exemplo wordcount
do Beam na máquina local, use o comando a seguir. A sinalização input
especifica o arquivo a ser lido
e a sinalização output
especifica o nome do arquivo da saída da contagem de frequência.
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output outputs
Depois que o pipeline for concluído, veja os resultados da saída:
more outputs*
Para sair, pressione q.
Modificar o código do pipeline
O pipeline wordcount
do Beam diferencia palavras maiúsculas e minúsculas. As etapas a seguir mostram como criar seu próprio módulo Go, modificar o pipeline wordcount
para que ele não diferencie maiúsculas de minúsculas e executá-lo no Dataflow.
Criar um módulo Go
Para fazer alterações no código do pipeline, siga estas etapas.
Crie um diretório para seu módulo Go em um local de sua escolha:
mkdir wordcount
cd wordcount
Crie um módulo Go. Neste exemplo, use
example/dataflow
como o caminho do módulo.go mod init example/dataflow
Faça o download da cópia mais recente do código
wordcount
no repositório do Apache Beam no GitHub. Coloque esse arquivo no diretóriowordcount
que você criou.Se estiver usando um sistema operacional não Linux, será necessário conseguir o pacote
unix
do Go. Esse pacote é necessário para executar pipelines no serviço do Dataflow.go get -u golang.org/x/sys/unix
Verifique se o arquivo
go.mod
corresponde ao código-fonte do módulo:go mod tidy
Execute o pipeline não modificado
Verifique se o pipeline wordcount
não modificado é executado localmente.
No terminal, crie e execute o pipeline localmente:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Veja os resultados da saída:
more outputs*
Para sair, pressione q.
Alterar o código do pipeline
Para alterar o pipeline para que ele não diferencie maiúsculas de minúsculas, modifique o código para aplicar a função strings.ToLower
a todas as palavras.
Em um editor de sua escolha, abra o arquivo
wordcount.go
.Examine o bloco
init
(os comentários foram removidos para maior clareza):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Adicione uma nova linha para registrar a função
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Examine a função
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Para colocar as palavras em minúsculas, adicione uma ParDo que aplica
strings.ToLower
a cada palavra:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Salve o arquivo.
Executar o pipeline atualizado localmente
Execute o pipeline wordcount
atualizado localmente e verifique se a saída mudou.
Crie e execute o pipeline
wordcount
modificado:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Veja os resultados de saída do pipeline modificado. Todas as palavras precisam estar em letras minúsculas.
more outputs*
Para sair, pressione q.
Executar o pipeline no serviço do Dataflow
Para executar o exemplo wordcount
atualizado no serviço Dataflow, use o seguinte comando:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Substitua:
BUCKET_NAME
: o nome do bucket do Cloud Storage.PROJECT_ID
: o ID do projeto do Google Cloud.DATAFLOW_REGION
: a região onde você quer implantar o job do Dataflow. Por exemplo,europe-west1
. Para uma lista de locais disponíveis, consulte Locais do Dataflow. A sinalização--region
substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.
Ver os resultados
É possível ver uma lista dos jobs do Dataflow no Console do Google Cloud. No Console do Google Cloud, acesse a página Jobs do Dataflow.
A página Jobs exibe detalhes do job do wordcount
, incluindo o status
Em execução primeiro e, depois, Finalizado.
Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Visualize os resultados de saída usando o Console do Google Cloud ou o terminal local.
Console
Para ver os resultados no console do Google Cloud, acesse a página Buckets do Cloud Storage.
Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente. Os arquivos de saída criados pelo job são exibidos no
diretório results
.
Terminal
Acesse os resultados no seu terminal ou usando o Cloud Shell.
Para listar os arquivos de saída, use o comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Substitua
BUCKET_NAME
pelo nome do bucket de saída especificado do Cloud Storage.Para acessar os resultados nos arquivos de saída, use o comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.
- No Console do Google Cloud, acesse a página Buckets do Cloud Storage.
- Clique na caixa de seleção do bucket que você quer excluir.
- Para excluir o bucket, clique em Excluir e siga as instruções.
Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:
gcloud auth application-default revoke
-
Opcional: revogar credenciais da CLI gcloud.
gcloud auth revoke