Gérer les flux

Présentation

Dans cette section, vous allez apprendre à utiliser l'API Datastream pour :

  • Créer des flux
  • Obtenir des informations sur les flux et les objets de flux
  • Mettre à jour des flux en les démarrant, en les mettant en pause, en les réactivant et en les modifiant, ainsi qu'en lançant et en arrêtant le remplissage pour les objets de flux
  • Récupérer les diffusions ayant échoué définitivement
  • Activer le streaming d'objets volumineux pour les flux Oracle
  • Supprimer des flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser la Google Cloud CLI (CLI).

Pour obtenir des informations générales sur la gestion des flux Datastream à l'aide de gcloud, consultez la page gcloud Datastream Streams.

Créer un flux

Dans cette section, vous allez apprendre à créer un flux permettant de transférer des données de votre source vers une destination. Les exemples suivants ne sont pas exhaustifs, mais mettent en avant des fonctionnalités spécifiques de Datastream. Pour répondre à votre cas d'utilisation spécifique, utilisez ces exemples avec la documentation de référence de l'API Datastream.

Cette section couvre les cas d'utilisation suivants :

Exemple 1: Diffuser des objets spécifiques vers BigQuery

Dans cet exemple, vous allez apprendre à :

  • Diffuser des données en streaming depuis MySQL vers BigQuery
  • Inclure un ensemble d'objets dans le flux
  • Définir le mode d'écriture du flux en tant que mode "append-only"
  • Remplir tous les objets inclus dans le flux

La requête suivante permet d'extraire toutes les tables de schema1 et de deux tables de schema2: tableA et tableC. Les événements sont écrits dans un ensemble de données. dans BigQuery.

La requête n'inclut pas le paramètre customerManagedEncryptionKey. Par conséquent, le système de gestion des clés interne de Google Cloud est utilisé pour chiffrer vos données au lieu de CMEK.

Le paramètre backfillAll associé à l'exécution du remplissage d'historique (ou instantané) est défini sur un dictionnaire vide ({}), ce qui signifie que Datastream remplit les données historiques de toutes les tables incluses dans le flux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 2 : Exclure des objets spécifiques d'un flux avec une source PostgreSQL

Dans cet exemple, vous allez apprendre à :

  • Transférer des données depuis PostgreSQL vers BigQuery
  • Exclure des objets du flux
  • Exclure des objets du remplissage

Le code suivant montre une requête de création d'un flux permettant de transférer des données d'une base de données PostgreSQL source vers BigQuery. Lorsque vous créez un flux à partir d'une base de données PostgreSQL source, vous devez spécifier deux champs supplémentaires spécifiques à PostgreSQL dans votre requête :

  • replicationSlot: un emplacement de réplication est une condition préalable à la configuration d'une base de données PostgreSQL pour la réplication. Vous devez créer un emplacement de réplication pour chaque flux.
  • publication: une publication est un groupe de tables à partir desquelles vous souhaitez répliquer des modifications. Le nom de la publication doit exister dans la base de données avant le démarrage d'un flux. La publication doit au minimum inclure les tables spécifiées dans la liste includeObjects du flux.

Le paramètre backfillAll associé à l'exécution du remplissage d'historique (ou instantané) est défini pour exclure une table.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 3 : Spécifier le mode d'écriture en mode ajout uniquement pour un flux

Lorsque vous effectuez un flux vers BigQuery, vous pouvez définir le mode d'écriture : merge ou appendOnly. Pour en savoir plus, consultez la section Configurer le mode d'écriture.

Si vous ne spécifiez pas le mode d'écriture dans votre requête de création de flux, le mode par défaut Le mode merge est utilisé.

La requête suivante montre comment définir le mode appendOnly lorsque vous créez un flux MySQL vers BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 4: Diffuser un flux vers une destination Cloud Storage

Dans cet exemple, vous allez apprendre à:

  • Transférer des données depuis Oracle vers Cloud Storage
  • Définir un ensemble d'objets à inclure dans le flux
  • Définir des CMEK pour chiffrer les données au repos

