Modelo de subscrição do Pub/Sub para o BigQuery

O modelo de subscrição do Pub/Sub para o BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma subscrição do Pub/Sub e escreve-as numa tabela do BigQuery. Pode usar o modelo como uma solução rápida para mover dados do Pub/Sub para o BigQuery. O modelo lê mensagens formatadas em JSON do Pub/Sub e converte-as em elementos do BigQuery.

Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.

Requisitos do pipeline

  • O campo data das mensagens do Pub/Sub tem de usar o formato JSON, descrito neste guia JSON. Por exemplo, as mensagens com valores no campo data formatados como {"k1":"v1", "k2":"v2"} podem ser inseridas numa tabela do BigQuery com duas colunas, denominadas k1 e k2, com um tipo de dados de string.
  • A tabela de saída tem de existir antes de executar o pipeline. O esquema de tabela tem de corresponder aos objetos JSON de entrada.

Parâmetros de modelos

Parâmetros obrigatórios

  • outputTableSpec: a localização da tabela de saída do BigQuery, no formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: a subscrição de entrada do Pub/Sub a partir da qual ler, no formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parâmetros opcionais

  • outputDeadletterTable: a tabela do BigQuery a usar para mensagens que não conseguem alcançar a tabela de saída, no formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Se a tabela não existir, é criada durante a execução do pipeline. Se não for especificado, é usado OUTPUT_TABLE_SPEC_error_records.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: define o intervalo que os trabalhadores podem verificar quanto a alterações nas FDUs JavaScript para recarregar os ficheiros. A predefinição é: 0.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Execute o modelo

    Consola

    1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub Subscription to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
    8. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Substitua o seguinte:

    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
    • DATASET: o seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
    • DATASET: o seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    O que se segue?