Gérer les flux

Présentation

Dans cette section, vous allez apprendre à utiliser l'API Datastream pour :

  • Valider et créer des flux
  • Obtenir des informations sur les flux et les objets de flux
  • Mettre à jour les flux en démarrant, en suspendant, en reprenant et en modifiant ces flux, ainsi qu'en lançant et en arrêtant le remplissage des objets des flux
  • Supprimer des flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser l'outil de ligne de commande (CLI) gcloud.

Pour obtenir des informations générales sur l'utilisation de gcloud afin de gérer les flux Datastream, cliquez ici.

Valider un flux

Avant de créer un flux, vous pouvez le valider. Ainsi, vous avez la garantie que le flux s'exécutera correctement et que tous les contrôles de validation réussiront.

Valider une vérification de flux :

  • Indique si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
  • Indique si le flux peut se connecter à la source et à la destination.
  • Configuration de bout en bout du flux.

Le code suivant montre une requête permettant de valider un flux utilisé pour transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

La valeur &validate_only=true indique que vous validez uniquement le flux. Vous ne le créez pas. Dans cette requête, placez l'URL entière entre guillemets. Ainsi, Datastream récupérera la valeur &validate_only=true pour valider le flux.

Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.

Une fois les corrections appropriées effectuées, effectuez à nouveau la requête pour vous assurer que tous les contrôles de validation sont concluants.

Créer un flux

Le code suivant montre une requête permettant de créer un flux utilisé pour transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

Par exemple, voici une requête pour extraire toutes les tables de schema1 et deux tables spécifiques de schema3 : tableA et tableC.

Les événements sont écrits dans un bucket Cloud Storage au format Avro, et un fichier est créé toutes les 100 Mo ou 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Le paramètre backfillAll est associé au remplissage d'historique. Si vous définissez ce paramètre sur un dictionnaire vide ({}), DataStream remplira les champs suivants :

  • Données d'historique, en plus des modifications en cours sur les données, de la base de données source vers la destination
  • Schémas et tables, de la source vers la destination
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}

Obtenir des informations sur un flux

Le code suivant montre une requête de récupération d'informations sur un flux. Parmi ces informations, on peut citer :

  • Nom du flux reconnu par Datastream
  • Nom convivial du flux (nom à afficher)
  • Date et d'heure de création et de mise à jour du flux
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemple :

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

La réponse s'affiche comme suit :

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur votre flux, cliquez ici.

Répertorier les flux

Le code ci-dessous montre une requête de récupération d'informations sur tous vos flux.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur tous vos flux, cliquez ici.

Répertorier les objets d'un flux

Le code suivant montre une requête de récupération d'informations sur tous les objets d'un flux.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

Exemple :

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

La liste des objets renvoyés peut ressembler à ceci :

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

Mettre à jour un flux

Lancer un flux

Le code suivant montre une requête de démarrage d'un flux.

Avec le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête.

Pour cet exemple, le champ spécifié est le champ state, qui représente l'état du flux. En démarrant le flux, vous redéfinissez son état CREATED sur l'état RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Suspendre un flux

Le code suivant montre une requête de suspension d'un flux en cours d'exécution.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En mettant en pause le flux, vous redéfinissez son état RUNNING sur l'état PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

Réactiver un flux

Le code suivant montre une requête de reprise d'un flux mis en pause.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En réactivant le flux, vous redéfinissez son état PAUSED sur l'état RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Modifier un flux

Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.

Pour cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés respectivement par les options destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une requête pour inclure un fichier de schéma de types unifiés dans le chemin des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers : un fichier de données JSON et un fichier de schéma Avro.

Pour cet exemple, le champ spécifié est le champ jsonFileFormat, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.

La section oracleExcludedObjects du code montre les tables et schémas qui ne peuvent pas être remplis dans la destination.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Lancer le remplissage pour un objet d'un flux

Un flux dans Datastream peut remplir des données de l'historique, ainsi que des modifications en cours vers une destination. Les modifications en cours seront toujours diffusées d'une source vers une destination. Toutefois, vous pouvez spécifier si vous souhaitez que les données de l'historique soient diffusées.

Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll.

Datastream vous permet également de diffuser des données d'historique uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll et excluez les tables pour lesquelles vous ne souhaitez pas de données d'historique.

Si vous souhaitez que seules les modifications en cours soient diffusées dans la destination, utilisez le paramètre backfillNone. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer manuellement un remplissage pour les objets contenant ces données.

Une autre raison de lancer le remplissage d'un objet est l'existence de données non synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, et les données sont alors perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont diffusées dans la destination en une seule fois. Par conséquent, les données sont synchronisées entre la source et la destination.

Avant de pouvoir lancer un remplissage pour un objet d'un flux, vous devez récupérer des informations sur l'objet.

Chaque objet possède un [object-id], qui l'identifie de manière unique. Vous utilisez [object-id] pour lancer le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

Arrêter le remplissage d'un objet de flux

Après avoir lancé le remplissage pour un objet d'un flux, vous pouvez arrêter le remplissage de l'objet. Par exemple, si un utilisateur modifie un schéma de base de données, le schéma ou les données peuvent être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.

Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage pour chaque objet, puis lancez le remplissage un par un pour les objets.

Pour pouvoir arrêter le remplissage d'un objet d'un flux, vous devez effectuer une requête pour récupérer des informations sur tous les objets d'un flux. Chaque objet renvoyé possède un [object-id] qui l'identifie de manière unique. Utilisez le [object-id] pour arrêter le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

Supprimer un flux

Le code suivant montre une requête de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemple :

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

Pour en savoir plus sur la suppression de votre flux à l'aide de gcloud, cliquez ici.