管理数据流

概览

在本部分中,您将学习如何使用 Datastream API 执行以下操作:

  • 创建数据流
  • 获取有关数据流和数据流对象的信息
  • 通过启动、暂停、恢复和修改数据流,以及启动和停止数据流对象的回填来更新数据流
  • 恢复永久失败的视频流
  • 为 Oracle 数据流启用大型对象流式传输
  • 删除数据流

您可以通过两种方式使用 Datastream API。您可以进行 REST API 调用,也可以使用 Google Cloud CLI (CLI)。

如需简要了解如何使用 gcloud 管理 Datastream 数据流,请参阅 gcloud Datastream 数据流

创建数据流

在本部分中,您将学习如何创建用于将数据从来源转移到目标位置的数据流。以下示例并不全面,只是重点介绍了 Datastream 的特定功能。如需解决您的特定用例,请将这些示例与 Datastream API 参考文档结合使用。

本部分包含以下用例:

示例 1:将特定对象流式传输到 BigQuery

在此示例中,您将学习如何:

  • 从 MySQL 流式传输到 BigQuery
  • 在数据流中添加一组对象
  • 将数据流的写入模式定义为仅附加
  • 回填数据流中包含的所有对象

以下是从 schema1 中拉取所有表以及从 schema2 中拉取两个特定表(tableAtableC)的请求。这些事件将写入 BigQuery 中的数据集。

请求不包含 customerManagedEncryptionKey 参数,因此使用的是 Google Cloud 内部密钥管理系统(而不是 CMEK)来加密您的数据。

与执行历史回填(或快照)关联的 backfillAll 参数设置为一个空字典 ({}),这意味着 Datastream 会回填该数据流中包含的所有表中的历史数据。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 2:从具有 PostgreSQL 源的数据流中排除特定对象

在此示例中,您将学习如何:

  • 将数据从 PostgreSQL 流式传输到 BigQuery
  • 从数据流中排除对象
  • 从回填中排除对象

以下代码显示了一个创建数据流的请求,该数据流用于将数据从源 PostgreSQL 数据库转移到 BigQuery。从源 PostgreSQL 数据库创建数据流时,您需要在请求中指定两个额外的 PostgreSQL 专用字段:

  • replicationSlot:复制槽是配置 PostgreSQL 数据库以进行复制的前提条件。您需要为每个数据流创建一个复制槽。
  • publication:发布内容是您要从中复制更改的一组表。发布内容名称必须存在于数据库中,才能启动数据流。发布内容必须至少包含数据流的 includeObjects 列表中指定的表。

与执行历史回填(或快照)关联的 backfillAll 参数设置为排除一个表。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 3:为数据流指定仅附加写入模式

将数据流式插入 BigQuery 时,您可以定义写入模式:mergeappendOnly。如需了解详情,请参阅配置写入模式

如果您未在创建流的请求中指定写入模式,系统将使用默认的 merge 模式。

以下请求展示了如何在创建 MySQL 到 BigQuery 的数据流时定义 appendOnly 模式。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 4:流式传输到 Cloud Storage 目标位置

在此示例中,您将学习如何:

  • 从 Oracle 流式传输到 Cloud Storage
  • 定义要包含在数据流中的一组对象
  • 定义用于加密静态数据的 CMEK

以下请求展示了如何创建一个将事件写入 Cloud Storage 存储桶的数据流。

在此示例请求中,事件以 JSON 输出格式写入,并且每 100 MB 或 30 秒创建一个新文件(替换 50 MB 和 60 秒的默认值)。

对于 JSON 格式,您可以执行以下操作:

  • 在路径中添加统一类型架构文件。因此,Datastream 会将两个文件写入 Cloud Storage:JSON 数据文件和 Avro 架构文件。架构文件与数据文件同名,扩展名为 .schema

  • 启用 gzip 压缩可使 Datastream 压缩写入 Cloud Storage 的文件。

使用 backfillNone 参数,该请求指定仅将正在进行的更改流式传输到目标位置,而不回填。

该请求会指定客户管理的加密密钥参数,该参数可让您控制用于加密 Google Cloud 项目中的静态数据的密钥。该参数是指 Datastream 用于对从来源流式传输到目标的数据进行加密的 CMEK。它还指定了 CMEK 的密钥环。

如需详细了解密钥环,请参阅 Cloud KMS 资源。如需详细了解如何使用加密密钥保护数据,请参阅 Cloud Key Management Service (KMS)

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

验证数据流的定义

在创建数据流之前,您可以验证其定义。这样,您就可以确保所有验证检查都通过,并确保数据流在创建后能够成功运行。

验证数据流检查:

  • 来源是否已正确配置,允许 Datastream 从其中流式传输数据。
  • 数据流是否可以连接到来源和目标位置。
  • 数据流的端到端配置。

