O modelo do Datastream para o BigQuery é um pipeline de streaming que lê
dados do
Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage
usando notificações do Pub/Sub e os replica em uma tabela de preparo particionada
do BigQuery. Após a replicação, o modelo executa um MERGE
no BigQuery
para mesclar todas as alterações de change data capture (CDC) em uma réplica da tabela de origem.
O modelo lida com a criação e a atualização das tabelas do BigQuery gerenciadas pela replicação. Quando a linguagem de definição de dados (DDL) é obrigatória, um callback para o Datastream extrai o esquema da tabela de origem e o converte em tipos de dados do BigQuery. As operações compatíveis incluem:
- Novas tabelas são criadas à medida que os dados são inseridos.
- Novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
- As colunas descartadas são ignoradas no BigQuery, e os valores futuros são nulos.
- As colunas renomeadas são adicionadas ao BigQuery como novas colunas.
- As alterações de tipo não são propagadas para o BigQuery.
É recomendável executar esse pipeline usando o modo de streaming "Pelo menos uma vez", porque o modelo realiza a eliminação de duplicação ao mesclar dados de uma tabela temporária do BigQuery com a tabela do BigQuery principal. Essa etapa no pipeline significa que não há benefícios extras em usar o modo de streaming "Exatamente uma vez".
Requisitos de pipeline
- Um stream do Datastream que está pronto ou já está replicando dados.
- As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
- Os conjuntos de dados de destino do BigQuery são criados, e a conta de serviço do Compute Engine recebe acesso de administrador a eles.
- Uma chave primária é necessária na tabela de origem para a criação da tabela de réplica de destino.
- Um banco de dados de origem MySQL ou Oracle. Os bancos de dados PostgreSQL e SQL Server não são compatíveis.
Parâmetros do modelo
Parâmetros obrigatórios
- inputFilePattern: o local do arquivo da saída do arquivo Datastream no Cloud Storage, no formato
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat: o formato dos arquivos de saída produzidos pelo Datastream. Os valores permitidos são
avro
ejson
. O padrão éavro
. - gcsPubSubSubscription: a assinatura do Pub/Sub usada pelo Cloud Storage para notificar o Dataflow sobre novos arquivos disponíveis para processamento, no formato
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate: o nome do conjunto de dados que contém as tabelas de preparo. Esse parâmetro aceita modelos, por exemplo,
{_metadata_dataset}_log
oumy_dataset_log
. Normalmente, esse parâmetro é um nome de conjunto de dados. O padrão é{_metadata_dataset}
. - outputDatasetTemplate: o nome do conjunto de dados que contém as tabelas de réplica. Esse parâmetro aceita modelos, por exemplo,
{_metadata_dataset}
oumy_dataset
. Normalmente, esse parâmetro é um nome de conjunto de dados. O padrão é{_metadata_dataset}
. - deadLetterQueueDirectory: o caminho usado pelo Dataflow para gravar a saída da fila de mensagens inativas. Esse caminho não pode estar no mesmo caminho de saída do arquivo do Datastream. O padrão é
empty
.
Parâmetros opcionais
- streamName: o nome ou modelo do stream para pesquisar informações de esquema. O padrão é {_metadata_stream}. O valor padrão costuma ser suficiente.
- rfcStartDateTime: o DateTime inicial a ser usado para buscar dados no Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é
1970-01-01T00:00:00.00Z
. - fileReadConcurrency: o número de arquivos do DataStream simultâneos a serem lidos. O padrão é
10
. - outputProjectId: o ID do projeto do Google Cloud que contém os conjuntos de dados do BigQuery para onde os dados serão enviados. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
- outputStagingTableNameTemplate: o modelo a ser usado para nomear as tabelas de preparo. Por exemplo,
{_metadata_table}
. O padrão é{_metadata_table}_log
. - outputTableNameTemplate: o modelo a ser usado para o nome das tabelas de réplica, por exemplo,
{_metadata_table}
. O padrão é{_metadata_table}
. - ignoreFields: campos separados por vírgulas a serem ignorados no BigQuery. O padrão é
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Por exemplo,_metadata_stream,_metadata_schema
. - mergeFrequencyMinutes: o número de minutos entre as mesclagens de uma determinada tabela. O padrão é
5
. - dlqRetryMinutes: o número de minutos entre as novas tentativas de DLQ. O padrão é
10
. - dataStreamRootUrl: o URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
- applyMerge: se as consultas MERGE serão desativadas para o job. O padrão é
true
. - mergeConcurrency: o número de consultas MERGE do BigQuery simultâneas. Só é eficaz quando o atributo "applyMerge" é definido como verdadeiro. O padrão é
30
. - partitionRetentionDays: o número de dias a serem usados para a retenção de partição ao executar mesclagens do BigQuery. O padrão é
1
. - useStorageWriteApiAtLeastOnce: esse parâmetro só entra em vigor se
Use BigQuery Storage Write API
estiver ativado. Setrue
, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, será usada apenas uma semântica. O padrão éfalse
. - javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for
0
, o recarregamento da UDF será desativado. O valor padrão é0
. - pythonTextTransformGcsPath: o padrão de caminho do Cloud Storage para o código Python que contém as funções definidas pelo usuário. Por exemplo,
gs://your-bucket/your-transforms/*.py
. - pythonRuntimeVersion: a versão do ambiente de execução a ser usada para essa UDF do Python.
- pythonTextTransformFunctionName: o nome da função a ser chamada no arquivo JavaScript. Use apenas letras, dígitos e sublinhados. Por exemplo,
transform_udf1
. - runtimeRetries: o número de vezes que um ambiente de execução será repetido antes de falhar. O padrão é 5.
- useStorageWriteApi: se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é
false
. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: ao usar a API Storage Write, especifica o número de fluxos de gravação. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. Padrão: 0. - storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro.
Função definida pelo usuário
Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Especificação da função
A UDF tem a seguinte especificação:
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Datastream to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: o caminho do Cloud Storage para os dados do Datastream. Exemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo,projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nome do conjunto de dados do BigQuery.BIGQUERY_TABLE
: o modelo da tabela do BigQuery. Por exemplo,{_metadata_schema}_{_metadata_table}_log
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: o caminho do Cloud Storage para os dados do Datastream. Exemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo,projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nome do conjunto de dados do BigQuery.BIGQUERY_TABLE
: o modelo da tabela do BigQuery. Por exemplo,{_metadata_schema}_{_metadata_table}_log
A seguir
- Saiba como implementar o Datastream e o Dataflow para análise.
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.