La requête suivante montre comment créer un flux qui écrit les événements dans un bucket dans Cloud Storage.

Dans cet exemple de requête, les événements sont écrits au format de sortie JSON, et un fichier est créé tous les 100 Mo ou toutes les 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Pour le format JSON, vous pouvez :

  • Incluez un fichier de schéma de types unifiés dans le chemin d'accès. Par conséquent, Datastream écrit deux fichiers dans Cloud Storage : un fichier de données JSON et un fichier de schéma Avro. Le fichier de schéma porte le même nom que le fichier de données, avec une extension .schema.

  • Activez la compression gzip pour que Datastream compresse les fichiers écrits dans Cloud Storage.

En utilisant le paramètre backfillNone, la requête spécifie que seules les modifications en cours sont diffusées dans la destination, sans remplissage.

La requête spécifie le paramètre de clé de chiffrement gérée par le client, qui vous permet de contrôler les clés utilisées pour chiffrer les données au repos dans un projet Google Cloud. Le paramètre fait référence à la clé CMEK utilisée par Datastream pour chiffrer les données diffusées de la source vers la destination. Il spécifie également le trousseau de clés de votre CMEK.

Pour en savoir plus sur les trousseaux, consultez la page Ressources Cloud KMS. Pour en savoir plus sur la protection de vos données à l'aide de clés de chiffrement, consultez Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Valider la définition d'un flux

Avant de créer un flux, vous pouvez valider sa définition. Vous pouvez ainsi vous assurer que tous les contrôles de validation sont réussis et que le flux s'exécute correctement lorsqu'il est créé.

Valider un flux consiste à vérifier :

  • Si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
  • Si le flux peut se connecter à la source et à la destination.
  • La configuration de bout en bout du flux.

Pour valider un flux, ajoutez &validate_only=true à l'URL précédant le corps de votre requête :

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.

Par exemple, supposons que vous disposiez d'une clé de chiffrement gérée par le client (CMEK) que Datastream doit utiliser pour chiffrer les données transférées de la source vers la destination. Lors de la validation du flux, Datastream vérifie que la clé existe et qu'il est autorisé à l'utiliser. Si l'une de ces conditions n'est pas remplie, le message d'erreur suivant s'affiche lorsque vous validez le flux :

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Pour résoudre ce problème, vérifiez que la clé que vous avez fournie existe et que le compte de service Datastream dispose de l'autorisation cloudkms.cryptoKeys.get pour la clé.

Une fois les corrections appropriées effectuées, relancez la requête pour vous assurer que tous les contrôles de validation réussissent. Dans l'exemple ci-dessus, la vérification CMEK_VALIDATE_PERMISSIONS ne renverra plus de message d'erreur, mais aura un état PASSED.

Obtenir des informations sur un flux

Le code suivant illustre une requête permettant de récupérer des informations sur un flux. Parmi ces informations, on peut citer :

  • Nom du flux (identifiant unique)
  • Nom convivial pour le flux (nom à afficher)
  • les codes temporels de la création et de la dernière mise à jour de la diffusion.
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La réponse s'affiche comme suit :

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur votre flux, cliquez ici.

Répertorier les flux

Le code suivant montre une requête visant à récupérer la liste de tous les flux du projet et de l'emplacement spécifiés.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur tous vos flux, cliquez ici.

Lister les objets d'un flux

Le code suivant montre une requête de récupération d'informations sur tous les objets d'un flux.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Pour savoir comment utiliser gcloud afin de récupérer des informations sur tous les objets de votre flux, cliquez ici.

La liste des objets renvoyés peut ressembler à ceci :

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de répertorier les objets d'un flux, cliquez ici.

Lancer un flux

Le code suivant montre une requête pour démarrer un flux.

Si vous utilisez le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête. Pour démarrer un flux, remplacez la valeur CREATED du champ state par RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour démarrer votre diffusion, cliquez ici.

Suspendre un flux

