Como executar modelos

Depois de criar e organizar seu modelo do Dataflow, execute o modelo com o Console do Google Cloud, a REST API ou ferramenta de linha de comando gcloud. Você pode implantar jobs de modelo do Dataflow em muitos ambientes, incluindo o ambiente padrão do App Engine, Cloud Functions e outros ambientes restritos.

Observação: além do arquivo de modelo, a execução do pipeline também depende de arquivos organizados e referenciados no momento da criação do modelo. Caso os arquivos preparados sejam movidos ou removidos, o job do pipeline falhará.

No Console do Cloud

Você pode usar o Console do Cloud para executar modelos do Dataflow personalizados e fornecidos pelo Google.

Modelos fornecidos pelo Google

Para executar um modelo fornecido pelo Google:

  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em CRIAR JOB A PARTIR DO MODELO.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione o modelo fornecido pelo Google que você quer executar no menu suspenso Modelo do Dataflow.
  6. Formulário para execução de modelos do WordCount
  7. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  8. Digite os valores de parâmetro nos campos fornecidos. Você não precisa da seção Parâmetros adicionais quando usa um modelo fornecido pelo Google.
  9. Clique em Run Job.

Modelos personalizados

Para executar um modelo personalizado:

  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em CRIAR JOB A PARTIR DO MODELO.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione Modelo personalizado no menu suspenso Modelo do Dataflow.
  6. Formulário para execução de modelos personalizados
  7. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  8. Digite o caminho do Cloud Storage para o arquivo de modelo no campo do caminho do modelo no Cloud Storage.
  9. Caso precise de parâmetros para o modelo, clique em Adicionar item na seção Parâmetros adicionais. Digite o Nome e o Valor do parâmetro. Repita essa etapa para cada parâmetro necessário.
  10. Clique em Run Job.

Uso da API REST

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Consulte a referência da API REST para projects.templates.launch e saiba mais sobre os parâmetros disponíveis.

Exemplo 1: como criar um job em lote de modelo personalizado

Neste exemplo, a solicitação projects.templates.launch cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída. Caso a solicitação tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.

É preciso modificar os seguintes valores:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do intervalo no Cloud Storage.
  • Defina gcsPath como o local do Cloud Storage do arquivo de modelo.
  • Defina parameters como sua lista de pares de chave/valor.
  • Defina tempLocation como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
                "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
        }
    

Exemplo 2: como criar um job de streaming de um modelo personalizado

Neste exemplo, a solicitação projects.templates.launch cria um job de streaming com base em um modelo que lê partir de um tópico do Pub/Sub e grava em uma tabela do BigQuery já existente e com o esquema apropriado. Caso tenha êxito, o corpo de resposta conterá uma instância de LaunchTemplateResponse.

É preciso modificar os seguintes valores:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do intervalo do Cloud Storage.
  • Substitua YOUR_TOPIC_NAME pelo nome do tópico do Pub/Sub.
  • Substitua YOUR_DATASET pelo conjunto de dados do BigQuery e YOUR_TABLE_NAME pelo nome da tabela do BigQuery.
  • Defina gcsPath como o local do Cloud Storage do arquivo de modelo.
  • Defina parameters como sua lista de pares de chave/valor.
  • Defina tempLocation como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
        }
    

Exemplo 3: como atualizar um job de streaming de um modelo personalizado