如需验证数据流,请将 &validate_only=true 添加到请求正文前面的网址:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

发出此请求后,您将看到 Datastream 针对来源和目标位置运行的验证检查,以及检查是否通过。对于未通过的验证检查,系统会显示失败原因以及纠正方法。

例如,假设您希望 DataStream 使用客户管理的加密密钥 (CMEK) 对从来源流式传输到目标的数据进行加密。在验证数据流的过程中,Datastream 将验证该密钥是否存在,以及它是否有权使用该密钥。如果未满足其中任何一个条件,则当您验证数据流时,将返回以下错误消息:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

如需解决此问题,请验证您提供的密钥是否存在,以及 Datastream 服务账号是否拥有该密钥的 cloudkms.cryptoKeys.get 权限。

进行适当的更正后,请再次发出请求,以确保通过所有验证检查。 对于上面的示例,CMEK_VALIDATE_PERMISSIONS 检查将不再返回错误消息,但状态为 PASSED

获取数据流相关信息

以下代码显示了一个检索数据流相关信息的请求。此类信息包括:

  • 数据流的名称(唯一标识符)
  • 简单易懂的数据流名称(显示名称)
  • 指明直播创建和上次更新的时间戳
  • 与数据流关联的来源和目标连接配置文件的相关信息
  • 数据流的状态

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

系统会显示如下响应:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

有关使用 gcloud 检索直播信息的详情,请点击此处

列出数据流

以下代码显示了检索指定项目和位置中所有数据流的列表的请求。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

有关使用 gcloud 检索所有视频流的信息的详细信息,请点击此处

列出数据流的对象

以下代码显示了一个检索数据流的所有对象相关信息的请求。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

如需详细了解如何使用 gcloud 检索有关数据流所有对象的信息,请点击此处

返回的对象列表可能如下所示:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

如需详细了解如何使用 gcloud 列出流的对象,请点击此处

启动数据流

以下代码显示了一个启动数据流的请求。

在请求中使用 updateMask 参数后,请求的正文中必须包含您指定的字段。如需启动数据流,请将 state 字段中的值从 CREATED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

有关使用 gcloud 开始直播的详细信息,请点击此处

暂停数据流

以下代码显示了一个暂停正在运行的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。暂停数据流后,其状态会从 RUNNING 更改为 PAUSED

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

有关使用 gcloud 暂停直播的详细信息,请点击此处

恢复数据流

以下代码显示了一个恢复已暂停的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。恢复数据流后,其状态会从 PAUSED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

有关使用 gcloud 恢复直播的详细信息,请点击此处

恢复数据流

您可以使用 RunStream 方法恢复 MySQL、Oracle 或 PostgreSQL 源的永久失败的数据流。每种源数据库类型对可以执行哪些数据流恢复操作都有自己的定义。如需了解详情,请参阅恢复数据流

为 MySQL 或 Oracle 来源恢复数据流

以下代码示例展示了从各个日志文件位置恢复 MySQL 或 Oracle 源数据流的请求:

REST

从当前位置恢复数据流。这是默认选项:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

从下一个可用位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

从最近的位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

从特定位置恢复数据流 (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

请替换以下内容:

  • NAME_OF_THE_LOG_FILE:您要从中恢复数据流的日志文件的名称
  • POSITION:日志文件中您要从中恢复数据流的位置。如果您未提供该值,Datastream 会从文件头恢复该数据流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

从特定位置恢复数据流 (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为重做日志文件中您要从中恢复数据流的系统变更编号 (SCN)。此字段是必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

如需详细了解可用的恢复选项,请参阅恢复数据流

gcloud

不支持使用 gcloud 恢复数据流。

恢复 PostgreSQL 源的数据流

以下代码示例展示了为 PostgreSQL 来源恢复数据流的请求。在恢复期间,数据流会开始从为其配置的复制槽中的第一个日志序列号 (LSN) 读取数据。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

如果要更改复制槽,请先使用新的复制槽名称更新数据流:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

不支持使用 gcloud 恢复数据流。

从特定位置开始或继续流式传输

对于 MySQL 和 Oracle 源,您可以从特定位置启动或恢复暂停的数据流。如果您想使用外部工具执行回填,或者从您指定的位置启动 CDC,上述做法可能会很有用。对于 MySQL 源,您需要指示 binlog 位置;对于 Oracle 源,则需要在重做日志文件中指示系统变更编号 (SCN)。

以下代码展示了从特定位置启动或恢复已创建的数据流的请求。

从特定 binlog 位置启动或恢复数据流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

请替换以下内容:

  • NAME_OF_THE_LOG_FILE:您要从中开始流式传输的日志文件的名称。
  • POSITION:日志文件中您要从中开始数据流的位置。如果您未提供该值,Datastream 将开始从文件头读取。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复数据流。如需了解如何使用 gcloud 启动或恢复数据流,请参阅 Cloud SDK 文档

从重做日志文件 (Oracle) 中的特定系统变更编号启动或恢复数据流:

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为重做日志文件中您要从中开始数据流的系统变更编号 (SCN)。此字段是必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复数据流。如需了解如何使用 gcloud 启动数据流,请参阅 Cloud SDK 文档

修改数据流

以下代码显示了一个请求,它更新数据流的文件轮替配置,以便每 75 MB 或每 45 秒轮替一次文件。

在本例中,为 updateMask 参数指定的字段包括 fileRotationMbfileRotationInterval 字段,分别由 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 标志表示。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

以下代码显示了一个请求,它在 Datastream 写入 Cloud Storage 的文件的路径中添加 Unified Types 架构文件。因此,Datastream 会写入两个文件:JSON 数据文件和 Avro 架构文件。

在本例中,指定的字段是 jsonFileFormat 字段,由 destinationConfig.gcsDestinationConfig.jsonFileFormat 标志表示。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

以下代码显示了一个请求,它请求 Datastream 将现有数据连同正在对数据进行的更改从源数据库中复制到目标位置。

代码的 oracleExcludedObjects 部分显示了无法回填到目标位置的表和架构。

在本例中,系统将回填所有表和架构,schema3 中的 tableA 除外。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

有关使用 gcloud 修改直播的详细信息,请点击此处

启动数据流对象的回填

Datastream 中的数据流可以回填历史数据,并将正在进行的更改流式传输到目标位置。正在进行的更改将始终从来源流式传输到目标位置。不过,您可以指定是否要流式传输历史数据。

如果要将历史数据从来源流式传输到目标位置,请使用 backfillAll 参数。

借助 Datastream,您还可以仅流式传输特定数据库表的历史数据。为此,请使用 backfillAll 参数并排除不需要其历史数据的表。

如果您只希望将正在进行的更改流式传输到目标位置,请使用 backfillNone 参数。如果您希望 Dataflow 将所有现有数据的快照从来源流式传输到目标位置,则必须对包含此数据的对象手动启动回填。

为对象启动回填的另一个原因是来源与目标位置之间的数据不同步。例如,用户可以意外删除目标位置中的数据,而这些数据现在丢失。在这种情况下,为对象启动回填可用作“重置机制”,因为所有数据都一次性流式传输到目标位置。因此,数据会在来源和目标位置之间同步。

您必须先检索对象的相关信息,然后才能对数据流的对象启动回填。

每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 为数据流启动回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

如需详细了解如何使用 gcloud 为数据流对象启动回填,请参阅 Google Cloud SDK 文档

停止数据流对象的回填

启动数据流对象的回填后,您可停止该对象的回填。例如,如果用户修改数据库架构,则架构或数据可能会损坏。您不希望将此架构或数据流式传输到目标位置,因此需停止对象的回填。

您还可以停止对象的回填来实现负载均衡。Datastream 可以并行运行多个回填。这可能会给来源带来额外的负担。如果负载显著,请停止每个对象的回填,然后逐个启动对象的回填。

您必须先发出请求来检索数据流的所有对象的相关信息,然后才能停止数据流对象的回填。返回的每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 来停止数据流的回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

如需详细了解如何使用 gcloud 停止回填数据流对象,请点击此处

更改并发 CDC 任务的数量上限

以下代码展示了如何将 MySQL 流的并发变更数据捕获 (CDC) 任务数量上限设置为 7。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentCdcTasks 字段。将其值设置为 7 即表示您会将并发 CDC 任务的数量上限从之前的值更改为 7。可以使用 0 到 50(含)之间的值。如果您未指定此值,或将其定义为 0,系统会默认为数据流设置 5 个任务。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请点击此处

更改并发回填任务的数量上限

以下代码展示了如何将 MySQL 数据流的并发回填任务数上限设置为 25。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentBackfillTasks 字段。将其值设置为 25 即表示您会将并发回填任务的数量上限从之前的值更改为 25。可以使用 0 到 50(含)之间的值。如果您未指定此值,或将其定义为 0,系统会默认为数据流设置 16 个任务。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请点击此处

为 Oracle 来源启用大型对象流式传输

您可以启用大型对象的流式传输,例如对于使用 Oracle 源的数据流,二进制大型对象 (BLOB)、字符大型对象 (CLOB) 和国家字符大型对象 (NCLOB)。streamLargeObjects 标志可让您在新流和现有流中包含大型对象。该标志在数据流级别设置,您无需指定大型对象数据类型的列。

以下示例展示了如何创建支持流式传输大型对象的数据流。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

如需详细了解如何使用 gcloud 更新数据流,请参阅 Google Cloud SDK 文档

删除数据流

以下代码显示了一个删除数据流的请求。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

有关使用 gcloud 删除直播的详细信息,请点击此处