Le code suivant montre une requête visant à mettre en pause un flux en cours d'exécution.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En mettant le flux en veille, vous allez faire passer son état de RUNNING à PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour mettre en pause votre flux, cliquez ici.

Réactiver un flux

Le code suivant montre une requête visant à reprendre un flux suspendu.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ state. En rétablissant le flux, vous passez de l'état PAUSED à RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour reprendre votre flux, cliquez ici.

Récupérer un flux

Vous pouvez récupérer un flux en échec permanent à l'aide de la méthode RunStream. Chaque type de base de données source a sa propre définition des opérations de récupération de flux possibles. Pour en savoir plus, consultez Récupérer un flux.

Récupérer un flux pour une source MySQL ou Oracle

Les exemples de code suivants montrent des requêtes de récupération d'un flux pour une instance MySQL ou une Source Oracle à partir de différentes positions de fichiers journaux:

REST

Permet de récupérer un flux à sa position actuelle. Il s'agit de l'option par défaut:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Récupérez un flux à partir de la prochaine position disponible :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Récupérez un flux à sa position la plus récente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Récupérer un flux à partir d'une position spécifique (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez pour récupérer votre diffusion
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez récupérer votre diffusion. Si vous ne fournissez pas de valeur, Datastream récupère le flux à partir du début du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Récupérez un flux à partir d'une position spécifique (Oracle) :

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier de journal de relecture à partir duquel vous souhaitez récupérer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Pour en savoir plus sur les options de récupération disponibles, consultez la section Récupérer un flux.

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Récupérer un flux pour une source PostgreSQL

L'exemple de code suivant montre une requête visant à récupérer un flux pour une source PostgreSQL. Pendant la récupération, le flux commence à lire à partir de la première séquence de journal (LSN) dans l'emplacement de réplication configuré pour le flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si vous souhaitez modifier l'emplacement de réplication, mettez à jour le flux avec le nouveau nom de l'emplacement de réplication:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Récupérer un flux pour une source SQL Server

Les exemples de code suivants montrent des exemples de requêtes pour récupérer un flux pour une source SQL Server.

REST

Récupérez un flux à partir de la première position disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Récupérez un flux à partir d'un numéro de séquence de journal préféré:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Démarrer ou reprendre une diffusion à partir d'une position spécifique

Vous pouvez démarrer une diffusion ou reprendre une diffusion mise en pause à partir d'un emplacement spécifique pour Sources MySQL et Oracle. Cela peut être utile lorsque vous souhaitez effectuer un remplissage à l'aide d'un outil externe, ou démarrer la CDC à partir d'une position que vous indiquez. Pour une source MySQL, vous devez indiquer une position de journal binaire. Pour une source Oracle, vous devez indiquer un numéro de modification système (SCN) dans le fichier journal de rétablissement.

Le code suivant illustre une requête pour démarrer ou reprendre un flux déjà créé à partir d'une position spécifique.

Démarrer ou reprendre un flux à partir d'une position binlog spécifique (MySQL) :

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez pour lancer votre diffusion.
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez commencer votre flux. Si vous ne fournissez pas de valeur, Datastream commence à lire à partir du début du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ou de reprendre un flux à partir d'une position spécifique à l'aide de gcloud. Pour en savoir plus sur l'utilisation de gcloud pour démarrer ou reprendre un flux, consultez la documentation du SDK Cloud.

Démarrez ou reprenez un flux à partir d'un numéro de modification du système spécifique dans le fichier journal de rétablissement (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement. à partir duquel vous souhaitez démarrer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Le démarrage ou la reprise d'un flux à partir d'une position spécifique à l'aide de gcloud n'est pas pris en charge. Pour savoir comment utiliser gcloud pour démarrer un flux, consultez la documentation du SDK Cloud.

Modifier un flux

Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.

Dans cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés respectivement par les options destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une requête visant à inclure un fichier de schéma de types unifiés dans le chemin d'accès des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers : un fichier de données JSON et un fichier de schéma Avro.

