O modelo Pub/Sub para BigQuery é um pipeline de streaming que carrega dados proto de uma subscrição do Pub/Sub para uma tabela do BigQuery.
Todos os erros que ocorrem durante a escrita na tabela do BigQuery são transmitidos para um tópico não processado do Pub/Sub.
Pode fornecer uma função definida pelo utilizador (FDU) em JavaScript para transformar dados. Os erros durante a execução da UDF podem ser enviados para um tópico do Pub/Sub separado ou para o mesmo tópico não processado que os erros do BigQuery.
Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.
Requisitos do pipeline
- A subscrição do Pub/Sub de entrada tem de existir.
- O ficheiro de esquema dos registos Proto tem de existir no Cloud Storage.
- O tópico Pub/Sub de saída tem de existir.
- O conjunto de dados do BigQuery de saída tem de existir.
- Se a tabela do BigQuery existir, tem de ter um esquema correspondente aos dados proto, independentemente do valor de
createDisposition
.
Parâmetros de modelos
Parâmetros obrigatórios
- protoSchemaPath (caminho do Cloud Storage para o ficheiro de esquema Proto): caminho do Cloud Storage para um ficheiro de conjunto de descritores autónomo. Exemplo: gs://MyBucket/schema.pb.
schema.pb
pode ser gerado adicionando--descriptor_set_out=schema.pb
ao comandoprotoc
que compila os protos. A flag--include_imports
pode ser usada para garantir que o ficheiro é autónomo. - fullMessageName (nome completo da mensagem Proto): o nome completo da mensagem (por exemplo: package.name.MessageName). Se a mensagem estiver aninhada noutra mensagem, inclua todas as mensagens com o delimitador "." (exemplo: package.name.OuterMessage.InnerMessage). "package.name" deve ser da declaração
package
e não da declaraçãojava_package
. - inputSubscription (subscrição de entrada do Pub/Sub): subscrição do Pub/Sub para ler a entrada, no formato "projects/your-project-id/subscriptions/your-subscription-name" (exemplo: projects/your-project-id/subscriptions/your-subscription-name).
- outputTableSpec (tabela de resultados do BigQuery): localização da tabela do BigQuery para escrever os resultados. O nome deve estar no formato
<project>:<dataset>.<table_name>
. O esquema da tabela tem de corresponder aos objetos de entrada. - outputTopic (tópico de saída do Pub/Sub): o nome do tópico para o qual os dados devem ser publicados, no formato "projects/your-project-id/topics/your-topic-name" (exemplo: projects/your-project-id/topics/your-topic-name).
Parâmetros opcionais
- preserveProtoFieldNames (Preservar nomes de campos proto): indicador para controlar se os nomes de campos proto devem ser mantidos ou convertidos para lowerCamelCase. Se a tabela já existir, isto deve basear-se no que corresponde ao esquema da tabela. Caso contrário, determina os nomes das colunas da tabela criada. Verdadeiro para preservar o snake_case do proto. False converte os campos em lowerCamelCase. (Predefinição: false).
- bigQueryTableSchemaPath (caminho do esquema da tabela do BigQuery): caminho do Cloud Storage para o ficheiro JSON do esquema do BigQuery. Se não estiver definido, o esquema é inferido a partir do esquema Proto. (Exemplo: gs://MyBucket/bq_schema.json).
- udfOutputTopic (tópico de saída do Pub/Sub para falhas de UDF): um tópico de saída opcional para enviar falhas de UDF. Se esta opção não estiver definida, as falhas são escritas no mesmo tópico que as falhas do BigQuery. (Exemplo: projects/your-project-id/topics/your-topic-name).
- writeDisposition (Write Disposition a usar para o BigQuery): WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. A predefinição é: WRITE_APPEND.
- createDisposition (Create Disposition to use for BigQuery): BigQuery CreateDisposition. Por exemplo, CREATE_IF_NEEDED, CREATE_NEVER. A predefinição é: CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (caminho do Cloud Storage para a origem da FDU JavaScript): o padrão do caminho do Cloud Storage para o código JavaScript que contém as suas funções definidas pelo utilizador. (Exemplo: gs://your-bucket/your-function.js).
- javascriptTextTransformFunctionName (nome da função JavaScript da FDU): o nome da função a chamar a partir do seu ficheiro JavaScript. Use apenas letras, dígitos e sublinhados. (Exemplo: "transform" ou "transform_udf1").
- javascriptTextTransformReloadIntervalMinutes (intervalo de recarregamento automático da FDU de JavaScript [minutos]): define o intervalo que os trabalhadores podem verificar quanto a alterações da FDU de JavaScript para recarregar os ficheiros. A predefinição é: 0.
- useStorageWriteApi (Usar API Storage Write do BigQuery): se for verdadeiro, o pipeline usa a API Storage Write ao escrever os dados no BigQuery (consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). O valor predefinido é false. Quando usar a API Storage Write no modo exatamente uma vez, tem de definir os seguintes parâmetros: "Número de streams para a API Storage Write do BigQuery" e "Frequência de acionamento em segundos para a API Storage Write do BigQuery". Se ativar o modo Dataflow at-least-once ou definir o parâmetro useStorageWriteApiAtLeastOnce como verdadeiro, não precisa de definir o número de streams nem a frequência de acionamento.
- useStorageWriteApiAtLeastOnce (Usar semântica de pelo menos uma vez na API Storage Write do BigQuery): este parâmetro só entra em vigor se a opção "Usar API Storage Write do BigQuery" estiver ativada. Se estiver ativada, a semântica de pelo menos uma vez é usada para a API Storage Write. Caso contrário, é usada a semântica de exatamente uma vez. A predefinição é: false.
- numStorageWriteApiStreams (Número de streams para a API Storage Write do BigQuery): o número de streams define o paralelismo da transformação de escrita do BigQueryIO e corresponde aproximadamente ao número de streams da API Storage Write que serão usadas pelo pipeline. Consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para ver os valores recomendados. A predefinição é: 0.
- storageWriteApiTriggeringFrequencySec (Frequência de acionamento em segundos para a API BigQuery Storage Write): a frequência de acionamento determina a rapidez com que os dados ficam visíveis para consulta no BigQuery. Consulte https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para ver os valores recomendados.
Função definida pelo utilizador
Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Especificação da função
A FDU tem a seguinte especificação:
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub Proto to BigQuery template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o ficheiro de esquema Proto (por exemplo,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: o nome da mensagem Proto (por exemplo,package.name.MessageName
)SUBSCRIPTION_NAME
: o nome da subscrição de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQueryUNPROCESSED_TOPIC
: o tópico do Pub/Sub a usar para a fila não processada
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: o caminho do Cloud Storage para o ficheiro de esquema Proto (por exemplo,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: o nome da mensagem Proto (por exemplo,package.name.MessageName
)SUBSCRIPTION_NAME
: o nome da subscrição de entrada do Pub/SubBIGQUERY_TABLE
: o nome da tabela de saída do BigQueryUNPROCESSED_TOPIC
: o tópico do Pub/Sub a usar para a fila não processada
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.