Modelo .proto do Pub/Sub para BigQuery

O modelo de buffer de protocolo do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do buffer de protocolo de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Uma função definida pelo usuário (UDF) do JavaScript pode ser fornecida para transformar dados. Erros ao executar a UDF podem ser enviados para um tópico separado do Pub/Sub ou para o mesmo tópico não processado como os erros do BigQuery.

Antes de executar um pipeline do Dataflow para esse cenário, considere se uma assinatura do Pub/Sub BigQuery com uma UDF atende aos seus requisitos.

Requisitos de pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema dos registros do buffer de protocolo precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.
  • Se a tabela do BigQuery existir, ela precisará ter um esquema que corresponda aos dados proto, independentemente do valor createDisposition.

Parâmetros do modelo

Parâmetros obrigatórios

  • protoSchemaPath (caminho do Cloud Storage para o arquivo de esquema do Proto): caminho do Cloud Storage para um arquivo de conjunto de descritores independente. Exemplo: gs://MyBucket/schema.pb. schema.pb pode ser gerado adicionando --descriptor_set_out=schema.pb ao comando protoc que compila os protos. A flag --include_imports pode ser usada para garantir que o arquivo seja independente.
  • fullMessageName (nome completo da mensagem do Proto): o nome completo da mensagem (exemplo: package.name.MessageName). Se a mensagem estiver aninhada em outra, inclua todas as mensagens com o delimitador "." (exemplo: package.name.OuterMessage.InnerMessage). "package.name" precisa ser da instrução package, não da java_package.
  • inputSubscription (assinatura de entrada do Pub/Sub): assinatura do Pub/Sub para ler a entrada no formato "projects/your-project-id/subscriptions/your-subscription-name" (por exemplo: projects/your-project-id/subscriptions/nome-da-assinatura).
  • outputTableSpec (tabela de saída do BigQuery): o local da tabela do BigQuery em que a saída será gravada. O nome precisa estar no formato <project>:<dataset>.<table_name>. O esquema da tabela precisa corresponder aos objetos de entrada.
  • outputTopic (tópico de saída do Pub/Sub): o nome do tópico em que os dados devem ser publicados, no formato "projects/your-project-id/topics/your-topic-name" (por exemplo: projects/your-project-id/topics/your-topic-name).

Parâmetros opcionais

  • preserveProtoFieldNames (Preservar nomes de campos proto): flag para controlar se os nomes de campos proto devem ser mantidos ou convertidos para lowerCamelCase. Se a tabela já existir, isso vai depender do que corresponde ao esquema dela. Caso contrário, ele vai determinar os nomes das colunas da tabela criada. True para preservar snake_case proto. "False" vai converter os campos para lowerCamelCase. (padrão: falso).
  • bigQueryTableSchemaPath: caminho do Cloud Storage para o arquivo JSON do esquema do BigQuery. Se não for definido, o esquema será inferido do esquema Proto. Exemplo: gs://MyBucket/bq_schema.json.
  • udfOutputTopic (tópico de saída do Pub/Sub para falhas de UDF): um tópico de saída opcional para enviar falhas de UDF. Se essa opção não for definida, as falhas serão gravadas no mesmo tópico que as falhas do BigQuery. (Exemplo: projects/your-project-id/topics/your-topic-name).
  • writeDisposition (disposição de gravação a ser usada no BigQuery): WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. O valor padrão é: WRITE_APPEND.
  • createDisposition (disposição de criação a ser usada no BigQuery): CreateDisposition do BigQuery. Por exemplo, CREATE_IF_NEEDED, CREATE_NEVER. O valor padrão é: CREATE_IF_NEEDED.
  • javascriptTextTransformGcsPath (caminho do Cloud Storage para a origem da UDF em JavaScript): o padrão de caminho do Cloud Storage para o código JavaScript que contém as funções definidas pelo usuário. Exemplo: gs://your-bucket/your-function.js.
  • javascriptTextTransformFunctionName (nome da função JavaScript da UDF): o nome da função a ser chamada no arquivo JavaScript. Use apenas letras, dígitos e sublinhados. Exemplo: "transform" ou "transform_udf1".
  • javascriptTextTransformReloadIntervalMinutes (intervalo de recarregamento automático da UDF em JavaScript (minutos)): defina o intervalo que os workers podem verificar se há alterações na UDF em JavaScript para recarregar os arquivos. Padrão: 0.
  • useStorageWriteApi (usar a API BigQuery Storage Write): se verdadeiro, o pipeline usa a API Storage Write ao gravar os dados no BigQuery (consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). O valor padrão é falso. Ao usar a API Storage Write no modo "exatamente uma vez", defina os seguintes parâmetros: "número de fluxos da API BigQuery Storage Write" e "Frequência de acionamento da API BigQuery Storage Write". Se ativar o modo do Dataflow pelo menos uma vez ou definir o parâmetro useStorageWriteApiAtLeastOnce como verdadeiro, não será necessário definir o número de fluxos ou a frequência de acionamento.
  • useStorageWriteApiAtLeastOnce (usar semântica pelo menos uma vez na API BigQuery Storage Write): esse parâmetro só entra em vigor se a opção "Usar a API BigQuery Storage Write" está ativada. Se ativada, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, a semântica "exatamente uma" será usada. O padrão é: falso.
  • numStorageWriteApiStreams (número de fluxos da API BigQuery Storage Write): o número de fluxos define o paralelismo da transformação de gravação do BigQueryIO e corresponde aproximadamente ao número de fluxos da API Storage Write que serão usados pelo pipeline. Confira os valores recomendados em https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec (frequência de acionamento em segundos para a API BigQuery Storage Write): a frequência de acionamento determina em quanto tempo os dados ficarão visíveis para consulta no BigQuery. Confira os valores recomendados em https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Proto to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Substitua:

    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Proto (por exemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: o nome da mensagem do Proto (por exemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
    • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
    • UNPROCESSED_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Proto (por exemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: o nome da mensagem do Proto (por exemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
    • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
    • UNPROCESSED_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

    A seguir