Pour cet exemple, le champ spécifié est le champ jsonFileFormat, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.

La section oracleExcludedObjects du code indique les tables et les schémas qui ne peuvent pas être renseignés dans la destination.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour modifier votre flux, cliquez ici.

Déclencher le remplissage pour un objet d'un flux

Un flux dans Datastream peut effectuer un remplissage avec des données d'historique tout en diffusant les modifications en cours par flux vers une destination. Les modifications en cours sont toujours diffusées depuis une source vers une destination. Toutefois, vous pouvez spécifier si vous souhaitez que les données historiques soient diffusées en continu.

Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll.

Datastream vous permet également de diffuser des données d'historique uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll et excluez les tables pour lesquelles vous ne souhaitez pas de données historiques.

Si vous souhaitez que seules les modifications en cours soient diffusées dans la destination, utilisez le paramètre backfillNone. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer manuellement le remplissage pour les objets contenant ces données.

Il peut également être utile de lancer le remplissage d'un objet lorsque les données ne sont pas synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, auquel cas les données sont perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont insérées par flux dans la destination en une seule opération. Par conséquent, les données sont synchronisées entre la source et la destination.

Avant de pouvoir lancer le remplissage d'un objet d'un flux, vous devez récupérer des informations sur l'objet.

Chaque objet est associé à un OBJECT_ID, qui l'identifie de manière unique. Vous utilisez OBJECT_ID pour lancer le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour lancer le remplissage d'un objet de votre flux, consultez la documentation du SDK Google Cloud.

Arrêter le remplissage pour un objet d'un flux

Après avoir lancé le remplissage d'un objet d'un flux, vous pouvez l'arrêter. Par exemple, si un utilisateur modifie le schéma d'une base de données, le schéma ou les données risquent d'être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.

Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage pour chaque objet, puis lancez le remplissage des objets un par un.

Avant de pouvoir arrêter le remplissage d'un objet de flux, vous devez demander la récupération des informations sur tous les objets du flux. Chaque objet renvoyé possède un OBJECT_ID, qui l'identifie de manière unique. Utilisez OBJECT_ID pour arrêter le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour arrêter le remplissage d'un objet de votre flux, cliquez ici.

Modifier le nombre maximal de tâches CDC simultanées

Le code suivant montre comment définir le nombre maximal de tâches de capture de données modifiées (CDC) simultanées pour un flux MySQL sur 7.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ maxConcurrentCdcTasks. En définissant sa valeur sur 7, vous modifiez le nombre maximal de tâches CDC simultanées de la valeur précédente à 7. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas de valeur ou si vous définissez la valeur sur 0, la valeur par défaut du système (cinq tâches) est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Modifier le nombre maximal de tâches de remplissage simultanées

Le code suivant montre comment définir le nombre maximal de tâches de remplissage simultanées pour un flux MySQL sur 25.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est maxConcurrentBackfillTasks. En définissant sa valeur sur 25, vous faites passer le nombre maximal de tâches de remplissage simultanées de la valeur précédente à 25. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas cette valeur ou si vous la définissez sur 0, la valeur système par défaut de 16 tâches est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Activer le streaming d'objets volumineux pour les sources Oracle

Vous pouvez activer le streaming d'objets volumineux, tels que les objets binaires de grande taille (BLOB), les objets de grande taille à caractères (CLOB) et les objets de grande taille à caractères nationaux (NCLOB) pour les flux avec des sources Oracle. L'indicateur streamLargeObjects vous permet d'inclure de grands objets dans les nouveaux flux et les flux existants. L'indicateur est défini au niveau du flux, vous n'avez pas besoin de spécifier les colonnes des types de données d'objets volumineux.

L'exemple suivant montre comment créer un flux permettant de diffuser de grands volumes d'objets.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour mettre à jour un flux, consultez la documentation du SDK Google Cloud.

Supprimer un flux

Le code suivant illustre une requête de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Pour en savoir plus sur la suppression de votre flux à l'aide de gcloud, cliquez ici.

Étape suivante