Modelo de assinatura do Pub/Sub para BigQuery

O modelo de assinatura do Pub/Sub para BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos de pipeline

  • O campo data das mensagens do Pub/Sub precisa usar o formato JSON, conforme descrito neste guia JSON. Por exemplo, mensagens com valores no campo data formatados como {"k1":"v1", "k2":"v2"} podem ser inseridos em uma tabela do BigQuery com duas colunas, k1 e k2, com um tipo de dados de string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetros obrigatórios

  • outputTableSpec: o local da tabela de saída do BigQuery, no formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: a assinatura de entrada do Pub/Sub que será lida, no formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parâmetros opcionais

  • outputDeadletterTable: a tabela do BigQuery a ser usada para mensagens que não alcançaram a tabela de saída, no formato de <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Se a tabela não existir, ela será criada durante a execução do pipeline. Se não for especificado, OUTPUT_TABLE_SPEC_error_records será usado.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: defina o intervalo que os workers podem verificar se há alterações na UDF em JavaScript para recarregar os arquivos. Padrão: 0.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Subscription to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
    8. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Substitua:

    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    A seguir