Gérer les flux

Présentation

Dans cette section, vous allez apprendre à utiliser l'API Datastream pour :

  • Créer des flux
  • Obtenir des informations sur les flux et les objets de flux
  • Mettre à jour des flux en les démarrant, en les mettant en pause, en les réactivant et en les modifiant, ainsi qu'en lançant et en arrêtant le remplissage pour les objets de flux
  • Récupérer des flux ayant définitivement échoué
  • Activer le streaming d'objets volumineux pour les flux Oracle
  • Supprimer des flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser la Google Cloud CLI (CLI).

Pour obtenir des informations générales sur la gestion des flux Datastream à l'aide de gcloud, consultez la page gcloud Datastream Streams.

Créer un flux

Dans cette section, vous allez apprendre à créer un flux permettant de transférer des données de votre source vers une destination. Les exemples suivants ne sont pas exhaustifs, mais mettent en avant des fonctionnalités spécifiques de Datastream. Pour répondre à votre cas d'utilisation spécifique, utilisez ces exemples ainsi que la documentation de référence de l'API Datastream.

Cette section couvre les cas d'utilisation suivants:

Exemple 1: Diffuser des objets spécifiques vers BigQuery

Dans cet exemple, vous allez apprendre à:

  • Insérer des données en flux continu de MySQL vers BigQuery
  • Inclure un ensemble d'objets dans le flux
  • Définir le mode d'écriture du flux en tant que mode "append-only"
  • Remplir tous les objets inclus dans le flux

La requête suivante permet d'extraire toutes les tables de schema1 et de deux tables spécifiques de schema2: tableA et tableC. Les événements sont écrits dans un ensemble de données BigQuery.

La requête n'inclut pas le paramètre customerManagedEncryptionKey. Par conséquent, vos données sont chiffrées à la place de la CMEK à l'aide du système de gestion de clés interne de Google Cloud.

Le paramètre backfillAll associé à l'exécution du remplissage (ou instantané) historique est défini sur un dictionnaire vide ({}), ce qui signifie que Datastream remplit les données historiques de toutes les tables incluses dans le flux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 2: Exclure des objets spécifiques d'un flux avec une source PostgreSQL

Dans cet exemple, vous allez apprendre à:

  • Transférer des données depuis PostgreSQL vers BigQuery
  • Exclure des objets du flux
  • Exclure des objets du remplissage

Le code suivant montre une requête de création d'un flux permettant de transférer des données d'une base de données PostgreSQL source vers BigQuery. Lorsque vous créez un flux à partir d'une base de données PostgreSQL source, vous devez spécifier deux champs supplémentaires spécifiques à PostgreSQL dans votre requête:

  • replicationSlot: un emplacement de réplication est une condition préalable à la configuration d'une base de données PostgreSQL pour la réplication. Vous devez créer un emplacement de réplication pour chaque flux.
  • publication: une publication est un groupe de tables à partir desquelles vous souhaitez répliquer des modifications. Le nom de la publication doit exister dans la base de données avant le démarrage d'un flux. La publication doit inclure au minimum les tables spécifiées dans la liste includeObjects du flux.

Le paramètre backfillAll associé à l'exécution du remplissage (ou instantané) de l'historique est défini pour exclure une table.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 3: Spécifier le mode d'écriture "append-only" pour un flux

Lors de l'insertion en flux continu dans BigQuery, vous pouvez définir le mode d'écriture: merge ou appendOnly. Pour en savoir plus, consultez la section Configurer le mode d'écriture.

Si vous ne spécifiez pas le mode d'écriture dans votre requête de création de flux, le mode merge par défaut est utilisé.

La requête suivante montre comment définir le mode appendOnly lorsque vous créez un flux MySQL vers BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 4: Diffuser un flux vers une destination Cloud Storage

Dans cet exemple, vous allez apprendre à:

  • Insérer des données en flux continu d'Oracle vers Cloud Storage
  • Définir un ensemble d'objets à inclure dans le flux
  • Définir une clé CMEK pour chiffrer les données au repos

