Executar modelos clássicos

Depois de criar e preparar o modelo do Dataflow, execute o modelo com a Google Cloud consola, a API REST ou a Google Cloud CLI. Pode implementar tarefas de modelos do Dataflow a partir de muitos ambientes, incluindo o ambiente padrão do App Engine, as funções do Cloud Run e outros ambientes restritos.

Use a Google Cloud consola

Pode usar a Google Cloud consola para executar modelos do Dataflow fornecidos pela Google e personalizados.

Modelos fornecidos pela Google

Para executar um modelo fornecido pela Google:

  1. Aceda à página Dataflow na Google Cloud consola.
  2. Aceda à página Fluxo de dados
  3. Clique em CRIAR TAREFA A PARTIR DE MODELO.
  4. Google Cloud console Create Job From Template Button
  5. Selecione o modelo fornecido pela Google que quer executar no menu pendente Modelo do Dataflow.
  6. Formulário de execução do modelo de contagem de palavras
  7. Introduza um nome de tarefa no campo Nome da tarefa.
  8. Introduza os valores dos parâmetros nos campos de parâmetros fornecidos. Não precisa da secção Parâmetros adicionais quando usa um modelo fornecido pela Google.
  9. Clique em Executar tarefa.

Modelos personalizados

Para executar um modelo personalizado:

  1. Aceda à página Dataflow na Google Cloud consola.
  2. Aceda à página Fluxo de dados
  3. Clique em CRIAR TAREFA A PARTIR DE MODELO.
  4. Google Cloud console Create Job From Template Button
  5. Selecione Modelo personalizado no menu pendente Modelo de fluxo de dados.
  6. Formulário de execução de modelo personalizado
  7. Introduza um nome de tarefa no campo Nome da tarefa.
  8. Introduza o caminho do Cloud Storage para o ficheiro de modelo no campo de caminho do Cloud Storage do modelo.
  9. Se o seu modelo precisar de parâmetros, clique em ADICIONAR PARÂMETRO na secção Parâmetros adicionais. Introduza o Nome e o Valor do parâmetro. Repita este passo para cada parâmetro necessário.
  10. Clique em Executar tarefa.

Use a API REST

Para executar um modelo com um pedido da API REST, envie um pedido HTTP POST com o ID do seu projeto. Este pedido requer autorização.

Consulte a referência da API REST para projects.locations.templates.launch para saber mais acerca dos parâmetros disponíveis.

Crie uma tarefa em lote de modelos personalizados

Este pedido projects.locations.templates.launch cria uma tarefa em lote a partir de um modelo que lê um ficheiro de texto e escreve um ficheiro de texto de saída. Se o pedido for bem-sucedido, o corpo da resposta contém uma instância de LaunchTemplateResponse.

Modifique os seguintes valores:

  • Substitua YOUR_PROJECT_ID pelo ID do seu projeto.
  • Substitua LOCATION pela região do fluxo de dados à sua escolha.
  • Substitua JOB_NAME por um nome de trabalho à sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do seu contentor do Cloud Storage.
  • Defina gcsPath para a localização do Cloud Storage do ficheiro do modelo.
  • Defina parameters para a sua lista de pares de chave-valor.
  • Defina tempLocation para uma localização onde tenha autorização de escrita. Este valor é obrigatório para executar modelos fornecidos pela Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Crie uma tarefa de streaming de modelo personalizado

Este exemplo projects.locations.templates.launch cria uma tarefa de streaming a partir de um modelo clássico que lê a partir de uma subscrição do Pub/Sub e escreve numa tabela do BigQuery. Se quiser iniciar um modelo flexível, use projects.locations.flexTemplates.launch em alternativa. O modelo de exemplo é um modelo fornecido pela Google. Pode modificar o caminho no modelo para apontar para um modelo personalizado. É usada a mesma lógica para iniciar modelos personalizados e fornecidos pela Google. Neste exemplo, a tabela do BigQuery já tem de existir com o esquema adequado. Se for bem-sucedido, o corpo da resposta contém uma instância de LaunchTemplateResponse.

Modifique os seguintes valores:

  • Substitua YOUR_PROJECT_ID pelo ID do seu projeto.
  • Substitua LOCATION pela região do fluxo de dados à sua escolha.
  • Substitua JOB_NAME por um nome de trabalho à sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do seu contentor do Cloud Storage.
  • Substitua GCS_PATH pela localização de armazenamento na nuvem do ficheiro de modelo. A localização deve começar com gs://
  • Defina parameters para a sua lista de pares de chave-valor. Os parâmetros listados são específicos deste exemplo de modelo. Se estiver a usar um modelo personalizado, modifique os parâmetros conforme necessário. Se estiver a usar o modelo de exemplo, substitua as seguintes variáveis.
    • Substitua YOUR_SUBSCRIPTION_NAME pelo nome da sua subscrição do Pub/Sub.
    • Substitua YOUR_DATASET pelo seu conjunto de dados do BigQuery e substitua YOUR_TABLE_NAME pelo nome da sua tabela do BigQuery.
  • Defina tempLocation para uma localização onde tenha autorização de escrita. Este valor é obrigatório para executar modelos fornecidos pela Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Atualize uma tarefa de streaming de modelo personalizado

