Como executar modelos clássicos

Depois de criar e organizar seu modelo do Dataflow, execute-o com o console do Google Cloud, a API REST ou a Google Cloud CLI. É possível implantar jobs de modelo do Dataflow em muitos ambientes, inclusive no ambiente padrão do App Engine, nas funções do Cloud Run e em outros ambientes restritos.

Usar o Console do Google Cloud

É possível usar o Console do Google Cloud para executar modelos do Dataflow personalizados e fornecidos pelo Google.

Modelos fornecidos pelo Google

Para executar um modelo fornecido pelo Google:

  1. Acesse a página do Dataflow no Console do Google Cloud.
  2. Acessar a página do Dataflow
  3. Clique em CRIAR JOB USANDO UM MODELO.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione o modelo fornecido pelo Google que você quer executar no menu suspenso Modelo do Dataflow.
  6. Formulário para execução de modelos do WordCount
  7. Digite o nome de um job no campo Nome do job.
  8. Digite os valores de parâmetro nos campos fornecidos. Você não precisa da seção Parâmetros adicionais ao usar um modelo fornecido pelo Google.
  9. Clique em Executar job.

Modelos personalizados

Para executar um modelo personalizado:

  1. Acesse a página do Dataflow no Console do Google Cloud.
  2. Acessar a página do Dataflow
  3. Clique em CRIAR JOB A PARTIR DO MODELO.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione Modelo personalizado no menu suspenso Modelo do Dataflow.
  6. Formulário para execução de modelos personalizados
  7. Digite o nome de um job no campo Nome do job.
  8. Digite o caminho do Cloud Storage para o arquivo modelo no campo do caminho do modelo no Cloud Storage.
  9. Caso o modelo precise de parâmetros, clique em ADICIONAR PARÂMETRO na seção Parâmetros adicionais. Digite o Nome e o Valor do parâmetro. Repita essa etapa para cada parâmetro necessário.
  10. Clique em Executar job.

Usar a API REST

Para executar um modelo com uma solicitação da API REST, envie uma solicitação POST HTTP com o ID do projeto. Isso requer uma autorização.

Consulte a referência da API REST para projects.locations.templates.launch e saiba mais sobre os parâmetros disponíveis.

Criar um job em lote de modelo personalizado

Neste exemplo, a solicitação projects.locations.templates.launch cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída. Caso a solicitação tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.

Modifique os seguintes valores:

  • Substitua YOUR_PROJECT_ID pela ID do seu projeto.
  • Substitua LOCATION pela região do Dataflow de sua escolha.
  • Substitua JOB_NAME por um nome de job de sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Defina gcsPath como o local do Cloud Storage do arquivo de modelo.
  • Defina parameters como sua lista de pares de chave/valor.
  • Defina tempLocation como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Criar um job de streaming de modelo personalizado

Neste exemplo, a solicitação projects.locations.templates.launch cria um job de streaming com base em um modelo clássico que lê de uma assinatura do Pub/Sub e grava em uma tabela do BigQuery. Se você quiser iniciar um modelo flexível, use projects.locations.flexTemplates.launch. O modelo de exemplo é fornecido pelo Google. É possível modificar o caminho do modelo para que ele aponte para um modelo personalizado. A mesma lógica é usada para iniciar modelos personalizados e fornecidos pelo Google. Neste exemplo, a tabela do BigQuery já precisa existir com o esquema apropriado. Caso tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.

Modifique os seguintes valores:

  • Substitua YOUR_PROJECT_ID pela ID do seu projeto.
  • Substitua LOCATION pela região do Dataflow de sua escolha.
  • Substitua JOB_NAME por um nome de job de sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Substitua GCS_PATH pelo local do Cloud Storage do arquivo de modelo. O local deve começar com gs://
  • Defina parameters como sua lista de pares de chave/valor. Os parâmetros listados são específicos a esse exemplo de modelo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.
    • Substitua YOUR_SUBSCRIPTION_NAME pelo nome da sua assinatura do Pub/Sub.
    • Substitua YOUR_DATASET pelo conjunto de dados do BigQuery e YOUR_TABLE_NAME pelo nome da tabela do BigQuery.
  • Defina tempLocation como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Atualizar um job de streaming de modelo personalizado