Este exemplo de solicitação project.templates.launch mostra como atualizar um job de streaming de modelo.

  1. Execute o Exemplo 2: como criar um job de streaming de modelo personalizado para iniciar um job de modelo de streaming.
  2. Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:
    • Substitua YOUR_PROJECT_ID pelo ID do projeto.
    • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
    • Substitua YOUR_BUCKET_NAME pelo nome do intervalo do Cloud Storage.
    • Substitua YOUR_TOPIC_NAME pelo nome do tópico do Pub/Sub.
    • Substitua YOUR_DATASET pelo conjunto de dados do BigQuery e YOUR_TABLE_NAME pelo nome da tabela do BigQuery.
    • Defina gcsPath como o local do Cloud Storage do arquivo de modelo.
    • Defina parameters como sua lista de pares de chave/valor.
    • Defina tempLocation como um local em que você tenha permissão de gravação. Esse valor é obrigatório para executar modelos fornecidos pelo Google.
            POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
            {
                "jobName": "JOB_NAME",
                "parameters": {
                    "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                    "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
                },
                "environment": {
                    "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                    "zone": "us-central1-f"
                }
                "update": true
            }
        
  3. Acesse a interface de monitoramento do Dataflow e verifique se um novo trabalho com o mesmo nome foi criado. Este job tem o status Atualizado.

Uso de bibliotecas de cliente da API do Google

Use as bibliotecas de cliente das APIs do Google para fazer chamadas para as APIs REST do Dataflow com facilidade. Neste script de exemplo, foi usada a biblioteca de cliente das APIs do Google para Python (links em inglês).

Neste exemplo, você definirá as seguintes variáveis:

  • project: defina como o ID do projeto.
  • job: defina um nome de job exclusivo de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • template: defina como o local do Cloud Storage do arquivo de modelo.
  • parameters: defina como um dicionário com os parâmetros do modelo.
from googleapiclient.discovery import build

    # project = 'your-gcp-project'
    # job = 'unique-job-name'
    # template = 'gs://dataflow-templates/latest/Word_Count'
    # parameters = {
    #     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
    #     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
    # }

    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId=project,
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )

    response = request.execute()

Para mais informações sobre as opções disponíveis, consulte Método projects.templates.launch na referência da API REST do Cloud Dataflow.

Uso da gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

A ferramenta de linha de comando gcloud pode executar um modelo personalizado ou um modelo fornecido pelo Google usando o comando gcloud dataflow jobs run. Veja exemplos de modelos fornecidos pelo Google em execução documentados nesta página.

Para os exemplos de modelos personalizados a seguir, configure os seguintes valores:

  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do intervalo do Cloud Storage.
  • Você precisa incluir a sinalização --gcs-location. Defina --gcs-location como o local do Cloud Storage do arquivo de modelo.
  • Defina --parameters como a lista de parâmetros separada por vírgulas para transmitir o job. Espaços entre vírgulas e valores não são permitidos.

Exemplo 1: modelo personalizado, job em lote

Este exemplo cria um job em lote a partir de um modelo que lê um arquivo de texto e grava um arquivo de texto de saída.

        gcloud dataflow jobs run JOB_NAME \
            --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
            --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output
    

A solicitação retorna uma resposta com o seguinte formato:

        id: 2016-10-11_17_10_59-1234530157620696789
        projectId: YOUR_PROJECT_ID
        type: JOB_TYPE_BATCH
    

Exemplo 2: modelo personalizado, job de streaming

Este exemplo cria um job de streaming a partir de um modelo que lê a partir de um tópico do Pub/Sub e grava em uma tabela do BigQuery já existente e com o esquema apropriado.

        gcloud dataflow jobs run JOB_NAME \
            --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
            --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name
    

A solicitação retorna uma resposta com o seguinte formato:

        id: 2016-10-11_17_10_59-1234530157620696789
        projectId: YOUR_PROJECT_ID
        type: JOB_TYPE_STREAMING
    

Para uma lista completa de sinalizações do comando gcloud dataflow jobs run, consulte a referência da ferramenta gcloud.

Monitoramento e solução de problemas

A Interface de monitoramento do Dataflow permite que você monitore seus jobs do Dataflow. Caso um job falhe, veja dicas para solução de problemas, estratégias de depuração e um catálogo de erros comuns no guia Como solucionar problemas do pipeline.

Como atualizar o pipeline

A atualização de um pipeline existente que usa um modelo do Dataflow não é compatível no momento.