Modèle Datastream vers BigQuery (Flux)

Le modèle Datastream vers BigQuery est un pipeline de streaming qui lit les données Datastream et les réplique dans BigQuery. Le modèle lit les données depuis Cloud Storage à l'aide de notifications Pub/Sub et les réplique dans une table de préproduction BigQuery partitionnée par date. Après la réplication, le modèle exécute un objet MERGE dans BigQuery pour sauvegarder toutes les modifications apportées par la capture de données modifiées dans une instance dupliquée de la table source.

Le modèle gère la création et la mise à jour des tables BigQuery gérées par la réplication. Lorsque le langage de définition de données (LDD) est requis, un rappel à Datastream extrait le schéma de la table source et le convertit en types de données BigQuery. Les opérations suivantes sont acceptées :

  • Des tables sont créées à mesure que des données sont insérées.
  • Les nouvelles colonnes sont ajoutées aux tables BigQuery avec des valeurs initiales nulles.
  • Les colonnes supprimées sont ignorées dans BigQuery, et les valeurs futures sont nulles.
  • Les colonnes renommées sont ajoutées à BigQuery en tant que nouvelles colonnes.
  • Les modifications de type ne sont pas transmises à BigQuery.

Il est recommandé d'exécuter ce pipeline en utilisant le mode de traitement par flux de type "au moins une fois", car le modèle effectue une déduplication lorsqu'il fusionne les données d'une table BigQuery temporaire vers la table BigQuery principale. Cette étape du pipeline signifie qu'il n'y a aucun avantage supplémentaire à utiliser le mode de traitement par flux de type "exactement une fois".

Conditions requises pour ce pipeline

  • Un flux DataStream prêt à répliquer ou qui réplique déjà des données.
  • Les notifications Pub/Sub pour Cloud Storage sont activées pour les données DataStream.
  • Les ensembles de données de destination BigQuery sont créés et un compte administrateur a été accordé au compte de service Compute Engine.
  • Une clé primaire est nécessaire dans la table source pour que la table dupliquée de destination soit créée.
  • Une base de données source MySQL ou Oracle. Les bases de données PostgreSQL ne sont pas compatibles.

Paramètres de modèle

Paramètres Description
inputFilePattern Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Cet emplacement de fichier est généralement le chemin racine du flux.
gcsPubSubSubscription Abonnement Pub/Sub avec notifications de fichier Datastream. Par exemple, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Format du fichier de sortie généré par Datastream. Par exemple, avro,json. Par défaut, avro.
outputStagingDatasetTemplate Nom d'un ensemble de données existant contenant les tables de préproduction. Vous pouvez inclure le modèle {_metadata_dataset} en tant qu'espace réservé qui est remplacé par le nom de l'ensemble de données ou le schéma source (par exemple {_metadata_dataset}_log).
outputDatasetTemplate Nom d'un ensemble de données existant devant contenir les tables dupliquées Vous pouvez inclure le modèle {_metadata_dataset} en tant qu'espace réservé qui est remplacé par le nom de l'ensemble de données ou le schéma source (par exemple, {_metadata_dataset}).
deadLetterQueueDirectory Chemin d'accès au fichier de stockage des messages non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
outputStagingTableNameTemplate Facultatif : Modèle pour le nom des tables de préproduction. La valeur par défaut est {_metadata_table}_log. Si vous répliquez plusieurs schémas, la solution suggérée est {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate Facultatif : Modèle pour le nom des tables dupliquées. Par défaut, {_metadata_table}. Si vous répliquez plusieurs schémas, la solution suggérée est {_metadata_schema}_{_metadata_table}.
outputProjectId Facultatif : Projet associé aux ensembles de données BigQuery dans lesquels exporter des données. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
streamName Facultatif : Nom ou modèle du flux à interroger pour les informations de schéma. Par défaut, {_metadata_stream}.
mergeFrequencyMinutes Facultatif : Nombre de minutes entre les fusions pour une table donnée. Par défaut, 5.
dlqRetryMinutes Facultatif : Nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Par défaut, 10.
javascriptTextTransformGcsPath Facultatif : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF, user-defined function) que vous souhaitez utiliser. Exemple : gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Facultatif : Nom de la fonction JavaScript définie par l'utilisateur (UDF) que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
useStorageWriteApi Facultatif : si la valeur est true, le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false. Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
useStorageWriteApiAtLeastOnce Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
numStorageWriteApiStreams Facultatif : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
storageWriteApiTriggeringFrequencySec Facultatif : spécifie la fréquence de déclenchement, en secondes, Lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
applyMerge Facultatif: spécifie si le modèle exécute une instruction MERGE dans BigQuery après la réplication des données vers la table de préproduction. Par défaut : true.
fileReadConcurrency Facultatif: nombre de fichiers Datastream à lire simultanément. Par défaut : 10.
mergeConcurrency Facultatif: le nombre d'instructions MERGE BigQuery simultanées. Par défaut : 30.
partitionRetentionDays Facultatif: nombre de jours à utiliser pour la conservation des partitions lors de l'exécution d'instructions MERGE BigQuery. Par défaut : 1.
rfcStartDateTime Facultatif: heure de début pour la lecture des fichiers de Cloud Storage, sous forme de valeur RFC 3339 de date et heure. Par défaut : 1970-01-01T00:00:00.00Z.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : données CDC sérialisées en tant que chaîne JSON.
  • Sortie : chaîne JSON correspondant au schéma de la table de destination BigQuery.
  • Exécuter le modèle

    Console

    1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
    2. Accéder à la page Créer un job à partir d'un modèle
    3. Dans le champ Nom du job, saisissez un nom de job unique.
    4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

      Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

    5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Datastream to BigQuery template.
    6. Dans les champs fournis, saisissez vos valeurs de paramètres.
    7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement par flux de type "au moins une fois", sélectionnez Au moins une fois.
    8. Cliquez sur Run Job (Exécuter la tâche).

    gcloud

    Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH : chemin d'accès Cloud Storage aux données Datastream. Par exemple : gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME : abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple : projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET : nom de votre ensemble de données BigQuery.
    • BIGQUERY_TABLE : votre modèle de table BigQuery. Par exemple, {_metadata_schema}_{_metadata_table}_log.

    API

    Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH : chemin d'accès Cloud Storage aux données Datastream. Par exemple : gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME : abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple : projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET : nom de votre ensemble de données BigQuery.
    • BIGQUERY_TABLE : votre modèle de table BigQuery. Par exemple, {_metadata_schema}_{_metadata_table}_log.

    Étapes suivantes