Este exemplo de solicitação projects.locations.templates.launch mostra como atualizar um job de streaming de modelo. Se você quiser atualizar um modelo flexível, use projects.locations.flexTemplates.launch.

  1. Execute o Exemplo 2: como criar um job de streaming de modelo personalizado para iniciar um job de modelo de streaming.
  2. Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:
    • Substitua YOUR_PROJECT_ID pela ID do seu projeto.
    • Substitua LOCATION pela região do Dataflow do job que você está atualizando.
    • Substitua JOB_NAME pelo nome exato do job que você quer atualizar.
    • Substitua GCS_PATH pelo local do Cloud Storage do arquivo de modelo. O local deve começar com gs://
    • Defina parameters como sua lista de pares de chave/valor. Os parâmetros listados são específicos a esse exemplo de modelo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.
      • Substitua YOUR_SUBSCRIPTION_NAME pelo nome da sua assinatura do Pub/Sub.
      • Substitua YOUR_DATASET pelo conjunto de dados do BigQuery e YOUR_TABLE_NAME pelo nome da tabela do BigQuery.
    • Use o parâmetro environment para alterar as configurações do ambiente, como o tipo de máquina. Este exemplo usa o tipo de máquina n2-highmem-2, que tem mais memória e CPU por worker do que o tipo de máquina padrão.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Acesse a interface de monitoramento do Dataflow e verifique se um novo trabalho com o mesmo nome foi criado. Este job tem o status Atualizado.

Usar as bibliotecas de cliente das APIs do Google

Use as bibliotecas de cliente das APIs do Google para fazer chamadas para as APIs REST do Dataflow com facilidade. Neste script de amostra, foi usada a biblioteca de cliente das APIs do Google para Python (links em inglês).

Neste exemplo, você definirá as seguintes variáveis:

  • project: defina como o ID do projeto.
  • job: defina um nome de job exclusivo de sua escolha.
  • template: defina como o local do Cloud Storage do arquivo de modelo.
  • parameters: defina como um dicionário com os parâmetros do modelo.

Para definir a região, inclua o parâmetro location.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Para mais informações sobre as opções disponíveis, consulte o método projects.locations.templates.launch na referência da API REST do Cloud Dataflow.

Usar a gcloud CLI

A CLI da gcloud pode executar um modelo personalizado ou um fornecido pelo Google usando o comando gcloud dataflow jobs run. Veja exemplos de execução de modelos fornecidos pelo Google documentados nesta página.

Para os exemplos de modelos personalizados a seguir, configure os seguintes valores:

  • Substitua JOB_NAME por um nome de job de sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Defina --gcs-location como o local do Cloud Storage do arquivo de modelo.
  • Defina --parameters como a lista de parâmetros separada por vírgulas para transmitir o job. Espaços entre vírgulas e valores não são permitidos.
  • Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, use a sinalização additional-experiments com a opção de serviço block_project_ssh_keys: --additional-experiments=block_project_ssh_keys.

Criar um job em lote de modelo personalizado

Este exemplo cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

A solicitação retorna uma resposta com o seguinte formato:

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Criar um job de streaming de modelo personalizado

Este exemplo cria um job de streaming a partir de um modelo que lê a partir de um tópico do Pub/Sub e grava em uma tabela do BigQuery. A tabela do BigQuery já deve existir com o esquema apropriado.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

A solicitação retorna uma resposta com o seguinte formato:

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Para uma lista completa de sinalizações do comando gcloud dataflow jobs run, consulte a referência da CLI da gcloud.

Monitoramento e solução de problemas

A Interface de monitoramento do Dataflow permite que você monitore seus jobs do Dataflow. Caso um job falhe, confira as dicas para solução de problemas, estratégias de depuração e um catálogo de erros comuns no guia Como solucionar problemas do pipeline.