Gérer les flux

Présentation

Dans cette section, vous allez apprendre à utiliser l'API Datastream pour :

  • Créer des flux
  • Obtenir des informations sur les flux et les objets de flux
  • Mettez à jour des flux en les démarrant, en les mettant en pause, en les reprenant et en les modifiant, ainsi qu'en lançant et en arrêtant le remplissage des objets de flux.
  • Récupérer des diffusions ayant échoué définitivement
  • Activer le streaming d'objets volumineux pour les flux Oracle
  • Supprimer des flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser la Google Cloud CLI (CLI).

Pour obtenir des informations générales sur l'utilisation de gcloud pour gérer les flux Datastream, consultez gcloud Datastream diffusions.

Créer un flux

Dans cette section, vous allez apprendre à créer un flux servant à transférer des données de votre source vers une destination. Les exemples ci-dessous ne sont pas exhaustifs, mais mettent en évidence des fonctionnalités spécifiques de Datastream. Pour répondre à votre cas d'utilisation spécifique, utilisez ces exemples en complément de la documentation de référence de l'API Datastream.

Cette section couvre les cas d'utilisation suivants:

Exemple 1: Transférer des objets spécifiques vers BigQuery

Dans cet exemple, vous allez apprendre à:

  • Transférer des données depuis MySQL vers BigQuery
  • Inclure un ensemble d'objets dans le flux
  • Remplir tous les objets inclus dans le flux

Voici une requête permettant d'extraire toutes les tables de schema1 et deux tables spécifiques de schema2: tableA et tableC. Les événements sont écrits dans un ensemble de données dans BigQuery.

Comme la requête n'inclut pas le paramètre customerManagedEncryptionKey, le système de gestion des clés interne de Google Cloud est utilisé pour chiffrer vos données plutôt que des clés CMEK.

Le paramètre backfillAll associé à l'exécution du remplissage (ou instantané) historique est défini sur un dictionnaire vide ({}), ce qui signifie que Datastream remplit les données historiques de toutes les tables incluses dans le flux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 2: Exclure des objets spécifiques d'un flux avec une source PostgreSQL

Dans cet exemple, vous allez apprendre à:

  • Transférer des données depuis PostgreSQL vers BigQuery
  • Exclure des objets du flux
  • Exclure des objets du remplissage

Le code suivant montre une requête de création d'un flux servant à transférer des données depuis une base de données PostgreSQL source vers BigQuery. Lorsque vous créez un flux à partir d'une base de données PostgreSQL source, vous devez spécifier deux champs supplémentaires spécifiques à PostgreSQL dans la requête:

  • replicationSlot: un emplacement de réplication est un prérequis pour la configuration d'une base de données PostgreSQL pour la réplication. Vous devez créer un emplacement de réplication pour chaque flux.
  • publication: une publication est un groupe de tables à partir duquel vous souhaitez répliquer les modifications. Le nom de la publication doit exister dans la base de données pour que vous puissiez démarrer un flux. La publication doit au minimum inclure les tables spécifiées dans la liste includeObjects du flux.

Le paramètre backfillAll associé au remplissage de l'historique (ou à l'instantané) est configuré pour exclure une table.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 3: Diffuser des données en flux continu vers une destination Cloud Storage

Dans cet exemple, vous allez apprendre à:

  • Transférer des données depuis Oracle vers Cloud Storage
  • Définir un ensemble d'objets à inclure dans le flux
  • Définir une clé CMEK pour chiffrer les données au repos

La requête suivante montre comment créer un flux qui écrit les événements dans un bucket Cloud Storage.

Dans cet exemple de requête, les événements sont écrits au format de sortie JSON et un fichier est créé toutes les 100 Mo ou toutes les 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Pour le format JSON, vous pouvez :

  • Incluez un fichier de schéma de types unifié dans le chemin d'accès. Par conséquent, Datastream écrit deux fichiers dans Cloud Storage : un fichier de données JSON et un fichier de schéma Avro. Le fichier de schéma porte le même nom que le fichier de données, avec une extension .schema.

  • Activez la compression gzip pour que Datastream compresse les fichiers écrits dans Cloud Storage.

En utilisant le paramètre backfillNone, la requête spécifie que seules les modifications en cours sont transmises à la destination, sans remplissage.

La requête spécifie le paramètre de clé de chiffrement gérée par le client, qui vous permet de contrôler les clés utilisées pour chiffrer les données au repos dans un projet Google Cloud. Ce paramètre fait référence à la clé CMEK utilisée par Datastream pour chiffrer les données diffusées de la source vers la destination. Elle spécifie également le trousseau de clés de votre CMEK.

Pour en savoir plus sur les trousseaux de clés, consultez la page Ressources Cloud KMS. Pour en savoir plus sur la protection de vos données à l'aide de clés de chiffrement, consultez la page Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Valider la définition d'un flux

Avant de créer un flux, vous pouvez valider sa définition. De cette façon, vous pouvez vous assurer que tous les contrôles de validation sont concluants et que le flux s'exécute correctement une fois créé.

