Modelo do Datastream para BigQuery (Stream)

O modelo do Datastream para o BigQuery é um pipeline de streaming que lê dados do Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e os replica em uma tabela de preparo particionada do BigQuery. Após a replicação, o modelo executa um MERGE no BigQuery para mesclar todas as alterações de change data capture (CDC) em uma réplica da tabela de origem.

O modelo lida com a criação e a atualização das tabelas do BigQuery gerenciadas pela replicação. Quando a linguagem de definição de dados (DDL) é obrigatória, um callback para o Datastream extrai o esquema da tabela de origem e o converte em tipos de dados do BigQuery. As operações compatíveis incluem:

  • Novas tabelas são criadas à medida que os dados são inseridos.
  • Novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
  • As colunas descartadas são ignoradas no BigQuery, e os valores futuros são nulos.
  • As colunas renomeadas são adicionadas ao BigQuery como novas colunas.
  • As alterações de tipo não são propagadas para o BigQuery.

É recomendável executar esse pipeline usando o modo de streaming "Pelo menos uma vez", porque o modelo realiza a eliminação de duplicação ao mesclar dados de uma tabela temporária do BigQuery com a tabela do BigQuery principal. Essa etapa no pipeline significa que não há benefícios extras em usar o modo de streaming "Exatamente uma vez".

Requisitos de pipeline

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Os conjuntos de dados de destino do BigQuery são criados, e a conta de serviço do Compute Engine recebe acesso de administrador a eles.
  • Uma chave primária é necessária na tabela de origem para a criação da tabela de réplica de destino.
  • Um banco de dados de origem MySQL ou Oracle. Os bancos de dados PostgreSQL e SQL Server não são compatíveis.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o local do arquivo da saída do arquivo Datastream no Cloud Storage, no formato: gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat: o formato dos arquivos de saída produzidos pelo Datastream. O valor pode ser "avro" ou "json". O padrão é avro.
  • gcsPubSubSubscription: a assinatura do Pub/Sub usada pelo Cloud Storage para notificar o Dataflow sobre novos arquivos disponíveis para processamento, no formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : o nome do conjunto de dados que contém as tabelas de preparo. Este parâmetro aceita modelos como {_metadata_dataset}_log ou my_dataset_log. Normalmente, esse parâmetro é um nome de conjunto de dados. O padrão é: {_metadata_dataset}.
  • outputDatasetTemplate: o nome do conjunto de dados que contém as tabelas de réplica. Este parâmetro aceita modelos como {_metadata_dataset} ou my_dataset. Normalmente, esse parâmetro é um nome de conjunto de dados. O padrão é: {_metadata_dataset}.
  • deadLetterQueueDirectory: o caminho usado pelo Dataflow para gravar a saída da fila de mensagens inativas. Esse caminho não pode estar no mesmo caminho de saída do arquivo do Datastream. O padrão é vazio.

Parâmetros opcionais

  • streamName: o nome ou modelo do stream para pesquisar informações de esquema. O padrão é {_metadata_stream}. O valor padrão costuma ser suficiente.
  • rfcStartDateTime: o DateTime inicial a ser usado para buscar dados no Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: o número de arquivos do DataStream simultâneos a serem lidos. O padrão é 10.
  • outputProjectId: o ID do projeto do Google Cloud que contém os conjuntos de dados do BigQuery para onde os dados serão enviados. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
  • outputStagingTableNameTemplate: o modelo a ser usado para nomear as tabelas de preparo. Por exemplo, {_metadata_table}). O padrão é: {_metadata_table}_log.
  • outputTableNameTemplate: o modelo a ser usado para o nome das tabelas de réplica, por exemplo, {_metadata_table}. O padrão é: {_metadata_table}.
  • ignoreFields: campos separados por vírgulas a serem ignorados no BigQuery. O padrão é: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. Por exemplo: _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes: o número de minutos entre as mesclagens de uma determinada tabela. O padrão é 5.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas de DLQ. O padrão é 10.
  • dataStreamRootUrl: o URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
  • applyMerge: se as consultas MERGE serão desativadas para o job. O padrão é "true".
  • mergeConcurrency: o número de consultas MERGE do BigQuery simultâneas. Só é eficaz quando o atributo "applyMerge" é definido como verdadeiro. O padrão é: 30.
  • partitionRetentionDays: o número de dias a serem usados para a retenção de partição ao executar mesclagens do BigQuery. O padrão é 1.
  • useStorageWriteApiAtLeastOnce : esse parâmetro só entra em vigor se a opção "Usar a API BigQuery Storage Write" está ativada. Se for verdadeiro, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, será usada apenas uma semântica. O padrão é: falso.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Exemplo: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for 0, o recarregamento da UDF será desativado. O valor padrão é 0.
  • pythonTextTransformGcsPath : o padrão de caminho do Cloud Storage para o código Python que contém as funções definidas pelo usuário. (Exemplo: gs://your-bucket/your-transforms/*.py).
  • pythonRuntimeVersion: a versão do ambiente de execução a ser usada para essa UDF do Python.
  • pythonTextTransformFunctionName: o nome da função a ser chamada no arquivo JavaScript. Use apenas letras, dígitos e sublinhados. Exemplo: transform_udf1.
  • runtimeRetries: o número de vezes que um ambiente de execução será repetido antes de falhar. O padrão é 5.
  • useStorageWriteApi: se verdadeiro, o pipeline usará a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: os dados da CDC, serializados como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Datastream to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
    8. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    A seguir