O modelo Pub/Sub Avro para BigQuery é um pipeline de streaming que carrega dados Avro de uma subscrição do Pub/Sub para uma tabela do BigQuery. Todos os erros que ocorrem durante a escrita na tabela do BigQuery são transmitidos para um tópico não processado do Pub/Sub.
Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.
Requisitos do pipeline
- A subscrição do Pub/Sub de entrada tem de existir.
- O ficheiro de esquema dos registos Avro tem de existir no Cloud Storage.
- O tópico Pub/Sub não processado tem de existir.
- O conjunto de dados do BigQuery de saída tem de existir.
Parâmetros de modelos
Parâmetros obrigatórios
- schemaPath: a localização do Cloud Storage do ficheiro de esquema Avro. Por exemplo,
gs://path/to/my/schema.avsc
. - inputSubscription: a subscrição de entrada do Pub/Sub a partir da qual ler. Por exemplo,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
. - outputTableSpec: a localização da tabela de resultados do BigQuery na qual escrever os resultados. Por exemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
.Consoante ocreateDisposition
especificado, a tabela de saída pode ser criada automaticamente através do esquema Avro fornecido pelo utilizador. - outputTopic: o tópico do Pub/Sub a usar para registos não processados. Por exemplo,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
.
Parâmetros opcionais
- useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica, pelo menos, uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como verdadeiro. Para usar a semântica exatamente uma vez, defina o parâmetro como
false
. Este parâmetro só se aplica quandouseStorageWriteApi
étrue
. O valor predefinido éfalse
. - writeDisposition: o valor WriteDisposition do BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por exemplo,
WRITE_APPEND
,WRITE_EMPTY
ouWRITE_TRUNCATE
. A predefinição éWRITE_APPEND
. - createDisposition: o CreateDisposition do BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por exemplo,
CREATE_IF_NEEDED
eCREATE_NEVER
. A predefinição éCREATE_IF_NEEDED
. - useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é
false
. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro. A predefinição é: 0. - storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub Avro to BigQuery template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o ficheiro de esquema Avro (por exemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: o nome da subscrição de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQueryDEADLETTER_TOPIC
: o tópico do Pub/Sub a usar para a fila não processada
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o ficheiro de esquema Avro (por exemplo,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: o nome da subscrição de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQueryDEADLETTER_TOPIC
: o tópico do Pub/Sub a usar para a fila não processada
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.