Modelo de Datastream para o BigQuery (stream)

O modelo Datastream para BigQuery é um pipeline de streaming que lê dados do Datastream e os replica no BigQuery. O modelo lê dados do Cloud Storage através de notificações do Pub/Sub e replica-os numa tabela de preparação do BigQuery particionada por tempo. Após a replicação, o modelo executa um MERGE no BigQuery para inserir/atualizar todas as alterações de captura de dados de alterações (CDC) numa réplica da tabela de origem. Especifique o parâmetro gcsPubSubSubscription para ler dados de notificações do Pub/Sub OU forneça o parâmetro inputFilePattern para ler diretamente dados de ficheiros no Cloud Storage.

O modelo processa a criação e a atualização das tabelas do BigQuery geridas pela replicação. Quando é necessária a linguagem de definição de dados (LDD), um callback para o Datastream extrai o esquema da tabela de origem e traduz-o em tipos de dados do BigQuery. As operações compatíveis incluem o seguinte:

  • As novas tabelas são criadas à medida que os dados são inseridos.
  • As novas colunas são adicionadas às tabelas do BigQuery com valores iniciais nulos.
  • As colunas eliminadas são ignoradas no BigQuery e os valores futuros são nulos.
  • As colunas com nomes alterados são adicionadas ao BigQuery como novas colunas.
  • As alterações de tipo não são propagadas para o BigQuery.

Recomendamos que execute este pipeline usando o modo de streaming, pelo menos, uma vez, porque o modelo remove as duplicidades quando une dados de uma tabela temporária do BigQuery à tabela principal do BigQuery. Este passo no pipeline significa que não existe nenhuma vantagem adicional em usar o modo de streaming exatamente uma vez.

Requisitos do pipeline

  • Uma stream do Datastream que está pronta ou já está a replicar dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Os conjuntos de dados de destino do BigQuery são criados e a conta de serviço do Compute Engine tem acesso de administrador aos mesmos.
  • É necessária uma chave principal na tabela de origem para criar a tabela de réplica de destino.
  • Uma base de dados de origem MySQL ou Oracle. As bases de dados PostgreSQL e SQL Server não são suportadas.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputFilePattern: a localização do ficheiro para a saída do ficheiro Datastream no Cloud Storage, no formato gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat: o formato dos ficheiros de saída produzidos pelo Datastream. Os valores permitidos são avro e json. A predefinição é avro.
  • gcsPubSubSubscription: a subscrição do Pub/Sub usada pelo Cloud Storage para notificar o Dataflow de novos ficheiros disponíveis para processamento, no formato: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate: o nome do conjunto de dados que contém tabelas de preparação. Este parâmetro suporta modelos, por exemplo, {_metadata_dataset}_log ou my_dataset_log. Normalmente, este parâmetro é o nome de um conjunto de dados. A predefinição é {_metadata_dataset}. Nota: para origens MySQL, o nome da base de dados é mapeado para {_metadata_schema} em vez de {_metadata_dataset}.
  • outputDatasetTemplate: o nome do conjunto de dados que contém as tabelas de réplica. Este parâmetro suporta modelos, por exemplo, {_metadata_dataset} ou my_dataset. Normalmente, este parâmetro é o nome de um conjunto de dados. A predefinição é {_metadata_dataset}. Nota: para origens MySQL, o nome da base de dados é mapeado para {_metadata_schema} em vez de {_metadata_dataset}.
  • deadLetterQueueDirectory: o caminho que o Dataflow usa para escrever o resultado da fila de mensagens rejeitadas. Este caminho não pode estar no mesmo caminho que a saída do ficheiro de fluxo de dados. A predefinição é empty.

Parâmetros opcionais

  • streamName: o nome ou o modelo da stream para sondar informações do esquema. A predefinição é: {_metadata_stream}. Normalmente, o valor predefinido é suficiente.
  • rfcStartDateTime: o DateTime de início a usar para obter dados do Cloud Storage (https://tools.ietf.org/html/rfc3339). Predefinição: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: o número de ficheiros DataStream a ler em simultâneo. A predefinição é 10.
  • outputProjectId: o ID do projeto do Google Cloud que contém os conjuntos de dados do BigQuery para os quais os dados são enviados. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • outputStagingTableNameTemplate: o modelo a usar para dar nome às tabelas de preparação. Por exemplo, {_metadata_table}. A predefinição é {_metadata_table}_log.
  • outputTableNameTemplate: o modelo a usar para o nome das tabelas de réplicas, por exemplo, {_metadata_table}. A predefinição é {_metadata_table}.
  • ignoreFields: campos separados por vírgulas a ignorar no BigQuery. Predefinição: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. Por exemplo, _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes: o número de minutos entre as uniões para uma determinada tabela. A predefinição é 5.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas da DLQ. A predefinição é 10.
  • dataStreamRootUrl: o URL raiz da API Datastream. O valor predefinido é: https://datastream.googleapis.com/.
  • applyMerge: se deve desativar as consultas MERGE para a tarefa. A predefinição é true.
  • mergeConcurrency: o número de consultas MERGE do BigQuery simultâneas. Só é eficaz quando applyMerge está definido como verdadeiro. A predefinição é 30.
  • partitionRetentionDays: o número de dias a usar para a retenção de partições quando executa as unificações do BigQuery. A predefinição é 1.
  • useStorageWriteApiAtLeastOnce: este parâmetro só tem efeito se Use BigQuery Storage Write API estiver ativado. Se true, a semântica de, pelo menos, uma vez é usada para a API Storage Write. Caso contrário, são usadas semânticas de execução única. A predefinição é false.
  • datastreamSourceType: substitui a deteção do tipo de origem para dados de CDC do Datastream. Quando especificado, este valor é usado em vez de derivar o tipo de origem do campo read_method. Os valores válidos incluem "mysql", "postgresql", "oracle", etc. Este parâmetro é útil quando o campo read_method contém "cdc" e não é possível determinar automaticamente o tipo de origem real.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro UDF no Cloud Storage e recarrega a UDF se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for 0, o recarregamento das FDU está desativado. O valor predefinido é 0.
  • pythonTextTransformGcsPath: o padrão do caminho do Cloud Storage para o código Python que contém as suas funções definidas pelo utilizador. Por exemplo, gs://your-bucket/your-transforms/*.py.
  • pythonRuntimeVersion: a versão de tempo de execução a usar para esta FDU Python.
  • pythonTextTransformFunctionName: o nome da função a chamar a partir do seu ficheiro JavaScript. Use apenas letras, dígitos e sublinhados. Por exemplo, transform_udf1.
  • runtimeRetries: o número de vezes que uma execução vai ser repetida antes de falhar. A predefinição é: 5.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Input: os dados de CDC, serializados como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Execute o modelo

    Consola

    1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the Datastream to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
    8. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Por exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a subscrição do Pub/Sub para ler ficheiros alterados. Por exemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: o nome do seu conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo de tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Por exemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: a subscrição do Pub/Sub para ler ficheiros alterados. Por exemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: o nome do seu conjunto de dados do BigQuery.
    • BIGQUERY_TABLE: o modelo de tabela do BigQuery. Por exemplo, {_metadata_schema}_{_metadata_table}_log

    O que se segue?