O modelo do Avro do Pub/Sub para BigQuery é um pipeline de streaming que ingere dados do Avro de uma assinatura do Pub/Sub em uma tabela do BigQuery. Qualquer erro que ocorre durante a gravação na tabela do BigQuery é transmitido para um tópico não processado do Pub/Sub.
Requisitos de pipeline
- A assinatura de entrada do Pub/Sub precisa existir.
- O arquivo de esquema para os registros do Avro precisa existir no Cloud Storage.
- O tópico do Pub/Sub não processado precisa existir.
- O conjunto de dados de saída do BigQuery precisa existir.
Parâmetros do modelo
Parâmetros obrigatórios
- schemaPath: o local do Cloud Storage do arquivo de esquema do Avro. Por exemplo,
gs://path/to/my/schema.avsc
. - inputSubscription: a assinatura de entrada do Pub/Sub a ser lida. Exemplo: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>.
- outputTableSpec: o local da tabela de saída do BigQuery em que a saída será gravada. Por exemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. Dependendo docreateDisposition
especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário. - outputTopic: o tópico do Pub/Sub a ser usado para registros não processados. (Por exemplo: projects/<PROJECT_ID>/topics/<TOPIC_NAME>).
Parâmetros opcionais
- useStorageWriteApiAtLeastOnce : ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como verdadeiro. Para usar semântica exatamente uma vez, defina o parâmetro como
false
. Esse parâmetro se aplica apenas quandouseStorageWriteApi
étrue
. O valor padrão éfalse
. - writeDisposition: o valor de WriteDisposition do BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por exemplo,
WRITE_APPEND
,WRITE_EMPTY
ouWRITE_TRUNCATE
. O padrão éWRITE_APPEND
. - createDisposition: o valor de CreateDisposition do BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por exemplo,
CREATE_IF_NEEDED
eCREATE_NEVER
. O padrão éCREATE_IF_NEEDED
. - useStorageWriteApi: se verdadeiro, o pipeline usará a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é
false
. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. Padrão: 0. - storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, você precisará definir esse parâmetro.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Avro to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Substitua:
JOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: o nome da assinatura de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQuery.DEADLETTER_TOPIC
: o tópico do Pub/Sub a ser usado para a fila não processada
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Substitua:
JOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o arquivo de esquema do Avro (por exemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: o nome da assinatura de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQuery.DEADLETTER_TOPIC
: o tópico do Pub/Sub a ser usado para a fila não processada
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.