Modelo do Datastream para BigQuery (Stream)

O modelo do Datastream para o BigQuery é um pipeline de streaming que lê dados do Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e os replica em uma tabela de preparo particionada do BigQuery. Após a replicação, o modelo executa um MERGE no BigQuery para mesclar todas as alterações de change data capture (CDC) em uma réplica da tabela de origem.

O modelo lida com a criação e a atualização das tabelas do BigQuery gerenciadas pela replicação. Quando a linguagem de definição de dados (DDL) é obrigatória, um callback para o Datastream extrai o esquema da tabela de origem e o converte em tipos de dados do BigQuery. As operações compatíveis incluem:

  • Novas tabelas são criadas à medida que os dados são inseridos.
  • Novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
  • As colunas descartadas são ignoradas no BigQuery, e os valores futuros são nulos.
  • As colunas renomeadas são adicionadas ao BigQuery como novas colunas.
  • As alterações de tipo não são propagadas para o BigQuery.

É recomendável executar esse pipeline usando o modo de streaming "Pelo menos uma vez", porque o modelo realiza a eliminação de duplicação ao mesclar dados de uma tabela temporária do BigQuery com a tabela do BigQuery principal. Essa etapa no pipeline significa que não há benefícios extras em usar o modo de streaming "Exatamente uma vez".

Requisitos de pipeline

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Os conjuntos de dados de destino do BigQuery são criados, e a conta de serviço do Compute Engine recebe acesso de administrador a eles.
  • Uma chave primária é necessária na tabela de origem para a criação da tabela de réplica de destino.
  • Um banco de dados de origem MySQL ou Oracle. Os bancos de dados PostgreSQL não são compatíveis.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
gcsPubSubSubscription A assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat O formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão: avro.
outputStagingDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de preparo. É possível incluir o modelo {_metadata_dataset} como um marcador que é substituído pelo nome do seu conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}_log).
outputDatasetTemplate O nome de um conjunto de dados existente para conter tabelas de réplica. É possível incluir o modelo {_metadata_dataset} como um marcador que é substituído pelo nome do conjunto de dados/esquema de origem (por exemplo, {_metadata_dataset}).
deadLetterQueueDirectory O caminho do arquivo para armazenar todas as mensagens não processadas com o motivo de falha no processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
outputStagingTableNameTemplate Opcional: O modelo para o nome das tabelas de preparo. O padrão é {_metadata_table}_log. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate Opcional: O modelo para o nome das tabelas de réplica. Padrão: {_metadata_table}. Se você estiver replicando vários esquemas, a sugestão é {_metadata_schema}_{_metadata_table}.
outputProjectId Opcional: Projeto para conjuntos de dados do BigQuery em que os dados serão gerados. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
streamName Opcional: O nome ou modelo do stream para pesquisar informações de esquema. Padrão: {_metadata_stream}.
mergeFrequencyMinutes Opcional: O número de minutos entre as mesclagens de uma determinada tabela. Padrão: 5.
dlqRetryMinutes Opcional: O número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) Padrão: 10.
javascriptTextTransformGcsPath Opcional: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Opcional: O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
useStorageWriteApi Opcional: Se true, o pipeline usa a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
numStorageWriteApiStreams Opcional: Ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
storageWriteApiTriggeringFrequencySec Opcional: Ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
applyMerge Opcional: especifica se o modelo executa uma instrução MERGE no BigQuery depois de replicar dados para a tabela de preparo. Padrão: true.
fileReadConcurrency Opcional: o número de arquivos do Datastream que serão lidos simultaneamente. Padrão: 10.
mergeConcurrency Opcional: o número de instruções MERGE simultâneas do BigQuery. Padrão: 30.
partitionRetentionDays Opcional: o número de dias a serem usados para a retenção de partição ao executar instruções MERGE do BigQuery. Padrão: 1;
rfcStartDateTime Opcional: o horário de início da leitura de arquivos do Cloud Storage, como um valor de data e hora RFC 3339. Padrão: 1970-01-01T00:00:00.00Z.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF, na sigla em inglês). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: os dados da CDC, serializados como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    Console

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Datastream to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
    8. Cliquem em Executar job.

    gcloud

    No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nome do conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo da tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    A seguir