Modelo do Avro do Pub/Sub para BigQuery

O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.

Requisitos de pipeline

  • A assinatura de entrada do Pub/Sub precisa existir.
  • O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
  • O tópico do Pub/Sub não processado precisa existir.
  • O conjunto de dados de saída do BigQuery precisa existir.

Parâmetros do modelo

Parâmetro Descrição
schemaPath O local do Cloud Storage do arquivo de esquema do Avro. Por exemplo, gs://path/to/my/schema.avsc.
inputSubscription A assinatura de entrada do Pub/Sub a ser lida. Por exemplo, projects/<project>/subscriptions/<subscription>.
outputTopic O tópico do Pub/Sub a ser usado para registros não processados. Por exemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec O local da tabela de saída do BigQuery. Por exemplo, <my-project>:<my-dataset>.<my-table>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.
writeDisposition Opcional: O WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Padrão: WRITE_APPEND
createDisposition Opcional: O CreateDisposition do BigQuery. Por exemplo: CREATE_IF_NEEDED e CREATE_NEVER. Padrão: CREATE_IF_NEEDED
useStorageWriteApi Opcional: Se true, o pipeline usa a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
numStorageWriteApiStreams Opcional: Ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
storageWriteApiTriggeringFrequencySec Opcional: Ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Avro to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_PATH: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: o nome da assinatura de entrada do Pub/Sub
  • BIGQUERY_TABLE: o nome da tabela de saída do BigQuery.
  • DEADLETTER_TOPIC: o tópico do Pub/Sub a ser usado para a fila não processada

A seguir