Gérer les flux

Présentation

Dans cette section, vous allez apprendre à utiliser l'API Datastream pour:

  • Valider et créer des flux
  • Obtenir des informations sur les flux
  • Récupérer les erreurs des flux
  • Mettez à jour les flux en les démarrant, en les mettant en pause, en les réactivant et en les modifiant
  • Supprimer les flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser l'interface de ligne de commande (CLI) gcloud.

Pour obtenir des informations générales sur l'utilisation de gcloud pour gérer les flux de flux de données, cliquez ici.

Valider un flux

Avant de créer un flux, vous pouvez le valider. De cette manière, vous pouvez vous assurer que le flux fonctionne correctement et que toutes les vérifications de validation sont validées.

La validation d'une vérification de flux:

  • Si la source est correctement configurée pour permettre à Datastream de diffuser des données à partir de celle-ci.
  • Indique si le flux peut se connecter à la source et à la destination.
  • Configuration de bout en bout du flux.

Le code suivant montre une requête permettant de valider un flux permettant de transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.

REST

POST "https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

La valeur &validate_only=true indique que vous validez uniquement le flux. vous ne le créez pas. De plus, pour cette demande, placez l'URL entière entre guillemets. Ainsi, Datastream acceptera la valeur &validate_only=true pour valider le flux.

Une fois votre requête envoyée, vous pouvez voir les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que si les vérifications réussissent ou échouent. Pour chaque test de validation qui échoue, des informations s'affichent pour expliquer pourquoi l'opération a échoué et comment résoudre le problème.

Après avoir effectué les corrections appropriées, effectuez à nouveau la demande pour vous assurer que toutes les vérifications de validation réussissent.

Créer un flux

Le code suivant montre une requête de création d'un flux permettant de transférer des données d'une base de données Oracle source vers un bucket de destination dans Cloud Storage.

REST

POST https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

Par exemple, voici une requête pour extraire toutes les tables de schema1 et deux tables spécifiques de schema3: tableA et tableC.

Les événements sont écrits dans un bucket Cloud Storage au format Avro, et un fichier est créé toutes les 100 ou 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Le paramètre backfillAll est associé à des annonces de remplissage historiques. Si vous définissez ce paramètre sur un dictionnaire vide ({}), Datastream remplira les conditions suivantes:

  • Données historiques, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.
  • Schémas et tables, de la source vers la destination.
POST https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}

Obtenir des informations sur un flux

Le code suivant montre une requête permettant de récupérer des informations sur un flux. Parmi ces informations, on peut citer :

  • Nom du flux reconnu par Datastream
  • Nom convivial du flux (nom à afficher)
  • Dates et heures de création et de mise à jour du flux
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemple :

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

La réponse s'affiche comme suit:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour récupérer des informations sur votre flux, cliquez ici.

Répertorier les flux

Le code suivant montre une requête pour récupérer des informations sur tous vos flux.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour récupérer des informations sur tous vos flux, cliquez ici.

Récupérer les erreurs d'un flux

Le code suivant montre une requête permettant de récupérer la liste des erreurs associées à un flux qui ne fonctionne pas correctement.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]:fetchStreamErrors

Exemple :

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream:fetchStreamErrors

Mettre à jour un flux

Lancer un flux

Le code suivant montre une requête pour démarrer un flux.

En utilisant le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête.

Pour cet exemple, le champ spécifié est le champ state, qui représente l'état (ou l'état) du flux. En démarrant le flux, vous remplacez son état CREATED par RUNNING.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Suspendre un flux

Le code suivant montre une requête permettant de mettre en pause un flux en cours d'exécution.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En suspendant le flux, vous passez de son état RUNNING à PAUSED.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

Réactiver un flux

Le code suivant montre une demande de reprise d'un flux mis en pause.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En rétablissant le flux, vous remplacez son état PAUSED par RUNNING.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Modifier un flux

Le code suivant montre une demande de mise à jour de la configuration de rotation d'un flux pour une rotation de 75 Mo ou 45 secondes.

Pour cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés respectivement par les options destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une demande d'inclusion d'un fichier de schéma de types unifiés dans le chemin des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers: un fichier de données JSON et un fichier de schéma Avro.

Pour cet exemple, le champ spécifié est jsonFileFormat, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Le code suivant montre une requête permettant à Datastream de répliquer les données existantes, en plus des modifications en cours sur les données, de la base de données source vers la destination.

La section oracleExcludedObjects du code présente les tables et les schémas qui ne peuvent pas être remplis dans la destination.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table A dans schema3.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Exemple :

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Supprimer un flux

Le code suivant montre une demande de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemple :

DELETE https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

Pour en savoir plus sur la suppression de votre flux à l'aide de gcloud, cliquez ici.