La requête suivante montre comment créer un flux qui écrit les événements dans un bucket Cloud Storage.

Dans cet exemple de requête, les événements sont écrits au format de sortie JSON, et un fichier est créé tous les 100 Mo ou toutes les 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Pour le format JSON, vous pouvez :

  • Incluez un fichier de schéma de types unifiés dans le chemin d'accès. Par conséquent, Datastream écrit deux fichiers dans Cloud Storage : un fichier de données JSON et un fichier de schéma Avro. Le fichier de schéma porte le même nom que le fichier de données, avec une extension .schema.

  • Activez la compression gzip pour que Datastream compresse les fichiers écrits dans Cloud Storage.

En utilisant le paramètre backfillNone, la requête spécifie que seules les modifications en cours sont diffusées dans la destination, sans remplissage.

La requête spécifie le paramètre de clé de chiffrement gérée par le client, qui vous permet de contrôler les clés utilisées pour chiffrer les données au repos dans un projet Google Cloud. Ce paramètre fait référence à la clé CMEK utilisée par Datastream pour chiffrer les données transférées de la source vers la destination. Il spécifie également le trousseau de clés de votre CMEK.

Pour en savoir plus sur les trousseaux, consultez la page Ressources Cloud KMS. Pour en savoir plus sur la protection de vos données à l'aide de clés de chiffrement, consultez la page Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Valider la définition d'un flux

Avant de créer un flux, vous pouvez valider sa définition. De cette façon, vous pouvez vous assurer que tous les contrôles de validation sont concluants et que le flux s'exécutera correctement une fois créé.

Valider un flux consiste à vérifier :

  • Si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
  • Si le flux peut se connecter à la source et à la destination.
  • La configuration de bout en bout du flux.

Pour valider un flux, ajoutez &validate_only=true à l'URL précédant le corps de votre requête:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.

Par exemple, supposons que vous disposiez d'une clé de chiffrement gérée par le client (CMEK) que vous souhaitez que Datastream utilise pour chiffrer les données transférées de la source vers la destination. Lors de la validation du flux, Datastream vérifie que la clé existe et qu'il est autorisé à l'utiliser. Si l'une de ces conditions n'est pas remplie, le message d'erreur suivant s'affichera lorsque vous validerez le flux:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Pour résoudre ce problème, vérifiez que la clé que vous avez fournie existe et que le compte de service Datastream dispose de l'autorisation cloudkms.cryptoKeys.get pour la clé.

Une fois les corrections appropriées effectuées, relancez la requête pour vous assurer que tous les contrôles de validation réussissent. Pour l'exemple ci-dessus, la vérification CMEK_VALIDATE_PERMISSIONS ne renverra plus de message d'erreur, mais l'état sera PASSED.

Obtenir des informations sur un flux

Le code suivant illustre une requête pour récupérer des informations sur un flux. Ces informations sont les suivantes :

  • Nom du flux (identifiant unique)
  • Nom convivial du flux (nom à afficher)
  • les codes temporels de la création et de la dernière mise à jour de la diffusion.
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La réponse s'affiche comme suit :

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur votre flux, cliquez ici.

Répertorier les flux

Le code suivant montre une requête permettant de récupérer la liste de tous les flux dans le projet et l'emplacement spécifiés.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur l'ensemble de vos diffusions, cliquez ici.

Lister les objets d'un flux

Le code suivant illustre une requête permettant de récupérer des informations sur tous les objets d'un flux.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Pour savoir comment utiliser gcloud afin de récupérer des informations sur tous les objets de votre flux, cliquez ici.

La liste des objets renvoyés peut ressembler à ceci :

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de répertorier les objets d'un flux, cliquez ici.

Lancer un flux

Le code suivant illustre une requête pour démarrer un flux.

