Modelo de Pub/Sub Proto para BigQuery

O modelo Pub/Sub para BigQuery é um pipeline de streaming que carrega dados proto de uma subscrição do Pub/Sub para uma tabela do BigQuery. Todos os erros que ocorrem durante a escrita na tabela do BigQuery são transmitidos para um tópico não processado do Pub/Sub.

Pode fornecer uma função definida pelo utilizador (FDU) em JavaScript para transformar dados. Os erros durante a execução da UDF podem ser enviados para um tópico do Pub/Sub separado ou para o mesmo tópico não processado que os erros do BigQuery.

Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.

Requisitos do pipeline

  • A subscrição do Pub/Sub de entrada tem de existir.
  • O ficheiro de esquema dos registos Proto tem de existir no Cloud Storage.
  • O tópico Pub/Sub de saída tem de existir.
  • O conjunto de dados do BigQuery de saída tem de existir.
  • Se a tabela do BigQuery existir, tem de ter um esquema correspondente aos dados proto, independentemente do valor de createDisposition.

Parâmetros de modelos

Parâmetros obrigatórios

  • protoSchemaPath (caminho do Cloud Storage para o ficheiro de esquema Proto): caminho do Cloud Storage para um ficheiro de conjunto de descritores autónomo. Exemplo: gs://MyBucket/schema.pb. schema.pb pode ser gerado adicionando --descriptor_set_out=schema.pb ao comando protoc que compila os protos. A flag --include_imports pode ser usada para garantir que o ficheiro é autónomo.
  • fullMessageName (nome completo da mensagem Proto): o nome completo da mensagem (por exemplo: package.name.MessageName). Se a mensagem estiver aninhada noutra mensagem, inclua todas as mensagens com o delimitador "." (exemplo: package.name.OuterMessage.InnerMessage). "package.name" deve ser da declaração package e não da declaração java_package.
  • inputSubscription (subscrição de entrada do Pub/Sub): subscrição do Pub/Sub para ler a entrada, no formato "projects/your-project-id/subscriptions/your-subscription-name" (exemplo: projects/your-project-id/subscriptions/your-subscription-name).
  • outputTableSpec (tabela de resultados do BigQuery): localização da tabela do BigQuery para escrever os resultados. O nome deve estar no formato <project>:<dataset>.<table_name>. O esquema da tabela tem de corresponder aos objetos de entrada.
  • outputTopic (tópico de saída do Pub/Sub): o nome do tópico para o qual os dados devem ser publicados, no formato "projects/your-project-id/topics/your-topic-name" (exemplo: projects/your-project-id/topics/your-topic-name).

Parâmetros opcionais

  • preserveProtoFieldNames (Preservar nomes de campos proto): indicador para controlar se os nomes de campos proto devem ser mantidos ou convertidos para lowerCamelCase. Se a tabela já existir, isto deve basear-se no que corresponde ao esquema da tabela. Caso contrário, determina os nomes das colunas da tabela criada. Verdadeiro para preservar o snake_case do proto. False converte os campos em lowerCamelCase. (Predefinição: false).
  • bigQueryTableSchemaPath (caminho do esquema da tabela do BigQuery): caminho do Cloud Storage para o ficheiro JSON do esquema do BigQuery. Se não estiver definido, o esquema é inferido a partir do esquema Proto. (Exemplo: gs://MyBucket/bq_schema.json).
  • udfOutputTopic (tópico de saída do Pub/Sub para falhas de UDF): um tópico de saída opcional para enviar falhas de UDF. Se esta opção não estiver definida, as falhas são escritas no mesmo tópico que as falhas do BigQuery. (Exemplo: projects/your-project-id/topics/your-topic-name).
  • writeDisposition (Write Disposition a usar para o BigQuery): WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. A predefinição é: WRITE_APPEND.
  • createDisposition (Create Disposition to use for BigQuery): BigQuery CreateDisposition. Por exemplo, CREATE_IF_NEEDED, CREATE_NEVER. A predefinição é: CREATE_IF_NEEDED.
  • javascriptTextTransformGcsPath (caminho do Cloud Storage para a origem da FDU JavaScript): o padrão do caminho do Cloud Storage para o código JavaScript que contém as suas funções definidas pelo utilizador. (Exemplo: gs://your-bucket/your-function.js).
  • javascriptTextTransformFunctionName (nome da função JavaScript da FDU): o nome da função a chamar a partir do seu ficheiro JavaScript. Use apenas letras, dígitos e sublinhados. (Exemplo: "transform" ou "transform_udf1").
  • javascriptTextTransformReloadIntervalMinutes (intervalo de recarregamento automático da FDU de JavaScript [minutos]): define o intervalo que os trabalhadores podem verificar quanto a alterações da FDU de JavaScript para recarregar os ficheiros. A predefinição é: 0.
  • useStorageWriteApi (Usar API Storage Write do BigQuery): se for verdadeiro, o pipeline usa a API Storage Write ao escrever os dados no BigQuery (consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). O valor predefinido é false. Quando usar a API Storage Write no modo exatamente uma vez, tem de definir os seguintes parâmetros: "Número de streams para a API Storage Write do BigQuery" e "Frequência de acionamento em segundos para a API Storage Write do BigQuery". Se ativar o modo Dataflow at-least-once ou definir o parâmetro useStorageWriteApiAtLeastOnce como verdadeiro, não precisa de definir o número de streams nem a frequência de acionamento.
  • useStorageWriteApiAtLeastOnce (Usar semântica de pelo menos uma vez na API Storage Write do BigQuery): este parâmetro só entra em vigor se a opção "Usar API Storage Write do BigQuery" estiver ativada. Se estiver ativada, a semântica de pelo menos uma vez é usada para a API Storage Write. Caso contrário, é usada a semântica de exatamente uma vez. A predefinição é: false.
  • numStorageWriteApiStreams (Número de streams para a API Storage Write do BigQuery): o número de streams define o paralelismo da transformação de escrita do BigQueryIO e corresponde aproximadamente ao número de streams da API Storage Write que serão usadas pelo pipeline. Consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para ver os valores recomendados. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec (Frequência de acionamento em segundos para a API BigQuery Storage Write): a frequência de acionamento determina a rapidez com que os dados ficam visíveis para consulta no BigQuery. Consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para ver os valores recomendados.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Execute o modelo

    Consola

    1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub Proto to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Substitua o seguinte:

    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • SCHEMA_PATH: o caminho do Cloud Storage para o ficheiro de esquema Proto (por exemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: o nome da mensagem Proto (por exemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: o nome da subscrição de entrada do Pub/Sub
    • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery
    • UNPROCESSED_TOPIC: o tópico do Pub/Sub a usar para a fila não processada

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • SCHEMA_PATH: o caminho do Cloud Storage para o ficheiro de esquema Proto (por exemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: o nome da mensagem Proto (por exemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: o nome da subscrição de entrada do Pub/Sub
    • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery
    • UNPROCESSED_TOPIC: o tópico do Pub/Sub a usar para a fila não processada

    O que se segue?