Usar o Datastream e o Dataflow para fazer streaming de dados para o BigQuery

Nesta página, você vai conhecer as práticas recomendadas para usar o Datastream e o Dataflow para fazer streaming de dados para o BigQuery.

Particionar conjuntos de dados de réplica em chaves definidas pelo usuário

O conjunto de dados de teste no BigQuery é particionado automaticamente. No entanto, por padrão, o conjunto de dados de réplica não é particionado porque as chaves de partição nas tabelas de réplica precisam ser definidas com base em uma lógica de negócios específica, em vez de serem aplicadas pelo Datastream e pelo Dataflow.

Para cada tabela no conjunto de dados de réplica que precisa de particionamento:

  1. Interrompa e drene o job do Dataflow.

  2. Use o editor SQL no BigQuery para executar o seguinte script SQL para cada tabela no conjunto de dados de réplica. Neste exemplo, a tabela actor no conjunto de dados datastream_cdc tem uma coluna last_update que queremos definir como chave de partição. Ao executar o script, você recria a tabela com a chave de partição correta.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
    
    
  3. Use o modelo do Datastream para BigQuery para recriar um job do Dataflow.

Executar funções definidas pelo usuário para manipular dados de eventos

Você pode usar o modelo do Datastream para BigQuery para executar uma função JavaScript definida pelo usuário. Para fazer isso, coloque um arquivo contendo a função em um local específico no Cloud Storage. Em seguida, faça o seguinte:

  • Use o parâmetro javascriptTextTransformGcsPath no modelo para especificar o local do arquivo no Cloud Storage que contém a função definida pelo usuário.
  • Use o parâmetro javascriptTextTransformFunctionName para especificar o nome da função JavaScript que você quer chamar como a função definida pelo usuário.

Por exemplo, é possível executar uma função definida pelo usuário para reter registros excluídos nas tabelas do conjunto de dados de réplica no BigQuery. Esse processo é conhecido como exclusão reversível.

Para fazer isso, crie uma função que copie o valor da coluna _metadata_deleted para uma nova coluna chamada is_deleted e, em seguida, redefina o valor da coluna _metadata_deleted como false. Isso faz com que o job do Dataflow ignore os eventos de exclusão e retenha os registros excluídos ao atualizar o conjunto de dados de réplica no BigQuery.

Confira o exemplo de código para esta função definida pelo usuário:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Definir a frequência de mesclagem

Use o parâmetro mergeFrequencyMinutes do modelo do Datastream para o BigQuery para definir a frequência de mesclagem. Este é o número de minutos entre mesclagens de uma determinada tabela no conjunto de dados de réplica no BigQuery. Enquanto os dados históricos são preenchidos, recomendamos que você mantenha a frequência de mesclagem baixa (12 ou 24 horas) para manter os custos sob controle.

Por exemplo, se você definir o valor desse parâmetro como 10 minutos, o Dataflow vai executar o job que usa o modelo a cada 10 minutos. No entanto, na primeira execução do job, haverá um atraso de cinco minutos. Neste exemplo, se o job for executado às 9h14, a primeira mesclagem vai ocorrer às 9h29 (10 minutos para a mesclagem e 5 minutos para o atraso). A segunda mesclagem vai ocorrer às 9h39, e todas as outras vão ocorrer em intervalos de 10 minutos (9h49, 9h59, 10h09 e assim por diante).

Se você definir a frequência de mesclagem como 60 minutos, o job será executado na hora, após um atraso de cinco minutos para a execução inicial do job. Se o job estiver programado para ser executado às 10h, na verdade, ele será executado às 10h05 devido ao atraso de cinco minutos. Todas as mesclagens subsequentes ocorrerão em intervalos de 60 minutos (11h05, 12h05, 13h05 e assim por diante).

Seja como resultado do controle de custos ou por outros motivos, talvez você não consiga realizar uma mesclagem a uma frequência que atenda às necessidades de sua empresa. Talvez você não tenha os dados mais recentes. Para acessar os dados mais recentes, crie uma visualização sobre as tabelas dos conjuntos de dados de teste e réplica no BigQuery, em que a visualização imita a mesclagem. Essa visualização é criada como uma tabela lógica para os conjuntos de dados de teste e de réplica. Se a frequência de mesclagem for baixa e você precisar de um acesso mais rápido aos dados, use a visualização.