Criar um pipeline do Dataflow usando Go

Nesta página, mostramos como usar o SDK do Apache Beam para Go a fim de criar um programa que define um pipeline. Em seguida, você executa o pipeline localmente e no serviço do Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Defina o local de armazenamento como o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de bucket exclusivo . Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copie o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste guia de início rápido.

Configurar o ambiente de desenvolvimento

O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.

Recomendamos que você use a versão mais recente do Go ao trabalhar com o SDK do Apache Beam para Go. Se você não tiver a versão mais recente do Go instalada, use o Guia de download e instalação para fazer o download e instalar o Go no seu sistema operacional específico.

Para verificar a versão do Go instalada, execute o seguinte comando no seu terminal local:

go version

Executar o exemplo de contagem de palavras do Beam

O SDK do Apache Beam para Go inclui um exemplo de pipeline wordcount (em inglês). O exemplo wordcount faz o seguinte:

  1. Lê um arquivo de texto como entrada. Por padrão, ele lê um arquivo de texto localizado em um bucket do Cloud Storage com o nome de recurso gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analisa cada linha na forma de palavras.
  3. Realiza uma contagem de frequência com base nas palavras tokenizadas.

Para executar a versão mais recente do exemplo wordcount do Beam na máquina local, use o comando a seguir. A sinalização input especifica o arquivo a ser lido e a sinalização output especifica o nome do arquivo da saída da contagem de frequência.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Depois que o pipeline for concluído, veja os resultados da saída:

more outputs*

Para sair, pressione q.

Modificar o código do pipeline

O pipeline wordcount do Beam diferencia palavras maiúsculas e minúsculas. As etapas a seguir mostram como criar seu próprio módulo Go, modificar o pipeline wordcount para que ele não diferencie maiúsculas de minúsculas e executá-lo no Dataflow.

Criar um módulo Go

Para fazer alterações no código do pipeline, siga estas etapas.

  1. Crie um diretório para seu módulo Go em um local de sua escolha:

    mkdir wordcount
    cd wordcount
    
  2. Crie um módulo Go. Neste exemplo, use example/dataflow como o caminho do módulo.

    go mod init example/dataflow
    
  3. Faça o download da cópia mais recente do código wordcount no repositório do Apache Beam no GitHub. Coloque esse arquivo no diretório wordcount que você criou.

  4. Se estiver usando um sistema operacional não Linux, será necessário conseguir o pacote unix do Go. Esse pacote é necessário para executar pipelines no serviço do Dataflow.

    go get -u golang.org/x/sys/unix
    
  5. Verifique se o arquivo go.mod corresponde ao código-fonte do módulo:

    go mod tidy
    

Execute o pipeline não modificado

Verifique se o pipeline wordcount não modificado é executado localmente.

  1. No terminal, crie e execute o pipeline localmente:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Veja os resultados da saída:

     more outputs*
    
  3. Para sair, pressione q.

Alterar o código do pipeline

Para alterar o pipeline para que ele não diferencie maiúsculas de minúsculas, modifique o código para aplicar a função strings.ToLower a todas as palavras.

  1. Em um editor de sua escolha, abra o arquivo wordcount.go.

  2. Examine o bloco init (os comentários foram removidos para maior clareza):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Adicione uma nova linha para registrar a função strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Examine a função CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Para colocar as palavras em minúsculas, adicione uma ParDo que aplica strings.ToLower a cada palavra:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Salve o arquivo.

Executar o pipeline atualizado localmente

Execute o pipeline wordcount atualizado localmente e verifique se a saída mudou.

  1. Crie e execute o pipeline wordcount modificado:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Veja os resultados de saída do pipeline modificado. Todas as palavras precisam estar em letras minúsculas.

     more outputs*
    
  3. Para sair, pressione q.

Executar o pipeline no serviço do Dataflow

Para executar o exemplo wordcount atualizado no serviço Dataflow, use o seguinte comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Substitua:

  • BUCKET_NAME: o nome do bucket do Cloud Storage.

  • PROJECT_ID: o ID do projeto do Google Cloud.

  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow. Por exemplo, europe-west1. Para uma lista de locais disponíveis, consulte Locais do Dataflow. A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Ver os resultados

É possível ver uma lista dos jobs do Dataflow no Console do Google Cloud. No Console do Google Cloud, acesse a página Jobs do Dataflow.

Acessar "Jobs"

A página Jobs exibe detalhes do job do wordcount, incluindo o status Em execução primeiro e, depois, Finalizado.

Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Visualize os resultados de saída usando o Console do Google Cloud ou o terminal local.

Console

Para ver os resultados no console do Google Cloud, acesse a página Buckets do Cloud Storage.

Acessar buckets

Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente. Os arquivos de saída criados pelo job são exibidos no diretório results.

Terminal

Acesse os resultados no seu terminal ou usando o Cloud Shell.

  1. Para listar os arquivos de saída, use o comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    Substitua BUCKET_NAME pelo nome do bucket de saída especificado do Cloud Storage.

  2. Para acessar os resultados nos arquivos de saída, use o comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

A seguir