Valider un flux consiste à vérifier :

  • Si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
  • Si le flux peut se connecter à la source et à la destination.
  • La configuration de bout en bout du flux.

Pour valider un flux, ajoutez &validate_only=true à l'URL qui précède le corps de votre requête:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.

Par exemple, supposons que vous souhaitiez que Datastream utilise une clé de chiffrement gérée par le client (CMEK) pour chiffrer les données diffusées de la source vers la destination. Lors de la validation du flux, Datastream vérifie que la clé existe et que Datastream est autorisé à l'utiliser. Si l'une de ces conditions n'est pas remplie, le message d'erreur suivant s'affiche lorsque vous validez le flux:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Pour résoudre ce problème, vérifiez que la clé que vous avez fournie existe et que le compte de service Datastream dispose de l'autorisation cloudkms.cryptoKeys.get pour la clé.

Une fois les corrections appropriées effectuées, relancez la requête pour vous assurer que tous les contrôles de validation réussissent. Pour l'exemple ci-dessus, la vérification CMEK_VALIDATE_PERMISSIONS ne renverra plus de message d'erreur, mais son état sera PASSED.

Obtenir des informations sur un flux

Le code suivant montre une requête de récupération d'informations sur un flux. Ces informations sont les suivantes :

  • Nom du flux (identifiant unique)
  • Un nom convivial pour le flux (nom à afficher)
  • Codes temporels du moment où le flux a été créé et mis à jour pour sa dernière fois
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La réponse s'affiche comme suit :

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour récupérer des informations sur votre diffusion, cliquez ici.

Répertorier les flux

Le code suivant montre une requête permettant de récupérer la liste de tous les flux dans le projet et l'emplacement spécifiés.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin d'obtenir des informations sur tous vos flux, cliquez ici.

Lister les objets d'un flux

Le code suivant montre une requête de récupération d'informations sur tous les objets d'un flux.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur tous les objets de votre flux, cliquez ici.

La liste des objets renvoyés peut ressembler à ceci :

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour répertorier les objets d'un flux, cliquez ici.

Lancer un flux

Le code suivant montre une requête pour démarrer un flux.

Si vous utilisez le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête. Pour démarrer un flux, remplacez la valeur CREATED par RUNNING dans le champ state.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour plus d'informations sur l'utilisation de gcloud pour démarrer votre diffusion, cliquez ici.

Démarrer un flux à partir d'une position spécifique

Vous pouvez démarrer un flux à partir d'une position spécifique pour les sources MySQL et Oracle, par exemple si vous souhaitez effectuer un remplissage à l'aide d'un outil externe, ou démarrer la CDC à partir d'un fichier journal que vous indiquez. Pour une source MySQL, vous devez indiquer une position binlog, pour une source Oracle, un numéro de modification système (SCN, System Change Number) dans le fichier journal de rétablissement.

Le code suivant montre une requête pour démarrer un flux déjà créé à partir d'une position spécifique.

Démarrez un flux à partir d'une position binlog spécifique (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez démarrer votre flux.
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez démarrer votre flux. Si vous ne fournissez pas cette valeur, Datastream commence la lecture à partir de l'en-tête du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer un flux à partir d'une position spécifique à l'aide de gcloud. Pour en savoir plus sur l'utilisation de gcloud pour démarrer un flux, consultez la documentation du SDK Cloud.

Démarrez un flux à partir d'un numéro de modification système spécifique dans le fichier journal de rétablissement (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez démarrer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer un flux à partir d'une position spécifique à l'aide de gcloud. Pour en savoir plus sur l'utilisation de gcloud pour démarrer un flux, consultez la documentation du SDK Cloud.

Suspendre un flux

Le code suivant montre une requête permettant de suspendre un flux en cours d'exécution.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ state. En suspendant la diffusion, vous faites passer son état de RUNNING à PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour suspendre votre diffusion, cliquez ici.

Réactiver un flux

Le code suivant montre une demande de reprise d'un flux mis en veille.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ state. En reprenant la diffusion, vous faites passer son état de PAUSED à RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour plus d'informations sur l'utilisation de gcloud pour reprendre votre diffusion, cliquez ici.

Récupérer un flux

Vous pouvez récupérer un flux ayant échoué définitivement pour une source MySQL, Oracle ou PostgreSQL à l'aide de la méthode RunStream. Chaque type de base de données source a sa propre définition des opérations de récupération de flux possibles. Pour en savoir plus, consultez Récupérer un flux.

Récupérer un flux pour une source MySQL ou Oracle

Les exemples de code suivants montrent des requêtes permettant de récupérer un flux pour une source MySQL ou Oracle à partir de différentes positions de fichier journal:

REST

