Modèle Datastream vers MySQL ou PostgreSQL (Flux)

Le modèle Datastream vers SQL est un pipeline de streaming qui lit les données Datastream et les réplique dans n'importe quelle base de données MySQL ou PostgreSQL. Le modèle lit les données depuis Cloud Storage à l'aide de notifications Pub/Sub et les réplique dans des tables dupliquées SQL.

Le modèle n'est pas compatible avec le langage de définition de données (LDD) et attend que toutes les tables existent déjà dans la base de données. La réplication utilise des transformations avec état Dataflow pour filtrer les données obsolètes et assurer la cohérence des données dans le désordre. Par exemple, si une version plus récente d'une ligne est déjà transmise, une version tardive de cette ligne est ignorée. Le langage de manipulation de données (LMD) qui s'exécute constitue le meilleur moyen de répliquer parfaitement les données cibles. Les instructions LMD exécutées respectent les règles suivantes :

  • Si une clé primaire existe, les opérations d'insertion et de mise à jour utilisent une syntaxe upsert (par exemple, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Si des clés primaires existent, les suppressions sont répliquées en tant que suppression LMD.
  • Si aucune clé primaire n'existe, les opérations d'insertion et de mise à jour sont insérées dans la table.
  • Si aucune clé primaire n'existe, les suppressions sont ignorées.

Si vous utilisez les utilitaires Oracle vers Postgres, ajoutez ROWID dans SQL en tant que clé primaire lorsqu'il n'en existe pas.

Conditions requises pour ce pipeline

  • Un flux DataStream prêt à répliquer ou qui réplique déjà des données.
  • Les notifications Pub/Sub pour Cloud Storage sont activées pour les données DataStream.
  • Une base de données PostgreSQL a été ajoutée au schéma requis.
  • L'accès réseau entre les nœuds de calcul Dataflow et PostgreSQL est configuré.

Paramètres de modèle

Paramètres Description
inputFilePattern Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Cet emplacement de fichier est généralement le chemin racine du flux.
gcsPubSubSubscription Abonnement Pub/Sub avec notifications de fichier Datastream. Par exemple, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Format du fichier de sortie généré par Datastream. Par exemple, avro,json. Par défaut, avro.
databaseHost Hôte SQL sur lequel se connecter.
databaseUser L'utilisateur SQL disposant de toutes les autorisations requises pour écrire dans toutes les tables de réplication.
databasePassword Mot de passe de l'utilisateur SQL donné.
databasePort (Facultatif) Port de la base de données SQL auquel se connecter. Par défaut, 5432.
databaseName (Facultatif) Nom de la base de données SQL à laquelle se connecter. Par défaut, postgres.
streamName (Facultatif) Nom ou modèle du flux à interroger pour les informations de schéma. Par défaut, {_metadata_stream}.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to SQL template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH : chemin d'accès Cloud Storage aux données Datastream. Par exemple : gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME : abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple : projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST : adresse IP de votre hôte SQL.
  • DATABASE_USER : votre utilisateur SQL.
  • DATABASE_PASSWORD : votre mot de passe SQL

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH : chemin d'accès Cloud Storage aux données Datastream. Par exemple : gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME : abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple : projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST : adresse IP de votre hôte SQL.
  • DATABASE_USER : votre utilisateur SQL.
  • DATABASE_PASSWORD : votre mot de passe SQL

Étapes suivantes