Modelo do Pub/Sub para o BigQuery

O modelo do Pub/Sub para o BigQuery é uma pipeline de streaming que lê mensagens formatadas em JSON do Pub/Sub e escreve-as numa tabela do BigQuery. Opcionalmente, pode fornecer uma função definida pelo utilizador (FDU) escrita em JavaScript para processar as mensagens recebidas.

Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.

Requisitos do pipeline

  • A tabela do BigQuery tem de existir e ter um esquema.
  • Os dados das mensagens do Pub/Sub têm de usar o formato JSON ou tem de fornecer uma FDU que converta os dados das mensagens em JSON. Os dados JSON têm de corresponder ao esquema da tabela do BigQuery. Por exemplo, se os payloads JSON estiverem formatados como {"k1":"v1", "k2":"v2"}, a tabela do BigQuery tem de ter duas colunas de string denominadas k1 e k2.
  • Especifique o parâmetro inputSubscription ou inputTopic, mas não ambos.

Parâmetros de modelos

Parâmetros obrigatórios

  • outputTableSpec: a tabela do BigQuery para a qual escrever, formatada como PROJECT_ID:DATASET_NAME.TABLE_NAME.

Parâmetros opcionais

  • inputTopic: o tópico do Pub/Sub a partir do qual ler, formatado como projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: a subscrição do Pub/Sub a partir da qual ler, formatada como projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: a tabela do BigQuery a usar para mensagens que não conseguiram alcançar a tabela de saída, formatada como PROJECT_ID:DATASET_NAME.TABLE_NAME. Se a tabela não existir, é criada quando o pipeline é executado. Se este parâmetro não for especificado, é usado o valor OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica, pelo menos, uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como verdadeiro. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro só se aplica quando useStorageWriteApi é true. O valor predefinido é false.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro UDF no Cloud Storage e recarrega a UDF se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for 0, o recarregamento das FDU está desativado. O valor predefinido é 0.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Execute o modelo

    Consola

    1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
    8. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Substitua o seguinte:

    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
    • TOPIC_NAME: o nome do seu tópico do Pub/Sub
    • DATASET: o seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
    • TOPIC_NAME: o nome do seu tópico do Pub/Sub
    • DATASET: o seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    O que se segue?