Présentation
Dans cette section, vous allez apprendre à utiliser l'API Datastream pour :
- Valider et créer des flux
- Obtenir des informations sur les flux et les objets de flux
- Mettre à jour les flux en démarrant, en suspendant, en reprenant et en modifiant ces flux, ainsi qu'en lançant et en arrêtant le remplissage des objets des flux
- Supprimer des flux
Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser l'outil de ligne de commande (CLI) gcloud
.
Pour obtenir des informations générales sur l'utilisation de gcloud
afin de gérer les flux Datastream, cliquez ici.
Valider un flux
Avant de créer un flux, vous pouvez le valider. Ainsi, vous avez la garantie que le flux s'exécutera correctement et que tous les contrôles de validation réussiront.
Valider une vérification de flux :
- Indique si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
- Indique si le flux peut se connecter à la source et à la destination.
- Configuration de bout en bout du flux.
Le code suivant montre une requête permettant de valider un flux utilisé pour transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.
REST
POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams?streamId=[stream-id]&validate_only=true" { "displayName": "[display name]", "sourceConfig": { "sourceConnectionProfileName": "[connectionProfileName]", "oracleSourceConfig": { "allowlist": {}, "rejectlist": {} } } "destinationConfig": { "destinationConnectionProfileName": "[connectionProfileName]", "gcsDestinationConfig": { "path": "[filePrefix]", "avroFileFormat": "{}" "fileRotationMb": MBytes "fileRotationInterval": seconds } }, "backfillAll": {} }
La valeur &validate_only=true
indique que vous validez uniquement le flux. Vous ne le créez pas. Dans cette requête, placez l'URL entière entre guillemets. Ainsi, Datastream récupérera la valeur &validate_only=true
pour valider le flux.
Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.
Une fois les corrections appropriées effectuées, effectuez à nouveau la requête pour vous assurer que tous les contrôles de validation sont concluants.
Créer un flux
Le code suivant montre une requête permettant de créer un flux utilisé pour transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams?streamId=[stream-id] { "displayName": "[display name]", "sourceConfig": { "sourceConnectionProfileName": "[connectionProfileName]", "oracleSourceConfig": { "allowlist": {}, "rejectlist": {} } } "destinationConfig": { "destinationConnectionProfileName": "[connectionProfileName]", "gcsDestinationConfig": { "path": "[filePrefix]", "avroFileFormat": "{}" "fileRotationMb": MBytes "fileRotationInterval": seconds } }, "backfillAll": {} }
Par exemple, voici une requête pour extraire toutes les tables de schema1 et deux tables spécifiques de schema3 : tableA et tableC.
Les événements sont écrits dans un bucket Cloud Storage au format Avro, et un fichier est créé toutes les 100 Mo ou 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).
Le paramètre backfillAll
est associé au remplissage d'historique. Si vous définissez ce paramètre sur un dictionnaire vide ({}), DataStream remplira les champs suivants :
- Données d'historique, en plus des modifications en cours sur les données, de la base de données source vers la destination
- Schémas et tables, de la source vers la destination
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ streams/myOracleDb", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "schema1", "oracleTables": [] }, { "schemaName": "schema3", "oracleTables": [ { "tableName": "tableA" }, { "tableName": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": "{}", "fileRotationMb": 100, "fileRotationInterval": 30 } }, "backfillAll": {} }
Obtenir des informations sur un flux
Le code suivant montre une requête de récupération d'informations sur un flux. Parmi ces informations, on peut citer :
- Nom du flux reconnu par Datastream
- Nom convivial du flux (nom à afficher)
- Date et d'heure de création et de mise à jour du flux
- Informations sur les profils de connexion source et de destination associés au flux
- État du flux
REST
GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]
Exemple :
GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream
La réponse s'affiche comme suit :
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "schema1", "oracleTables": [] }, { "schemaName": "schema3", "oracleTables": [ { "tableName": "tableA" }, { "tableName": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": "{}", "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "backfillAll": {} }
gcloud
Pour en savoir plus sur l'utilisation de gcloud
afin de récupérer des informations sur votre flux, cliquez ici.
Répertorier les flux
Le code ci-dessous montre une requête de récupération d'informations sur tous vos flux.
REST
GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams
gcloud
Pour en savoir plus sur l'utilisation de gcloud
afin de récupérer des informations sur tous vos flux, cliquez ici.
Répertorier les objets d'un flux
Le code suivant montre une requête de récupération d'informations sur tous les objets d'un flux.
REST
GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]/objects
Exemple :
GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myMySQLCdcStream/objects
La liste des objets renvoyés peut ressembler à ceci :
REST
{ "streamObjects": [ { "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/ objects/[object-id]", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/ objects/[object-id]", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/ objects/[object-id]", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
Mettre à jour un flux
Lancer un flux
Le code suivant montre une requête de démarrage d'un flux.
Avec le paramètre updateMask
dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête.
Pour cet exemple, le champ spécifié est le champ state
, qui représente l'état du flux. En démarrant le flux, vous redéfinissez son état CREATED
sur l'état RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=state { "state": "RUNNING" }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=state { "state": "RUNNING" }
Suspendre un flux
Le code suivant montre une requête de suspension d'un flux en cours d'exécution.
Pour cet exemple, le champ spécifié pour le paramètre updateMask
est state
. En mettant en pause le flux, vous redéfinissez son état RUNNING
sur l'état PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=state { "state": "PAUSED" }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=state { "state": "PAUSED" }
Réactiver un flux
Le code suivant montre une requête de reprise d'un flux mis en pause.
Pour cet exemple, le champ spécifié pour le paramètre updateMask
est state
. En réactivant le flux, vous redéfinissez son état PAUSED
sur l'état RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=state { "state": "RUNNING" }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=state { "state": "RUNNING" }
Modifier un flux
Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.
Pour cet exemple, les champs spécifiés pour le paramètre updateMask
incluent les champs fileRotationMb
et fileRotationInterval
, représentés respectivement par les options destinationConfig.gcsDestinationConfig.fileRotationMb
et destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Le code suivant montre une requête pour inclure un fichier de schéma de types unifiés dans le chemin des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers : un fichier de données JSON et un fichier de schéma Avro.
Pour cet exemple, le champ spécifié est le champ jsonFileFormat
, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.
La section oracleExcludedObjects
du code montre les tables et schémas qui ne peuvent pas être remplis dans la destination.
Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.
PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schemaName": "schema3", "oracleTables": [ { "tableName": "tableA" } ] } ] } } }
Exemple :
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schemaName": "schema3", "oracleTables": [ { "tableName": "tableA" } ] } ] } } }
Lancer le remplissage pour un objet d'un flux
Un flux dans Datastream peut remplir des données de l'historique, ainsi que des modifications en cours vers une destination. Les modifications en cours seront toujours diffusées d'une source vers une destination. Toutefois, vous pouvez spécifier si vous souhaitez que les données de l'historique soient diffusées.
Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll
.
Datastream vous permet également de diffuser des données d'historique uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll
et excluez les tables pour lesquelles vous ne souhaitez pas de données d'historique.
Si vous souhaitez que seules les modifications en cours soient diffusées dans la destination, utilisez le paramètre backfillNone
. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer manuellement un remplissage pour les objets contenant ces données.
Une autre raison de lancer le remplissage d'un objet est l'existence de données non synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, et les données sont alors perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont diffusées dans la destination en une seule fois. Par conséquent, les données sont synchronisées entre la source et la destination.
Avant de pouvoir lancer un remplissage pour un objet d'un flux, vous devez récupérer des informations sur l'objet.
Chaque objet possède un [object-id], qui l'identifie de manière unique. Vous utilisez [object-id] pour lancer le remplissage du flux.
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]/objects/[object-id]:startBackfillJob
Arrêter le remplissage d'un objet de flux
Après avoir lancé le remplissage pour un objet d'un flux, vous pouvez arrêter le remplissage de l'objet. Par exemple, si un utilisateur modifie un schéma de base de données, le schéma ou les données peuvent être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.
Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage pour chaque objet, puis lancez le remplissage un par un pour les objets.
Pour pouvoir arrêter le remplissage d'un objet d'un flux, vous devez effectuer une requête pour récupérer des informations sur tous les objets d'un flux. Chaque objet renvoyé possède un [object-id] qui l'identifie de manière unique. Utilisez le [object-id] pour arrêter le remplissage du flux.
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob
Supprimer un flux
Le code suivant montre une requête de suppression d'un flux.
REST
DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]
Exemple :
DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myOracleCdcStream
gcloud
Pour en savoir plus sur la suppression de votre flux à l'aide de gcloud
, cliquez ici.