Criar um pipeline do Dataflow usando Go
Nesta página, mostramos como usar o SDK do Apache Beam para Go a fim de criar um programa que define um pipeline. Em seguida, você executa o pipeline localmente e no serviço do Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
PROJECT_NUMBER
pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comandogcloud projects describe
. - Substitua
SERVICE_ACCOUNT_ROLE
por cada papel individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard). -
Defina o local de armazenamento como o seguinte:
US
(Estados Unidos). -
Substitua
BUCKET_NAME
por um nome de bucket exclusivo . Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público. - Copie o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste guia de início rápido.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
Configurar o ambiente de desenvolvimento
O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.
Recomendamos que você use a versão mais recente do Go ao trabalhar com o SDK do Apache Beam para Go. Se você não tiver a versão mais recente do Go instalada, use o Guia de download e instalação para fazer o download e instalar o Go no seu sistema operacional específico.
Para verificar a versão do Go instalada, execute o seguinte comando no seu terminal local:
go version
Executar o exemplo de contagem de palavras do Beam
O SDK do Apache Beam para Go inclui um
exemplo de pipeline wordcount
(em inglês).
O exemplo wordcount
faz o seguinte:
- Lê um arquivo de texto como entrada. Por padrão, ele lê um arquivo de texto localizado em um
bucket do Cloud Storage com o nome de recurso
gs://dataflow-samples/shakespeare/kinglear.txt
. - Analisa cada linha na forma de palavras.
- Realiza uma contagem de frequência com base nas palavras tokenizadas.
Para executar a versão mais recente do exemplo wordcount
do Beam na máquina local, use o comando a seguir. A sinalização input
especifica o arquivo a ser lido
e a sinalização output
especifica o nome do arquivo da saída da contagem de frequência.
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output outputs
Depois que o pipeline for concluído, veja os resultados da saída:
more outputs*
Para sair, pressione q.
Modificar o código do pipeline
O pipeline wordcount
do Beam diferencia palavras maiúsculas e minúsculas. As etapas a seguir mostram como criar seu próprio módulo Go, modificar o pipeline wordcount
para que ele não diferencie maiúsculas de minúsculas e executá-lo no Dataflow.
Criar um módulo Go
Para fazer alterações no código do pipeline, siga estas etapas.
Crie um diretório para seu módulo Go em um local de sua escolha:
mkdir wordcount
cd wordcount
Crie um módulo Go. Neste exemplo, use
example/dataflow
como o caminho do módulo.go mod init example/dataflow
Faça o download da cópia mais recente do código
wordcount
no repositório do Apache Beam no GitHub. Coloque esse arquivo no diretóriowordcount
que você criou.Se estiver usando um sistema operacional não Linux, será necessário conseguir o pacote
unix
do Go. Esse pacote é necessário para executar pipelines no serviço do Dataflow.go get -u golang.org/x/sys/unix
Verifique se o arquivo
go.mod
corresponde ao código-fonte do módulo:go mod tidy
Execute o pipeline não modificado
Verifique se o pipeline wordcount
não modificado é executado localmente.
No terminal, crie e execute o pipeline localmente:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Veja os resultados da saída:
more outputs*
Para sair, pressione q.
Alterar o código do pipeline
Para alterar o pipeline para que ele não diferencie maiúsculas de minúsculas, modifique o código para aplicar a função strings.ToLower
a todas as palavras.
Em um editor de sua escolha, abra o arquivo
wordcount.go
.Examine o bloco
init
(os comentários foram removidos para maior clareza):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Adicione uma nova linha para registrar a função
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Examine a função
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Para colocar as palavras em minúsculas, adicione uma ParDo que aplica
strings.ToLower
a cada palavra:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Salve o arquivo.
Executar o pipeline atualizado localmente
Execute o pipeline wordcount
atualizado localmente e verifique se a saída mudou.
Crie e execute o pipeline
wordcount
modificado:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Veja os resultados de saída do pipeline modificado. Todas as palavras precisam estar em letras minúsculas.
more outputs*
Para sair, pressione q.
Executar o pipeline no serviço do Dataflow
Para executar o exemplo wordcount
atualizado no serviço Dataflow, use o seguinte comando:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Substitua:
BUCKET_NAME
: o nome do bucket do Cloud Storage.PROJECT_ID
: o ID do projeto do Google Cloud.DATAFLOW_REGION
: a região onde você quer implantar o job do Dataflow. Por exemplo,europe-west1
. Para uma lista de locais disponíveis, consulte Locais do Dataflow. A sinalização--region
substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.
Ver os resultados
É possível ver uma lista dos jobs do Dataflow no Console do Google Cloud. No Console do Google Cloud, acesse a página Jobs do Dataflow.
A página Jobs exibe detalhes do job do wordcount
, incluindo o status
Em execução primeiro e, depois, Finalizado.
Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Visualize os resultados de saída usando o Console do Google Cloud ou o terminal local.
Console
Para ver os resultados no console do Google Cloud, acesse a página Buckets do Cloud Storage.
Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente. Os arquivos de saída criados pelo job são exibidos no
diretório results
.
Terminal
Acesse os resultados no seu terminal ou usando o Cloud Shell.
Para listar os arquivos de saída, use o comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Substitua
BUCKET_NAME
pelo nome do bucket de saída especificado do Cloud Storage.Para acessar os resultados nos arquivos de saída, use o comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke