Pub/Sub para BigQuery com modelo de UDF em Python

O modelo do Pub/Sub para BigQuery com UDF em Python é um pipeline de streaming que lê mensagens em formato JSON do Pub/Sub e as grava em uma tabela do BigQuery. Outra opção é fornecer uma função definida pelo usuário (UDF) escrita em Python para processar as mensagens recebidas.

Requisitos de pipeline

  • A tabela do BigQuery precisa existir e ter um esquema.
  • Os dados da mensagem do Pub/Sub precisam usar o formato JSON ou fornecer uma UDF que converta os dados da mensagem para JSON. Os dados JSON precisam corresponder ao esquema da tabela do BigQuery. Por exemplo, se os payloads JSON forem formatados como {"k1":"v1", "k2":"v2"}, a tabela do BigQuery precisará ter duas colunas de string chamadas k1 e k2.
  • Especifique o parâmetro inputSubscription ou inputTopic, mas não ambos.

Parâmetros do modelo

Parâmetro Descrição
outputTableSpec A tabela do BigQuery em que será feita a gravação, formatada como "PROJECT_ID:DATASET_NAME.TABLE_NAME".
inputSubscription Opcional: a assinatura do Pub/Sub de que será feita a leitura, formatada como "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME".
inputTopic Opcional: o tópico do Pub/Sub de que será feita a leitura, formatado como "projects/PROJECT_ID/topics/TOPIC_NAME".
outputDeadletterTable A tabela do BigQuery para mensagens que não conseguiram acessar a tabela de saída, formatada como "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se a tabela não existir, ela será criada quando o pipeline for executado. Se esse parâmetro não for especificado, o valor "OUTPUT_TABLE_SPEC_error_records" será usado.
pythonExternalTextTransformGcsPath Opcional: o URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName Opcional: O nome da função definida pelo usuário (UDF) do Python que você quer usar.
useStorageWriteApi Opcional: se estiver definido como true, o pipeline vai usar a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
numStorageWriteApiStreams Opcional: ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
storageWriteApiTriggeringFrequencySec Opcional: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to BigQuery with Python UDF template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
    8. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Substitua:

    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • TOPIC_NAME: o nome do tópico do Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • TOPIC_NAME: o nome do tópico do Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    A seguir