Depois de criar e organizar seu modelo do Dataflow, execute-o com o console do Google Cloud, a API REST ou a Google Cloud CLI. É possível implantar jobs de modelo do Dataflow em muitos ambientes, inclusive no ambiente padrão do App Engine, nas funções do Cloud Run e em outros ambientes restritos.
Usar o Console do Google Cloud
É possível usar o Console do Google Cloud para executar modelos do Dataflow personalizados e fornecidos pelo Google.
Modelos fornecidos pelo Google
Para executar um modelo fornecido pelo Google:
- Acesse a página do Dataflow no Console do Google Cloud. Acessar a página do Dataflow
- Clique em add_boxCRIAR JOB USANDO UM MODELO.
- Selecione o modelo fornecido pelo Google que você quer executar no menu suspenso Modelo do Dataflow.
- Digite o nome de um job no campo Nome do job.
- Digite os valores de parâmetro nos campos fornecidos. Você não precisa da seção Parâmetros adicionais ao usar um modelo fornecido pelo Google.
- Clique em Run Job.
Modelos personalizados
Para executar um modelo personalizado:
- Acesse a página do Dataflow no Console do Google Cloud. Acessar a página do Dataflow
- Clique em CRIAR JOB A PARTIR DO MODELO.
- Selecione Modelo personalizado no menu suspenso Modelo do Dataflow.
- Digite o nome de um job no campo Nome do job.
- Digite o caminho do Cloud Storage para o arquivo modelo no campo do caminho do modelo no Cloud Storage.
- Caso o modelo precise de parâmetros, clique em addADICIONAR PARÂMETRO na seção Parâmetros adicionais. Digite o Nome e o Valor do parâmetro. Repita essa etapa para cada parâmetro necessário.
- Clique em Run Job.
Usar a API REST
Para executar um modelo com uma solicitação da API REST, envie uma solicitação POST HTTP com o ID do projeto. Isso requer uma autorização.
Consulte a referência da API REST para projects.locations.templates.launch e saiba mais sobre os parâmetros disponíveis.
Criar um job em lote de modelo personalizado
Neste exemplo, a solicitação projects.locations.templates.launch cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída. Caso a solicitação tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.
Modifique os seguintes valores:
- Substitua
YOUR_PROJECT_ID
pela ID do seu projeto. - Substitua
LOCATION
pela região do Dataflow de sua escolha. - Substitua
JOB_NAME
por um nome de job de sua escolha. - Substitua
YOUR_BUCKET_NAME
pelo nome do bucket no Cloud Storage. - Defina
gcsPath
como o local do Cloud Storage do arquivo de modelo. - Defina
parameters
como sua lista de pares de chave/valor. - Defina
tempLocation
como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName { "jobName": "JOB_NAME", "parameters": { "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt", "output": "gs://YOUR_BUCKET_NAME/output/my_output" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
Criar um job de streaming de modelo personalizado
Neste exemplo, a solicitação projects.locations.templates.launch cria um job de streaming com base em um modelo clássico que lê de uma assinatura do Pub/Sub e grava em uma tabela do BigQuery. Se você quiser iniciar um modelo flexível, use projects.locations.flexTemplates.launch. O modelo de exemplo é fornecido pelo Google. É possível modificar o caminho do modelo para que ele aponte para um modelo personalizado. A mesma lógica é usada para iniciar modelos personalizados e fornecidos pelo Google. Neste exemplo, a tabela do BigQuery já precisa existir com o esquema apropriado. Caso tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.
Modifique os seguintes valores:
- Substitua
YOUR_PROJECT_ID
pela ID do seu projeto. - Substitua
LOCATION
pela região do Dataflow de sua escolha. - Substitua
JOB_NAME
por um nome de job de sua escolha. - Substitua
YOUR_BUCKET_NAME
pelo nome do bucket no Cloud Storage. - Substitua
GCS_PATH
pelo local do Cloud Storage do arquivo de modelo. O local deve começar com gs:// - Defina
parameters
como sua lista de pares de chave/valor. Os parâmetros listados são específicos a esse exemplo de modelo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.- Substitua
YOUR_SUBSCRIPTION_NAME
pelo nome da sua assinatura do Pub/Sub. - Substitua
YOUR_DATASET
pelo conjunto de dados do BigQuery eYOUR_TABLE_NAME
pelo nome da tabela do BigQuery.
- Substitua
- Defina
tempLocation
como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME", "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
Atualizar um job de streaming de modelo personalizado
Este exemplo de solicitação project.locations.templates.launch mostra como atualizar um job de streaming de modelo. Se você quiser atualizar um modelo flexível, use projects.locations.flexTemplates.launch.
- Execute o Exemplo 2: como criar um job de streaming de modelo personalizado para iniciar um job de modelo de streaming.
- Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:
- Substitua
YOUR_PROJECT_ID
pela ID do seu projeto. - Substitua
LOCATION
pela região do Dataflow do job que você está atualizando. - Substitua
JOB_NAME
pelo nome exato do job que você quer atualizar. - Substitua
GCS_PATH
pelo local do Cloud Storage do arquivo de modelo. O local deve começar com gs:// - Defina
parameters
como sua lista de pares de chave/valor. Os parâmetros listados são específicos a esse exemplo de modelo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.- Substitua
YOUR_SUBSCRIPTION_NAME
pelo nome da sua assinatura do Pub/Sub. - Substitua
YOUR_DATASET
pelo conjunto de dados do BigQuery eYOUR_TABLE_NAME
pelo nome da tabela do BigQuery.
- Substitua
- Use o parâmetro
environment
para alterar as configurações do ambiente, como o tipo de máquina. Este exemplo usa o tipo de máquina n2-highmem-2, que tem mais memória e CPU por worker do que o tipo de máquina padrão.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME", "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "machineType": "n2-highmem-2" }, "update": true }
- Substitua
- Acesse a interface de monitoramento do Dataflow e verifique se um novo trabalho com o mesmo nome foi criado. Este job tem o status Atualizado.
Usar as bibliotecas de cliente das APIs do Google
Use as bibliotecas de cliente das APIs do Google para fazer chamadas para as APIs REST do Dataflow com facilidade. Neste script de amostra, foi usada a biblioteca de cliente das APIs do Google para Python (links em inglês).
Neste exemplo, você definirá as seguintes variáveis:
project
: defina como o ID do projeto.job
: defina um nome de job exclusivo de sua escolha.template
: defina como o local do Cloud Storage do arquivo de modelo.parameters
: defina como um dicionário com os parâmetros do modelo.
Para definir a região, inclua o parâmetro location
.
Para mais informações sobre as opções disponíveis, consulte o
método projects.locations.templates.launch
na referência da API REST do Cloud Dataflow.
Usar a gcloud CLI
A CLI da gcloud pode executar um modelo personalizado ou um fornecido pelo Google usando o comando gcloud dataflow jobs run
. Veja exemplos de execução de modelos fornecidos pelo Google documentados nesta página.
Para os exemplos de modelos personalizados a seguir, configure os seguintes valores:
- Substitua
JOB_NAME
por um nome de job de sua escolha. - Substitua
YOUR_BUCKET_NAME
pelo nome do bucket no Cloud Storage. - Defina
--gcs-location
como o local do Cloud Storage do arquivo de modelo. - Defina
--parameters
como a lista de parâmetros separada por vírgulas para transmitir o job. Espaços entre vírgulas e valores não são permitidos. - Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, use a sinalização
additional-experiments
com a opção de serviçoblock_project_ssh_keys
:--additional-experiments=block_project_ssh_keys
.
Criar um job em lote de modelo personalizado
Este exemplo cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída.
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output
A solicitação retorna uma resposta com o seguinte formato:
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_BATCH
Criar um job de streaming de modelo personalizado
Este exemplo cria um job de streaming a partir de um modelo que lê a partir de um tópico do Pub/Sub e grava em uma tabela do BigQuery. A tabela do BigQuery já deve existir com o esquema apropriado.
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name
A solicitação retorna uma resposta com o seguinte formato:
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_STREAMING
Para uma lista completa de sinalizações do comando gcloud dataflow jobs run
, consulte a referência da CLI da gcloud.
Monitoramento e solução de problemas
A Interface de monitoramento do Dataflow permite que você monitore seus jobs do Dataflow. Caso um job falhe, confira as dicas para solução de problemas, estratégias de depuração e um catálogo de erros comuns no guia Como solucionar problemas do pipeline.