Si vous utilisez le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête. Pour démarrer un flux, remplacez la valeur CREATED du champ state par RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour démarrer votre diffusion, cliquez ici.

Suspendre un flux

Le code suivant illustre une requête permettant de mettre en veille un flux en cours d'exécution.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En mettant le flux en veille, vous allez faire passer son état de RUNNING à PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de suspendre votre diffusion, cliquez ici.

Réactiver un flux

Le code suivant illustre une requête pour reprendre un flux mis en veille.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est state. En reprenant le flux, vous passez de l'état PAUSED à l'état RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour plus d'informations sur l'utilisation de gcloud pour reprendre votre diffusion, cliquez ici.

Récupérer un flux

Vous pouvez récupérer un flux en échec permanent pour une source MySQL, Oracle ou PostgreSQL à l'aide de la méthode RunStream. Chaque type de base de données source a sa propre définition des opérations de récupération de flux possibles. Pour en savoir plus, consultez la section Récupérer un flux.

Récupérer un flux pour une source MySQL ou Oracle

Les exemples de code suivants montrent des requêtes de récupération d'un flux pour une source MySQL ou Oracle à partir de différentes positions de fichiers journaux:

REST

Permet de récupérer un flux à sa position actuelle. Il s'agit de l'option par défaut:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Récupérez un flux à partir de la position disponible suivante:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Récupérez un flux à sa position la plus récente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Récupérer un flux à partir d'une position spécifique (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez récupérer votre flux
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez récupérer votre flux. Si vous ne fournissez pas cette valeur, Datastream récupère le flux à partir de l'en-tête du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Récupérez un flux à partir d'une position spécifique (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez récupérer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Pour en savoir plus sur les options de récupération disponibles, consultez la section Récupérer un flux.

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Récupérer un flux pour une source PostgreSQL

L'exemple de code suivant montre une requête de récupération d'un flux pour une source PostgreSQL. Lors de la récupération, le flux commence à lire à partir du premier numéro de séquence de journal (LSN) dans l'emplacement de réplication configuré pour le flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si vous souhaitez modifier l'emplacement de réplication, mettez d'abord à jour le flux avec le nouveau nom de l'emplacement de réplication:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Démarrer ou reprendre une diffusion à partir d'une position spécifique

Vous pouvez démarrer un flux ou reprendre un flux suspendu à partir d'une position spécifique pour les sources MySQL et Oracle. Cela peut être utile lorsque vous souhaitez effectuer un remplissage à l'aide d'un outil externe ou démarrer la CDC à partir d'une position que vous indiquez. Pour une source MySQL, vous devez indiquer une position binlog. Pour une source Oracle, vous devez indiquer un numéro de modification système (SCN) dans le fichier journal de rétablissement.

Le code suivant montre une requête pour démarrer ou reprendre un flux déjà créé à partir d'une position spécifique.

Démarrez ou reprenez un flux à partir d'une position binlog spécifique (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez démarrer votre flux.
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez démarrer votre flux. Si vous n'indiquez pas de valeur, Datastream commence à lire les données à partir de l'en-tête du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ou de reprendre un flux à partir d'une position spécifique à l'aide de gcloud. Pour en savoir plus sur l'utilisation de gcloud pour démarrer ou reprendre un flux, consultez la documentation du SDK Cloud.

Démarrez ou reprenez un flux à partir d'un numéro de modification du système spécifique dans le fichier journal de rétablissement (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez démarrer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ou de reprendre un flux à partir d'une position spécifique à l'aide de gcloud. Pour en savoir plus sur l'utilisation de gcloud pour démarrer un flux, consultez la documentation du SDK Cloud.

Modifier un flux

Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.

Pour cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés respectivement par les options destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une requête permettant d'inclure un fichier de schéma de types unifiés dans le chemin des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers: un fichier de données JSON et un fichier de schéma Avro.

Pour cet exemple, le champ spécifié est le champ jsonFileFormat, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.

La section oracleExcludedObjects du code affiche les tables et les schémas pour lesquels le remplissage dans la destination est interdit.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour modifier votre flux, cliquez ici.

Déclencher le remplissage pour un objet de flux

Un flux dans Datastream peut effectuer un remplissage avec des données d'historique tout en diffusant les modifications en cours par flux vers une destination. Les modifications en cours sont toujours diffusées depuis une source vers une destination. Toutefois, vous pouvez indiquer si vous souhaitez que les données historiques soient diffusées.

Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll.

Datastream vous permet également de diffuser des données historiques uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll et excluez les tables pour lesquelles vous ne souhaitez pas obtenir de données historiques.

Si vous souhaitez que seules les modifications en cours soient diffusées dans la destination, utilisez le paramètre backfillNone. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer manuellement le remplissage pour les objets contenant ces données.

Il peut également être utile de lancer le remplissage d'un objet lorsque les données ne sont pas synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, auquel cas les données sont perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont insérées par flux dans la destination en une seule opération. Par conséquent, les données sont synchronisées entre la source et la destination.

Avant de pouvoir lancer le remplissage pour un objet de flux, vous devez récupérer des informations sur l'objet.

Chaque objet est associé à un OBJECT_ID, qui l'identifie de manière unique. Utilisez OBJECT_ID pour lancer le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de lancer le remplissage pour un objet de votre flux, consultez la documentation du SDK Google Cloud.

Arrêter le remplissage pour un objet d'un flux

Après avoir lancé le remplissage pour un objet de flux, vous pouvez arrêter le remplissage pour l'objet. Par exemple, si un utilisateur modifie le schéma d'une base de données, le schéma ou les données peuvent être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.

Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage pour chaque objet, puis lancez le remplissage des objets un par un.

Avant de pouvoir arrêter le remplissage d'un objet de flux, vous devez demander la récupération des informations sur tous les objets du flux. Chaque objet renvoyé possède un OBJECT_ID, qui l'identifie de manière unique. Utilisez OBJECT_ID pour arrêter le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour arrêter le remplissage d'un objet de votre flux, cliquez ici.

Modifier le nombre maximal de tâches CDC simultanées

Le code suivant montre comment définir le nombre maximal de tâches de capture de données modifiées (CDC) simultanées pour un flux MySQL sur 7.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est maxConcurrentCdcTasks. En définissant sa valeur sur 7, vous faites passer le nombre maximal de tâches CDC simultanées de la valeur précédente à 7. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas cette valeur, ou si vous la définissez sur 0, la valeur système par défaut de 5 tâches est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Modifier le nombre maximal de tâches de remplissage simultanées

Le code suivant montre comment définir le nombre maximal de tâches de remplissage simultanées pour un flux MySQL sur 25.

Pour cet exemple, le champ spécifié pour le paramètre updateMask est maxConcurrentBackfillTasks. En définissant sa valeur sur 25, vous faites passer le nombre maximal de tâches de remplissage simultanées de la valeur précédente à 25. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas cette valeur, ou si vous la définissez sur 0, la valeur système par défaut de 16 tâches est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Activer le streaming d'objets volumineux pour les sources Oracle

Vous pouvez activer la diffusion en continu d'objets volumineux, tels que les objets binaires volumineux (BLOB), les objets de grande taille (CLOB) et les grands objets de caractères nationaux (NCLOB) pour les flux avec des sources Oracle. L'option streamLargeObjects vous permet d'inclure des objets volumineux dans des flux nouveaux et existants. L'indicateur est défini au niveau du flux. Vous n'avez donc pas besoin de spécifier les colonnes des types de données d'objets volumineux.

L'exemple suivant montre comment créer un flux qui vous permet de diffuser des objets volumineux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour mettre à jour un flux, consultez la documentation du SDK Google Cloud.

Supprimer un flux

Le code suivant illustre une requête de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Pour plus d'informations sur l'utilisation de gcloud pour supprimer votre flux, cliquez ici.