Criar um pipeline do Dataflow usando Python
Neste guia de início rápido, você aprenderá a usar o SDK do Apache Beam para Python na criação de um programa que defina um pipeline. Em seguida, execute o pipeline usando um executor local direto ou um executor baseado na nuvem, como o Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.
Para seguir as instruções detalhadas desta tarefa diretamente no console do Google Cloud, clique em Orientação:
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
PROJECT_NUMBER
pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comandogcloud projects describe
. - Substitua
SERVICE_ACCOUNT_ROLE
por cada papel individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard). -
Defina o local de armazenamento como o seguinte:
US
(Estados Unidos). -
Substitua
BUCKET_NAME
por um nome de bucket exclusivo . Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público. - Copie o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste documento.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
Configure o ambiente
Nesta seção, use o prompt de comando para configurar um ambiente virtual Python isolado e executar seu projeto de pipeline usando venv. Esse processo permite isolar as dependências de um projeto das dependências de outros projetos.
Caso você não tenha um prompt de comando disponível, use o Cloud Shell. O Cloud Shell já tem o gerenciador de pacotes do Python 3 instalado, portanto, você pode pular para a criação de um ambiente virtual.
Para instalar o Python e criar um ambiente virtual, siga estas etapas:
- Verifique se o Python 3 e o
pip
estão em execução no sistema:python --version python -m pip --version
- Se necessário, instale o Python 3 e, em seguida, configure um ambiente virtual do Python: siga as instruções fornecidas nas seções Como instalar o Python e Como configurar o venv do Como configurar uma página do ambiente de desenvolvimento em Python. Se você estiver usando o Python 3.10 ou posterior, também precisará ativar o Dataflow Runner v2. Para usar o Runner v1, use o Python 3.9 ou anterior.
Depois de concluir o guia de início rápido, execute deactivate
para desativar o ambiente virtual.
Instale o SDK do Apache Beam
O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.
Para fazer o download e instalar o SDK do Apache Beam, siga estas etapas:
- Verifique se você está no ambiente virtual do Python criado na seção anterior.
Verifique se o prompt começa com
<env_name>
, em queenv_name
é o nome do ambiente virtual. - Instale o padrão de empacotamento da roda Python:
pip install wheel
- Instale a versão mais recente do SDK do Apache Beam para Python:
pip install 'apache-beam[gcp]'
No Microsoft Windows, use o seguinte comando:
pip install apache-beam[gcp]
Dependendo da conexão, a instalação pode levar algum tempo.
Execute o pipeline localmente
Para ver como um pipeline é executado localmente, use um módulo Python pronto para o exemplo wordcount
incluído no pacote apache_beam
.
O exemplo de pipeline wordcount
faz o seguinte:
Usa um arquivo de texto como entrada.
Este arquivo de texto está localizado em um bucket do Cloud Storage com o nome do recurso
gs://dataflow-samples/shakespeare/kinglear.txt
.- Analisa cada linha na forma de palavras.
- Realiza uma contagem de frequência com base nas palavras tokenizadas.
Para preparar o pipeline wordcount
localmente, siga estas etapas:
- No terminal local, execute o exemplo
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Veja a saída do pipeline:
more outputs*
- Para sair, pressione q.
wordcount.py
pode ser visualizado no
GitHub do Apache Beam.
Executar o pipeline no serviço do Dataflow
Nesta seção, execute o pipeline de exemplowordcount
do pacote
apache_beam
no serviço do Dataflow. Este
exemplo especifica DataflowRunner
como o parâmetro para
--runner
.
- Execute o pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Substitua:
DATAFLOW_REGION
: a região onde você quer implantar o job do Dataflow, por exemplo,europe-west1
A sinalização
--region
substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.BUCKET_NAME
: o nome do bucket do Cloud Storage que você copiou anteriormentePROJECT_ID
: o ID do projeto do Google Cloud que você copiou anteriormente.
Ver os resultados
Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Nesta seção, verifique se o pipeline está em execução usando o console do Google Cloud ou o terminal local.
Console do Google Cloud
Para ver os resultados no console do Google Cloud, siga estas etapas:
- No console do Google Cloud, acesse a página Jobs do Dataflow.
A página Jobs exibe detalhes do job do
wordcount
, incluindo o status Em execução primeiro e depois Finalizado. - Acesse a página Buckets do Cloud Storage:
Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente.
No diretório
wordcount
, os arquivos de saída criados pelo seu job são exibidos.
Terminal local
Acesse os resultados no seu terminal ou usando o Cloud Shell.
- Para listar os arquivos de saída, use o comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Para acessar os resultados nos arquivos de saída, use o comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Substitua BUCKET_NAME
pelo nome do bucket do Cloud Storage usado
no programa de pipeline.
Modificar o código do pipeline
Nos exemplos anteriores, o pipelinewordcount
diferencia letras maiúsculas e minúsculas.
Nas etapas a seguir, mostramos como modificar o pipeline para que o wordcount
não diferencie maiúsculas de minúsculas.
- Na máquina local, faça o download da cópia mais recente do
código
wordcount
no repositório do Apache Beam no GitHub. - No terminal local, execute o pipeline:
python wordcount.py --output outputs
- Conferir os resultados:
more outputs*
- Para sair, pressione q.
- Em um editor de sua escolha, abra o arquivo
wordcount.py
. - Dentro da função
run
, examine as etapas do pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Depois de
split
, as linhas são divididas em palavras como strings. - Para letras minúsculas das strings, modifique a linha após
split
:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Essa modificação mapeia a funçãostr.lower
em cada palavra. Essa linha é equivalente abeam.Map(lambda word: str.lower(word))
. - Salve o arquivo e execute o job
wordcount
modificadopython wordcount.py --output outputs
- Veja os resultados do pipeline modificado:
more outputs*
- Para sair, pressione q.
- Execute o pipeline modificado no serviço do Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Substitua:
DATAFLOW_REGION
: a região onde você quer implantar o job do DataflowBUCKET_NAME
: seu nome do bucket do Cloud StoragePROJECT_ID
: o ID do projeto do Google Cloud
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke