Gérer les flux

Sur cette page, vous allez apprendre à utiliser l'API Datastream pour :

  • Créer des flux
  • Obtenir des informations sur les flux et les objets de flux
  • Mettre à jour les flux en les démarrant, en les mettant en veille, en les reprenant et en les modifiant, ainsi qu'en lançant et en arrêtant le remplissage des objets de flux
  • Récupérer des diffusions ayant échoué de manière définitive
  • Activer le streaming des grands objets pour les flux Oracle
  • Supprimer des flux

Vous pouvez utiliser l'API Datastream de deux manières. Vous pouvez effectuer des appels d'API REST ou utiliser la Google Cloud CLI.

Pour obtenir des informations générales sur l'utilisation de Google Cloud CLI afin de gérer les flux Datastream, consultez Flux Datastream gcloud CLI.

Créer un flux

Dans cette section, vous allez apprendre à créer un flux permettant de transférer des données de votre source vers une destination. Les exemples suivants ne sont pas exhaustifs, mais mettent en évidence des fonctionnalités spécifiques de Datastream. Pour répondre à votre cas d'utilisation spécifique, utilisez ces exemples avec la documentation de référence de l'API Datastream.

Cette section couvre les cas d'utilisation suivants :

Exemple 1 : Diffuser des objets spécifiques vers BigQuery

Dans cet exemple, vous allez apprendre à :

  • Diffuser des données en streaming depuis MySQL vers BigQuery
  • Inclure un ensemble d'objets dans le flux
  • Définir le mode d'écriture du flux sur "ajout uniquement"
  • Remplir tous les objets inclus dans le flux

Voici une requête pour extraire toutes les tables de schema1 et deux tables spécifiques de schema2 : tableA et tableC. Les événements sont écrits dans un ensemble de données BigQuery.

La requête n'inclut pas le paramètre customerManagedEncryptionKey. Par conséquent, le système interne de gestion des clés Google Cloud est utilisé pour chiffrer vos données au lieu de CMEK.

Le paramètre backfillAll associé à l'exécution du remplissage d'historique (ou de l'instantané) est défini sur un dictionnaire vide ({}), ce qui signifie que Datastream remplit les données historiques de toutes les tables incluses dans le flux.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 2 : Exclure des objets spécifiques d'un flux avec une source PostgreSQL

Dans cet exemple, vous allez apprendre à :

  • Diffuser des données en streaming depuis PostgreSQL vers BigQuery
  • Exclure des objets du flux
  • Exclure des objets du remplissage

Le code suivant montre une requête permettant de créer un flux utilisé pour transférer des données d'une base de données PostgreSQL source vers BigQuery. Lorsque vous créez un flux à partir d'une base de données PostgreSQL source, vous devez spécifier deux champs supplémentaires propres à PostgreSQL dans votre requête :

  • replicationSlot : un emplacement de réplication est nécessaire pour configurer une base de données PostgreSQL pour la réplication. Vous devez créer un emplacement de réplication pour chaque flux.
  • publication : une publication est un groupe de tables dont vous souhaitez répliquer les modifications. Le nom de la publication doit exister dans la base de données avant de démarrer un flux. La publication doit au minimum inclure les tables spécifiées dans la liste includeObjects du flux.

Le paramètre backfillAll associé à l'exécution du remplissage d'historique (ou de l'instantané) est défini pour exclure une table.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 3 : Spécifier le mode d'écriture "ajout uniquement" pour un flux

Lorsque vous diffusez des données en flux continu vers BigQuery, vous pouvez définir le mode d'écriture : merge ou appendOnly. Pour en savoir plus, consultez Configurer le mode écriture.

Si vous ne spécifiez pas le mode d'écriture dans votre requête de création d'un flux, le mode merge par défaut est utilisé.

La requête suivante montre comment définir le mode appendOnly lorsque vous créez un flux MySQL vers BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 4 : Diffuser vers un autre projet dans BigQuery

Si vous avez créé vos ressources Datastream dans un projet, mais que vous souhaitez les diffuser dans un autre projet BigQuery, vous pouvez le faire à l'aide d'une requête semblable à celle qui suit.

Si vous spécifiez sourceHierarchyDatasets pour votre ensemble de données de destination, vous devez renseigner le champ projectId.

Si vous spécifiez singleTargetDataset pour votre ensemble de données de destination, remplissez le champ datasetId au format projectId:datasetId.

REST

Pour sourceHierarchyDatasets :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        },
        "projectId": "myProjectId2"
      }
    }
  },
  "backfillAll": {}
}

Pour singleTargetDataset :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "singleTargetDataset": {
        "datasetId": "myProjectId2:myDatasetId"
      },
    }
  },
  "backfillAll": {}
}

gcloud

Pour sourceHierarchyDatasets :

  datastream streams create crossProjectBqStream1 --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json
  --backfill-none
  

Contenu du fichier de configuration source_hierarchy_cross_project_config.json :

  {"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
  

Pour singleTargetDataset :

  datastream streams create crossProjectBqStream --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json
  --backfill-none
  

Contenu du fichier de configuration single_target_cross_project_config.json :

  {"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
  

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 5 : Diffuser des données vers une destination Cloud Storage

Dans cet exemple, vous allez apprendre à :

  • Diffuser des données d'Oracle vers Cloud Storage
  • Définir un ensemble d'objets à inclure dans le flux
  • Définir une CMEK pour chiffrer les données au repos

La requête suivante montre comment créer un flux qui écrit les événements dans un bucket Cloud Storage.

Dans cet exemple de requête, les événements sont écrits au format de sortie JSON, et un fichier est créé toutes les 100 Mo ou 30 secondes (en remplaçant les valeurs par défaut de 50 Mo et 60 secondes).

Pour le format JSON, vous pouvez :

  • Incluez un fichier de schéma de types unifiés dans le chemin d'accès. Par conséquent, Datastream écrit deux fichiers dans Cloud Storage : un fichier de données JSON et un fichier de schéma Avro. Le fichier de schéma porte le même nom que le fichier de données, avec une extension .schema.

  • Activez la compression gzip pour que Datastream compresse les fichiers écrits dans Cloud Storage.

En utilisant le paramètre backfillNone, la requête indique que seuls les changements en cours sont diffusés dans la destination, sans remplissage.

La requête spécifie le paramètre de clé de chiffrement gérée par le client, qui vous permet de contrôler les clés utilisées pour chiffrer les données au repos dans un projet Google Cloud . Le paramètre fait référence à la clé CMEK utilisée par Datastream pour chiffrer les données diffusées de la source vers la destination. Il spécifie également le trousseau de clés pour votre clé CMEK.

Pour en savoir plus sur les trousseaux de clés, consultez la page Ressources Cloud KMS. Pour en savoir plus sur la protection de vos données à l'aide de clés de chiffrement, consultez Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour créer un flux, consultez la documentation du SDK Google Cloud.

Exemple 6 : Diffuser des données dans une table gérée BigLake

Dans cet exemple, vous allez apprendre à configurer un flux pour répliquer les données d'une base de données MySQL vers une table BigLake Iceberg en mode append-only. Avant de créer la demande, assurez-vous d'avoir effectué les étapes suivantes :

  • Disposer d'un bucket Cloud Storage dans lequel stocker vos données
  • Créez une connexion à une ressource Cloud.
  • Accorder à votre connexion de ressource Cloud l'accès au bucket Cloud Storage

Vous pouvez ensuite utiliser la requête suivante pour créer votre flux :

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

Contenu du fichier de configuration source mysql_source_config.json :

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

Contenu du fichier de configuration bl_config.json :

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

Valider la définition d'un flux

Avant de créer un flux, vous pouvez valider sa définition. Vous pouvez ainsi vous assurer que tous les contrôles de validation sont réussis et que le flux s'exécutera correctement une fois créé.

Valider un flux consiste à vérifier :

  • Si la source est correctement configurée pour autoriser Datastream à diffuser des données à partir de celle-ci.
  • Si le flux peut se connecter à la source et à la destination.
  • La configuration de bout en bout du flux.

Pour valider un flux, ajoutez &validate_only=true à l'URL précédant le corps de votre requête :

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Une fois cette requête envoyée, vous voyez les vérifications de validation exécutées par Datastream pour votre source et votre destination, ainsi que les réussites ou les échecs des contrôles. Si un contrôle échoue, des informations indiquent la raison de l'échec et la procédure à suivre pour corriger le problème.

Par exemple, supposons que vous disposiez d'une clé de chiffrement gérée par le client (CMEK) que vous souhaitez que Datastream utilise pour chiffrer les données transférées de la source vers la destination. Lors de la validation du flux, Datastream vérifie que la clé existe et qu'il est autorisé à l'utiliser. Si l'une de ces conditions n'est pas remplie, le message d'erreur suivant s'affiche lorsque vous validez le flux :

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Pour résoudre ce problème, vérifiez que la clé que vous avez fournie existe et que le compte de service Datastream dispose de l'autorisation cloudkms.cryptoKeys.get pour la clé.

Une fois les corrections appropriées effectuées, relancez la requête pour vous assurer que tous les contrôles de validation réussissent. Pour l'exemple précédent, la vérification CMEK_VALIDATE_PERMISSIONS ne renverra plus de message d'erreur, mais aura l'état PASSED.

Obtenir des informations sur un flux

Le code suivant montre une requête de récupération d'informations sur un flux. Voici quelques exemples :

  • Nom du flux (identifiant unique)
  • Nom convivial du flux (nom à afficher)
  • Codes temporels de création et de dernière mise à jour du flux
  • Informations sur les profils de connexion source et de destination associés au flux
  • État du flux

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La réponse s'affiche comme suit :

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour récupérer des informations sur votre flux, consultez la documentation du SDK Google Cloud.

Répertorier les flux

Le code suivant montre une requête permettant de récupérer la liste de tous les flux du projet et de l'emplacement spécifiés.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur tous vos flux, consultez la documentation du SDK Google Cloud.

Lister les objets d'un flux

Le code suivant montre une requête de récupération d'informations sur tous les objets d'un flux.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de récupérer des informations sur tous les objets de votre flux, consultez la documentation du SDK Google Cloud.

La liste des objets renvoyés peut ressembler à ceci :

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour lister les objets d'un flux, consultez la documentation du SDK Google Cloud.

Lancer un flux

Le code suivant montre une requête pour démarrer un flux.

Si vous utilisez le paramètre updateMask dans la requête, seuls les champs que vous spécifiez doivent être inclus dans le corps de la requête. Pour démarrer un flux, remplacez la valeur du champ state de CREATED par RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour démarrer votre flux, consultez la documentation du SDK Google Cloud.

Suspendre un flux

Le code suivant montre une requête permettant de suspendre un flux en cours d'exécution.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ state. En mettant le flux en pause, vous passez de l'état RUNNING à PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour suspendre votre flux, consultez la documentation du SDK Google Cloud.

Réactiver un flux

Le code suivant montre une requête permettant de reprendre un flux suspendu.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ state. En rétablissant le flux, vous modifiez son état de PAUSED à RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour reprendre votre flux, consultez la documentation du SDK Google Cloud.

Récupérer un flux

Vous pouvez récupérer un flux en échec permanent à l'aide de la méthode RunStream. Chaque type de base de données source a sa propre définition des opérations de récupération de flux possibles. Pour en savoir plus, consultez Récupérer un flux.

Récupérer un flux pour une source MySQL ou Oracle

Les exemples de code suivants montrent des requêtes permettant de récupérer un flux pour une source MySQL ou Oracle à partir de différentes positions de fichier journal :

REST

Récupérez un flux à partir de la position actuelle. Il s'agit de l'option par défaut :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Récupérer un flux à partir de la prochaine position disponible :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Récupérer un flux à partir de la position la plus récente :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Récupérer un flux à partir d'une position spécifique (réplication basée sur le journal binaire MySQL) :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE : nom du fichier journal à partir duquel vous souhaitez récupérer votre flux
  • POSITION : position dans le fichier journal à partir de laquelle vous souhaitez récupérer votre flux. Si vous ne fournissez pas de valeur, Datastream récupère le flux depuis le début du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Récupérer un flux à partir d'une position spécifique (réplication basée sur GTID MySQL) :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Remplacez GTID_SET par un ou plusieurs GTID uniques ou plages de GTID à partir desquels vous souhaitez récupérer votre flux.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

Récupérer un flux à partir d'une position spécifique (Oracle) :

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez récupérer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Pour en savoir plus sur les options de récupération disponibles, consultez Récupérer un flux.

gcloud

Il n'est pas possible de récupérer un flux à l'aide de gcloud.

Récupérer un flux pour une source PostgreSQL

L'exemple de code suivant montre une requête permettant de récupérer un flux pour une source PostgreSQL. Lors de la récupération, le flux commence à lire les données à partir du premier numéro de séquence de journal (LSN) dans l'emplacement de réplication configuré pour le flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si vous souhaitez modifier l'emplacement de réplication, mettez d'abord à jour le flux avec le nouveau nom d'emplacement de réplication :

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux à l'aide de gcloud.

Récupérer un flux pour une source SQL Server

Les exemples de code suivants montrent des exemples de requêtes permettant de récupérer un flux pour une source SQL Server.

REST

Reprendre une diffusion à partir de la première position disponible :

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Récupérer un flux à partir d'un numéro de séquence de journal de votre choix :

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Il n'est pas possible de récupérer un flux à l'aide de gcloud.

Démarrer ou reprendre un flux à une position spécifique

Vous pouvez démarrer un flux ou reprendre un flux mis en veille à partir d'une position spécifique pour les sources MySQL et Oracle. Cela peut être utile lorsque vous souhaitez effectuer un remplissage d'historique à l'aide d'un outil externe ou démarrer la CDC à partir d'une position que vous indiquez. Pour une source MySQL, vous devez indiquer une position binlog ou un ensemble GTID. Pour une source Oracle, vous devez indiquer un numéro de modification du système (SCN) dans le fichier journal de rétablissement.

Le code suivant montre une requête permettant de démarrer ou de reprendre un flux déjà créé à partir d'une position spécifique.

Démarrer ou reprendre un flux à partir d'une position binlog spécifique (MySQL) :

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Remplacez les éléments suivants :

  • NAME_OF_THE_LOG_FILE : nom du fichier journal à partir duquel vous souhaitez démarrer votre flux.
  • POSITION : position dans le fichier journal à partir de laquelle vous souhaitez démarrer votre flux. Si vous ne fournissez pas de valeur, Datastream commence la lecture à partir du début du fichier.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ni de reprendre un flux à une position spécifique à l'aide de gcloud. Pour savoir comment utiliser gcloud pour démarrer ou reprendre un flux, consultez la documentation du SDK Cloud.

Démarrer ou reprendre un flux à partir d'un ensemble GTID spécifique (MySQL) :

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Remplacez GTID_SET par un ou plusieurs GTID uniques ou plages de GTID à partir desquels vous souhaitez démarrer ou reprendre votre flux.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ni de reprendre un flux à une position spécifique à l'aide de gcloud. Pour savoir comment utiliser gcloud pour démarrer ou reprendre un flux, consultez la documentation du SDK Cloud.

Démarrer ou reprendre un flux à partir d'un numéro de modification du système spécifique dans le fichier journal de reprise (Oracle) :

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Remplacez scn par le numéro de modification du système (SCN) dans le fichier journal de rétablissement à partir duquel vous souhaitez démarrer votre flux. Ce champ est obligatoire.

Exemple :

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Il n'est pas possible de démarrer ni de reprendre un flux à une position spécifique à l'aide de gcloud. Pour savoir comment utiliser gcloud afin de démarrer un flux, consultez la documentation du SDK Cloud.

Modifier un flux

Le code suivant montre une requête de mise à jour de la configuration de rotation d'un flux pour alterner le fichier toutes les 75 Mo ou 45 secondes.

Dans cet exemple, les champs spécifiés pour le paramètre updateMask incluent les champs fileRotationMb et fileRotationInterval, représentés par les options destinationConfig.gcsDestinationConfig.fileRotationMb et destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivement.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Le code suivant montre une requête permettant d'inclure un fichier de schéma de types unifiés dans le chemin d'accès des fichiers que Datastream écrit dans Cloud Storage. Par conséquent, Datastream écrit deux fichiers : un fichier de données JSON et un fichier de schéma Avro.

Pour cet exemple, le champ spécifié est le champ jsonFileFormat, représenté par l'indicateur destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Le code suivant montre une requête pour que Datastream réplique des données existantes, en plus des modifications en cours apportées aux données, de la base de données source vers la destination.

La section oracleExcludedObjects du code indique les tables et les schémas pour lesquels le remplissage des données historiques dans la destination est limité.

Pour cet exemple, toutes les tables et tous les schémas seront remplis, à l'exception de la table tableA dans le schéma schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour modifier votre flux, consultez la documentation du SDK Google Cloud.

Lancer le remplissage pour un objet d'un flux

Un flux dans Datastream peut effectuer un remplissage avec des données d'historique tout en diffusant les modifications en cours par flux vers une destination. Les modifications en cours sont toujours diffusées en flux continu d'une source vers une destination. Toutefois, vous pouvez spécifier si vous souhaitez que les données historiques soient diffusées.

Si vous souhaitez que les données de l'historique soient diffusées de la source vers la destination, utilisez le paramètre backfillAll.

Datastream vous permet également de diffuser des données historiques uniquement pour des tables de base de données spécifiques. Pour ce faire, utilisez le paramètre backfillAll et excluez les tables pour lesquelles vous ne souhaitez pas de données historiques.

Si vous souhaitez que seuls les changements en cours soient diffusés vers la destination, utilisez le paramètre backfillNone. Si vous souhaitez ensuite que Datastream diffuse un instantané de toutes les données existantes de la source vers la destination, vous devez lancer un remplissage manuel pour les objets contenant ces données.

Il peut également être utile de lancer le remplissage d'un objet lorsque les données ne sont pas synchronisées entre la source et la destination. Par exemple, un utilisateur peut supprimer par inadvertance des données dans la destination, auquel cas les données sont perdues. Dans ce cas, le remplissage de l'objet sert de "mécanisme de réinitialisation", car toutes les données sont insérées par flux dans la destination en une seule opération. Par conséquent, les données sont synchronisées entre la source et la destination.

Avant de pouvoir lancer le remplissage d'un objet d'un flux, vous devez récupérer des informations sur l'objet.

Chaque objet possède un OBJECT_ID qui l'identifie de manière unique. Vous utilisez OBJECT_ID pour lancer le remplissage de l'historique du flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin de lancer un remplissage pour un objet de votre flux, consultez la documentation du SDK Google Cloud.

Arrêter le remplissage pour un objet d'un flux

Après avoir lancé le remplissage d'un objet d'un flux, vous pouvez l'arrêter. Par exemple, si un utilisateur modifie un schéma de base de données, le schéma ou les données peuvent être corrompus. Vous ne voulez pas que ce schéma ou ces données soient diffusés vers la destination. Vous arrêtez donc le remplissage de l'objet.

Vous pouvez également arrêter le remplissage d'un objet à des fins d'équilibrage de charge. Datastream peut exécuter plusieurs remplissages en parallèle. Cela peut ajouter une charge supplémentaire à la source. Si la charge est importante, arrêtez le remplissage de chaque objet, puis relancez-le objet par objet.

Avant de pouvoir arrêter le remplissage pour un objet d'un flux, vous devez envoyer une requête pour récupérer des informations sur tous les objets d'un flux. Chaque objet renvoyé possède un OBJECT_ID qui l'identifie de manière unique. Vous utilisez OBJECT_ID pour arrêter le remplissage des espaces publicitaires pour le flux.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Pour en savoir plus sur l'utilisation de gcloud afin d'arrêter le remplissage pour un objet de votre flux, consultez la documentation du SDK Google Cloud.

Modifier le nombre maximal de tâches CDC simultanées

Le code suivant montre comment définir le nombre maximal de tâches de capture des données modifiées (CDC) simultanées pour un flux MySQL sur 7.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ maxConcurrentCdcTasks. En définissant sa valeur sur 7, vous modifiez le nombre maximal de tâches CDC simultanées, qui passe de la valeur précédente à 7. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas la valeur ou si vous la définissez sur 0, la valeur par défaut de cinq tâches est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, consultez la documentation du SDK Google Cloud.

Modifier le nombre maximal de tâches de remplissage simultanées

Le code suivant montre comment définir sur 25 le nombre maximal de tâches de remplissage simultanées pour un flux MySQL.

Dans cet exemple, le champ spécifié pour le paramètre updateMask est le champ maxConcurrentBackfillTasks. En définissant sa valeur sur 25, vous modifiez le nombre maximal de tâches de remplissage simultanées, qui passe de la valeur précédente à 25. Vous pouvez utiliser des valeurs comprises entre 0 et 50 (inclus). Si vous ne définissez pas la valeur ou si vous la définissez sur 0, la valeur par défaut du système (16 tâches) est définie pour le flux.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud, consultez la documentation du SDK Google Cloud.

Activer le streaming des grands objets pour les sources Oracle

Vous pouvez activer le streaming d'objets volumineux, tels que les objets binaires volumineux (BLOB), les objets volumineux de type caractère (CLOB) et les objets volumineux de type caractère national (NCLOB) pour les flux avec des sources Oracle. L'indicateur streamLargeObjects vous permet d'inclure des objets volumineux dans les flux nouveaux et existants. L'indicateur est défini au niveau du flux. Vous n'avez pas besoin de spécifier les colonnes des types de données d'objets volumineux.

L'exemple suivant montre comment créer un flux qui vous permet de diffuser de grands objets.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour mettre à jour un flux, consultez la documentation du SDK Google Cloud.

Supprimer un flux

Le code suivant montre une requête de suppression d'un flux.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Pour en savoir plus sur l'utilisation de gcloud pour supprimer votre flux, consultez la documentation du SDK Google Cloud.

Étapes suivantes