Récupérez un flux à sa position actuelle. Il s'agit de l'option par défaut:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Récupérez un flux à partir du prochain emplacement disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Récupérer un flux à sa position la plus récente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Récupérez un flux à partir d'une position spécifique (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE: nom du fichier journal à partir duquel vous souhaitez récupérer le flux
  • POSITION: position dans le fichier journal à partir de laquelle vous souhaitez récupérer le flux. Si vous ne fournissez pas cette valeur, Datastream récupère le flux à partir de l'en-tête du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Récupérez un flux à partir d'une position spécifique (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez récupérer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Pour en savoir plus sur les options de récupération disponibles, consultez la section Récupérer un flux.

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Récupérer un flux pour une source PostgreSQL

L'exemple de code suivant montre une requête de récupération d'un flux pour une source PostgreSQL. Lors de la récupération, le flux commence à lire à partir du premier numéro de séquence de journal (LSN) dans l'emplacement de réplication configuré pour le flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si vous souhaitez modifier l'emplacement de réplication, commencez par mettre à jour le flux avec le nouveau nom de l'emplacement de réplication:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux avec gcloud.

Modifier un flux

Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.

Dans cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés respectivement par les indicateurs destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une requête pour inclure un fichier de schéma de types unifiés dans le chemin des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers: un fichier de données JSON et un fichier de schéma Avro.

Dans cet exemple, le champ spécifié est jsonFileFormat, représenté par l'option destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.

La section oracleExcludedObjects du code affiche les tables et les schémas qui ne peuvent pas être remplis dans la destination.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour modifier votre diffusion, cliquez ici.

Déclencher un remplissage pour un objet d'un flux

Un flux dans Datastream peut effectuer un remplissage avec des données d'historique tout en diffusant les modifications en cours par flux vers une destination. Les modifications en cours sont toujours diffusées depuis une source vers une destination. Toutefois, vous pouvez spécifier si vous souhaitez que les données historiques soient diffusées en continu.

Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll.

Datastream vous permet également de diffuser des données historiques en flux continu uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll et excluez les tables pour lesquelles vous ne souhaitez pas obtenir de données historiques.

Si vous souhaitez que seules les modifications en cours soient diffusées dans la destination, utilisez le paramètre backfillNone. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer le remplissage manuellement pour les objets contenant ces données.

Il peut également être utile de lancer le remplissage d'un objet lorsque les données ne sont pas synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, auquel cas les données sont perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont insérées par flux dans la destination en une seule opération. Par conséquent, les données sont synchronisées entre la source et la destination.

Avant de pouvoir lancer un remplissage pour un objet d'un flux, vous devez récupérer des informations sur l'objet.

Chaque objet est associé à un OBJECT_ID, qui l'identifie de manière unique. Vous utilisez OBJECT_ID pour lancer le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour le remplissage initial d'un objet de votre flux, cliquez ici.

Arrêter le remplissage pour un objet d'un flux

Après avoir lancé le remplissage pour un objet d'un flux, vous pouvez arrêter le remplissage pour l'objet. Par exemple, si un utilisateur modifie le schéma d'une base de données, le schéma ou les données peuvent être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.

Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage pour chaque objet, puis lancez le remplissage des objets un par un.

Avant de pouvoir arrêter le remplissage d'un objet d'un flux, vous devez effectuer une requête afin de récupérer des informations sur tous les objets d'un flux. Chaque objet renvoyé possède un OBJECT_ID, qui l'identifie de manière unique. Vous utilisez OBJECT_ID pour arrêter le remplissage du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin d'arrêter le remplissage pour un objet de votre flux, cliquez ici.

Modifier le nombre maximal de tâches CDC simultanées

Le code suivant montre comment définir sur 7 le nombre maximal de tâches simultanées de capture de données modifiées (CDC, Change Data Capture) pour un flux MySQL.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ maxConcurrentCdcTasks. En définissant sa valeur sur 7, vous faites passer le nombre maximal de tâches CDC simultanées de la valeur précédente à 7. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas cette valeur, ou si vous lui attribuez la valeur 0, cinq tâches par défaut sont définies pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Modifier le nombre maximal de tâches de remplissage simultanées

Le code suivant montre comment définir sur 25 le nombre maximal de tâches de remplissage simultanées pour un flux MySQL.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ maxConcurrentBackfillTasks. En définissant sa valeur sur 25, vous faites passer le nombre maximal de tâches de remplissage simultanées de la valeur précédente à 25. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas cette valeur, ou si vous lui attribuez la valeur 0, la valeur par défaut du système (16 tâches) est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, cliquez ici.

Activer le streaming d'objets volumineux pour les sources Oracle

Vous pouvez activer le streaming d'objets volumineux, tels que les grands objets binaires (BLOB), les grands objets de caractères (CLOB) et les grands objets de caractères nationaux (NCLOB) pour les flux avec des sources Oracle. L'option streamLargeObjects vous permet d'inclure des objets volumineux dans les flux nouveaux et existants. L'option est définie au niveau du flux. Vous n'avez donc pas besoin de spécifier les colonnes des types de données d'objets volumineux.

L'exemple suivant montre comment créer un flux permettant de diffuser des objets volumineux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour mettre à jour un flux, consultez la documentation du SDK Google Cloud.

Supprimer un flux

Le code suivant montre une requête de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Pour plus d'informations sur la suppression de votre diffusion à l'aide de gcloud, cliquez ici.