Este exemplo de pedido projects.locations.templates.launch mostra como atualizar uma tarefa de streaming de modelos. Se quiser atualizar um modelo flexível, use projects.locations.flexTemplates.launch em alternativa.

  1. Run Exemplo 2: criar uma tarefa de streaming de modelo personalizado para iniciar uma tarefa de modelo de streaming.
  2. Envie o seguinte pedido HTTP POST com os seguintes valores modificados:
    • Substitua YOUR_PROJECT_ID pelo ID do seu projeto.
    • Substitua LOCATION pela região do Dataflow da tarefa que está a atualizar.
    • Substitua JOB_NAME pelo nome exato da tarefa que quer atualizar.
    • Substitua GCS_PATH pela localização de armazenamento na nuvem do ficheiro de modelo. A localização deve começar com gs://
    • Defina parameters para a sua lista de pares de chave-valor. Os parâmetros listados são específicos deste exemplo de modelo. Se estiver a usar um modelo personalizado, modifique os parâmetros conforme necessário. Se estiver a usar o modelo de exemplo, substitua as seguintes variáveis.
      • Substitua YOUR_SUBSCRIPTION_NAME pelo nome da sua subscrição do Pub/Sub.
      • Substitua YOUR_DATASET pelo seu conjunto de dados do BigQuery e substitua YOUR_TABLE_NAME pelo nome da sua tabela do BigQuery.
    • Use o parâmetro environment para alterar as definições do ambiente, como o tipo de máquina. Este exemplo usa o tipo de máquina n2-highmem-2, que tem mais memória e CPU por trabalhador do que o tipo de máquina predefinido.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Aceda à interface de monitorização do Dataflow e verifique se foi criada uma nova tarefa com o mesmo nome. Este trabalho tem o estado Atualizado.

Use as bibliotecas cliente de APIs Google

Considere usar as bibliotecas cliente de APIs Google para fazer chamadas facilmente para as APIs REST do Dataflow. Este script de exemplo usa a biblioteca cliente de APIs Google para Python.

Neste exemplo, tem de definir as seguintes variáveis:

  • project: definido como o ID do seu projeto.
  • job: definido para um nome de tarefa exclusivo à sua escolha.
  • template: definido para a localização do ficheiro de modelo no Cloud Storage.
  • parameters: definido como um dicionário com os parâmetros do modelo.

Para definir a região, inclua o parâmetro location.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Para mais informações sobre as opções disponíveis, consulte o método projects.locations.templates.launch na referência da API REST Dataflow.

Use a CLI gcloud

A CLI gcloud pode executar um modelo personalizado ou um modelo fornecido pela Google através do comando gcloud dataflow jobs run. Os exemplos de execução de modelos fornecidos pela Google estão documentados na página de modelos fornecidos pela Google.

Para os seguintes exemplos de modelos personalizados, defina os seguintes valores:

  • Substitua JOB_NAME por um nome de trabalho à sua escolha.
  • Substitua YOUR_BUCKET_NAME pelo nome do seu contentor do Cloud Storage.
  • Defina --gcs-location para a localização do Cloud Storage do ficheiro de modelo.
  • Defina --parameters para a lista de parâmetros separados por vírgulas a transmitir à tarefa. Não são permitidos espaços entre vírgulas e valores.
  • Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, use a flag additional-experiments com a opção de serviço block_project_ssh_keys: --additional-experiments=block_project_ssh_keys.

Crie uma tarefa em lote de modelos personalizados

Este exemplo cria uma tarefa em lote a partir de um modelo que lê um ficheiro de texto e escreve um ficheiro de texto de saída.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

O pedido devolve uma resposta com o seguinte formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Crie uma tarefa de streaming de modelo personalizado

Este exemplo cria uma tarefa de streaming a partir de um modelo que lê a partir de um tópico do Pub/Sub e escreve numa tabela do BigQuery. A tabela do BigQuery já tem de existir com o esquema adequado.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

O pedido devolve uma resposta com o seguinte formato.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Para ver uma lista completa de flags para o comando gcloud dataflow jobs run, consulte a referência da CLI gcloud.

Monitorização e resolução de problemas

A interface de monitorização do Dataflow permite-lhe monitorizar as suas tarefas do Dataflow. Se uma tarefa falhar, pode encontrar sugestões de resolução de problemas, estratégias de depuração e um catálogo de erros comuns no guia Resolução de problemas do pipeline.