Criar um pipeline do Dataflow usando Go

Nesta página, mostramos como usar o SDK do Apache Beam para Go a fim de criar um programa que define um pipeline. Em seguida, você executa o pipeline localmente e no serviço do Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  8. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  9. Instale a CLI do Google Cloud.
  10. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  11. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  12. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  13. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  15. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  16. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.
  17. Crie um bucket do Cloud Storage e configure-o da seguinte maneira:
    • Defina a classe de armazenamento como S (Standard).
    • Defina o local de armazenamento como o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de bucket exclusivo. Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copie o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste guia de início rápido.

Configurar o ambiente de desenvolvimento

O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.

Recomendamos que você use a versão mais recente do Go ao trabalhar com o SDK do Apache Beam para Go. Se você não tiver a versão mais recente do Go instalada, use o Guia de download e instalação para fazer o download e instalar o Go no seu sistema operacional específico.

Para verificar a versão do Go instalada, execute o seguinte comando no seu terminal local:

go version

Executar o exemplo de contagem de palavras do Beam

O SDK do Apache Beam para Go inclui um exemplo de pipeline wordcount (em inglês). O exemplo wordcount faz o seguinte:

  1. Lê um arquivo de texto como entrada. Por padrão, ele lê um arquivo de texto localizado em um bucket do Cloud Storage com o nome de recurso gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analisa cada linha na forma de palavras.
  3. Realiza uma contagem de frequência com base nas palavras tokenizadas.

Para executar a versão mais recente do exemplo wordcount do Beam na máquina local, use o comando a seguir. A sinalização input especifica o arquivo a ser lido e a sinalização output especifica o nome do arquivo da saída da contagem de frequência.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Depois que o pipeline for concluído, veja os resultados da saída:

more outputs*

Para sair, pressione q.

Modificar o código do pipeline

O pipeline wordcount do Beam diferencia palavras maiúsculas e minúsculas. As etapas a seguir mostram como criar seu próprio módulo Go, modificar o pipeline wordcount para que ele não diferencie maiúsculas de minúsculas e executá-lo no Dataflow.

Criar um módulo Go

Para fazer alterações no código do pipeline, siga estas etapas.

  1. Crie um diretório para seu módulo Go em um local de sua escolha:

    mkdir wordcount
    cd wordcount
    
  2. Crie um módulo Go. Neste exemplo, use example/dataflow como o caminho do módulo.

    go mod init example/dataflow
    
  3. Faça o download da cópia mais recente do código wordcount no repositório do Apache Beam no GitHub. Coloque esse arquivo no diretório wordcount que você criou.

  4. Se estiver usando um sistema operacional não Linux, será necessário conseguir o pacote unix do Go. Esse pacote é necessário para executar pipelines no serviço do Dataflow.

    go get -u golang.org/x/sys/unix
    
  5. Verifique se o arquivo go.mod corresponde ao código-fonte do módulo:

    go mod tidy
    

Execute o pipeline não modificado

Verifique se o pipeline wordcount não modificado é executado localmente.

  1. No terminal, crie e execute o pipeline localmente:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Veja os resultados da saída:

     more outputs*
    
  3. Para sair, pressione q.

Alterar o código do pipeline

Para alterar o pipeline para que ele não diferencie maiúsculas de minúsculas, modifique o código para aplicar a função strings.ToLower a todas as palavras.

  1. Em um editor de sua escolha, abra o arquivo wordcount.go.

  2. Examine o bloco init (os comentários foram removidos para maior clareza):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Adicione uma nova linha para registrar a função strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Examine a função CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Para colocar as palavras em minúsculas, adicione uma ParDo que aplica strings.ToLower a cada palavra:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Salve o arquivo.

Executar o pipeline atualizado localmente

Execute o pipeline wordcount atualizado localmente e verifique se a saída mudou.

  1. Crie e execute o pipeline wordcount modificado:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Veja os resultados de saída do pipeline modificado. Todas as palavras precisam estar em letras minúsculas.

     more outputs*
    
  3. Para sair, pressione q.

Executar o pipeline no serviço do Dataflow

Para executar o exemplo wordcount atualizado no serviço Dataflow, use o seguinte comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Substitua:

  • BUCKET_NAME: o nome do bucket do Cloud Storage.

  • PROJECT_ID: o ID do projeto do Google Cloud.

  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow. Por exemplo, europe-west1. Para uma lista de locais disponíveis, consulte Locais do Dataflow. A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Ver os resultados

É possível ver uma lista dos jobs do Dataflow no Console do Google Cloud. No Console do Google Cloud, acesse a página Jobs do Dataflow.

Acessar "Jobs"

A página Jobs exibe detalhes do job do wordcount, incluindo o status Em execução primeiro e, depois, Finalizado.

Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Visualize os resultados de saída usando o Console do Google Cloud ou o terminal local.

Console

Para ver os resultados no console do Google Cloud, acesse a página Buckets do Cloud Storage.

Acessar buckets

Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente. Os arquivos de saída criados pelo job são exibidos no diretório results.

Terminal

Acesse os resultados no seu terminal ou usando o Cloud Shell.

  1. Para listar os arquivos de saída, use o comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    Substitua BUCKET_NAME pelo nome do bucket de saída especificado do Cloud Storage.

  2. Para acessar os resultados nos arquivos de saída, use o comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.

  1. No Console do Google Cloud, acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.
  4. Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:

    gcloud auth application-default revoke
  6. Opcional: revogar credenciais da CLI gcloud.

    gcloud auth revoke

A seguir