管理数据流

概览

在本部分中,您将学习如何使用 Datastream API 执行以下操作:

  • 创建数据流
  • 获取有关数据流和数据流对象的信息
  • 通过启动、暂停、恢复和修改数据流,以及启动和停止数据流对象的回填来更新数据流
  • 恢复永久失败的视频流
  • 为 Oracle 数据流启用大型对象流式传输
  • 删除数据流

您可以通过两种方式使用 Datastream API。您可以进行 REST API 调用,也可以使用 Google Cloud CLI (CLI)。

如需了解如何使用 gcloud 管理 Datastream 数据流的概要信息,请参阅 gcloud Datastream 数据流

创建数据流

在本部分中,您将学习如何创建用于将数据从源位置传输到目标位置的数据流。以下示例并不全面,只是重点介绍了 Datastream 的特定功能。如需解决您的特定用例,请将这些示例与 Datastream API 参考文档结合使用。

本部分包含以下用例:

示例 1:将特定对象流式传输到 BigQuery

在本例中,您将学习如何:

  • 从 MySQL 流式传输到 BigQuery
  • 在数据流中添加一组对象
  • 将数据流的写入模式定义为只附加
  • 回填数据流中包含的所有对象

以下是从 schema1 和两个特定表中提取所有表的请求 schema2 中的表:tableAtableC。系统会将事件写入 BigQuery 中的数据集。

请求不包含 customerManagedEncryptionKey 参数,因此 使用 Google Cloud 内部密钥管理系统来加密您的数据 而非 CMEK。

与执行历史记录操作相关的 backfillAll 参数 将回填(或快照)设为一个空的字典 ({}),这意味着 Datastream 会回填数据流中包含的所有表中的历史数据。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 2:从使用 PostgreSQL 作为来源的数据流中排除特定对象

在此示例中,您将学习如何:

  • 从 PostgreSQL 流式传输到 BigQuery
  • 从数据流中排除对象
  • 从回填中排除对象

以下代码提供了一个请求,它创建一个数据流用于将数据从源 PostgreSQL 数据库转移到 BigQuery。从源 PostgreSQL 数据库创建数据流时,您需要在请求中额外指定两个 PostgreSQL 专用字段:

  • replicationSlot:若要将 PostgreSQL 数据库配置为进行复制,必须先创建复制槽。您需要为每个数据流创建一个复制槽。
  • publication:发布内容是您要从中复制更改的一组表。发布内容名称必须存在于数据库中,才能启动数据流。发布内容至少必须包含数据流的 includeObjects 列表中指定的表。

与执行历史回填(或快照)相关联的 backfillAll 参数设置为排除一个表。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 3:为数据流指定只写入模式

将数据流式插入 BigQuery 时,您可以定义写入模式:mergeappendOnly。如需了解详情,请参阅配置写入模式

如果您在创建数据流的请求中未指定写入模式,则系统会使用默认的 merge 模式。

以下请求展示了如何在创建 MySQL 到 BigQuery 流时定义 appendOnly 模式。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 4:将数据流式传输到 Cloud Storage 目标位置

在此示例中,您将学习如何:

  • 从 Oracle 流式传输到 Cloud Storage
  • 定义要包含在数据流中的一组对象
  • 定义 CMEK 以加密静态数据

以下请求展示了如何创建将事件写入 Cloud Storage 存储桶的数据流。

在此示例请求中,事件以 JSON 输出格式写入,并且每 100 MB 或 30 秒创建一个新文件(替换 50 MB 和 60 秒的默认值)。

对于 JSON 格式,您可以执行以下操作:

  • 在路径中添加统一类型架构文件。因此,Datastream 会将两个文件写入 Cloud Storage:JSON 数据文件和 Avro 架构文件。架构文件与数据文件同名,扩展名为 .schema

  • 启用 gzip 压缩可使 Datastream 压缩写入 Cloud Storage 的文件。

通过使用 backfillNone 参数,请求会指定仅将正在进行的更改流式传输到目标位置,而不会回填。

该请求会指定客户管理的加密密钥参数,该参数可让您控制用于加密 Google Cloud 项目中的静态数据的密钥。该参数是指 Datastream 用于加密从源流向目标的数据的 CMEK。它还指定了 CMEK 的密钥环。

如需详细了解密钥环,请参阅 Cloud KMS 资源。如需详细了解如何使用加密密钥保护数据,请参阅 Cloud Key Management Service (KMS)

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

验证数据流的定义

在创建数据流之前,您可以对其定义进行验证。这样,您就可以确保所有验证检查都通过,并确保数据流在创建后能够成功运行。

验证数据流检查:

  • 来源是否已正确配置,允许 Datastream 从其中流式传输数据。
  • 数据流是否可以连接到来源和目标位置。
  • 数据流的端到端配置。

如需验证数据流,请将 &validate_only=true 添加到请求正文前面的网址:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

发出此请求后,您将看到 Datastream 针对来源和目标位置运行的验证检查,以及检查是否通过。对于未通过的验证检查,系统会显示失败原因以及纠正方法。

例如,假设您有一个客户管理的加密密钥 (CMEK),您希望 Datastream 使用它来加密从源流式传输到目标的数据。在验证数据流的过程中,Datastream 将验证该密钥是否存在,以及它是否有权使用该密钥。如果未满足其中任何一个条件,则当您验证数据流时,将返回以下错误消息:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

如需解决此问题,请验证您提供的密钥是否存在,以及 Datastream 服务账号是否拥有该密钥的 cloudkms.cryptoKeys.get 权限。

进行适当的更正后,请再次发出请求,以确保通过所有验证检查。 对于上面的示例,CMEK_VALIDATE_PERMISSIONS 检查将不再返回错误消息,但状态为 PASSED

获取数据流相关信息

以下代码显示了一个检索数据流相关信息的请求。此类信息包括:

  • 数据流的名称(唯一标识符)
  • 数据流的易记名称(显示名称)
  • 指明直播创建和上次更新的时间戳
  • 与数据流关联的来源和目标连接配置文件的相关信息
  • 数据流的状态

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

系统会显示如下响应:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 检索数据流的相关信息,请点击此处

列出数据流

以下代码显示了检索 指定项目和位置。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

有关使用 gcloud 检索所有视频流的信息的详细信息,请点击此处

列出数据流的对象

以下代码显示了一个检索数据流的所有对象相关信息的请求。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

如需详细了解如何使用 gcloud 检索数据流的所有对象的相关信息,请点击此处

返回的对象列表可能如下所示:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

如需详细了解如何使用 gcloud 列出数据流的对象,请点击此处

启动数据流

以下代码显示了一个启动数据流的请求。

通过在请求中使用 updateMask 参数,使只有您指定的字段必须包含在请求正文中。如需启动数据流,请将 state 字段中的值从 CREATED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

有关使用 gcloud 开始直播的详细信息,请点击此处

暂停数据流

以下代码显示了一个暂停正在运行的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。暂停数据流后,其状态会从 RUNNING 更改为 PAUSED

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

有关使用 gcloud 暂停直播的详细信息,请点击此处

恢复数据流

以下代码显示了一个恢复已暂停的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。恢复数据流后,其状态会从 PAUSED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

有关使用 gcloud 恢复直播的详细信息,请点击此处

恢复数据流

您可以使用 RunStream 方法恢复永久失败的流。每个 源数据库类型对哪些数据流恢复操作有自己的定义 都是可行的如需了解详情,请参阅恢复数据流

恢复 MySQL 或 Oracle 源的数据流

以下代码示例展示了从不同日志文件位置恢复 MySQL 或 Oracle 来源数据流的请求:

REST

从当前位置恢复数据流。这是默认选项:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

从下一个可用位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

从最近的位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

从特定位置恢复数据流 (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

替换以下内容:

  • NAME_OF_THE_LOG_FILE:您要从中导出日志的日志文件的名称 以恢复您的视频流
  • POSITION:您希望从日志文件中的哪个位置恢复数据流。如果您未提供此值,Datastream 会从文件开头恢复数据流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

从特定位置恢复数据流 (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为您要从中恢复数据流的 redo 日志文件中的系统更改编号 (SCN)。此字段是必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

如需详细了解可用的恢复选项,请参阅恢复数据流

gcloud

不支持使用 gcloud 恢复数据流。

恢复 PostgreSQL 源的数据流

以下代码示例展示了为 PostgreSQL 恢复数据流的请求 来源。在恢复期间,数据流会从为其配置的复制槽中的第一个日志序列号 (LSN) 开始读取。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

如果要更改复制槽,请使用新的 首先显示复制槽名称:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

不支持使用 gcloud 恢复数据流。

恢复 SQL Server 源的数据流

以下代码示例展示了用于恢复 SQL Server 数据源数据流的请求示例。

REST

从第一个可用位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

从首选的日志序列号恢复数据流:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

不支持使用 gcloud 恢复数据流。

从特定位置启动或继续数据流

您可以启动流式传输,也可以从特定位置恢复已暂停的流式传输, MySQL 和 Oracle 源。如果您想对数据执行回填 或从您指定的位置启动 CDC。对于 MySQL 源,您需要指明二进制日志位置;对于 Oracle 源,您需要指明重做日志文件中的系统更改编号 (SCN)。

以下代码显示了从特定位置开始或恢复已创建的数据流的请求。

从特定 binlog 位置启动或恢复数据流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

替换以下内容:

  • NAME_OF_THE_LOG_FILE:您要从中导出日志的日志文件的名称 即可开始直播
  • POSITION:日志文件中您希望从哪里开始读取 你的直播如果您不提供此值,Datastream 会从文件开头开始读取。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复流式传输。如需了解如何使用 gcloud 启动或恢复流式传输,请参阅 Cloud SDK 文档

从重做日志文件 (Oracle) 中的特定系统变更编号启动或恢复数据流:

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为重做日志文件中的系统变更编号 (SCN) 。此字段是必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复流式传输。如需了解如何使用 gcloud 启动数据流,请参阅 Cloud SDK 文档

修改数据流

以下代码显示了一个请求,它更新数据流的文件轮替配置,以便每 75 MB 或每 45 秒轮替一次文件。

在本例中,为 updateMask 参数指定的字段包括 fileRotationMbfileRotationInterval 字段,分别由 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 标志表示。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

以下代码显示了一个请求,它在 Datastream 写入 Cloud Storage 的文件的路径中添加 Unified Types 架构文件。因此,Datastream 会写入两个文件:JSON 数据文件和 Avro 架构文件。

在本例中,指定的字段是 jsonFileFormat 字段,由 destinationConfig.gcsDestinationConfig.jsonFileFormat 标志表示。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

以下代码显示了一个请求,它请求 Datastream 将现有数据连同正在对数据进行的更改从源数据库中复制到目标位置。

代码的 oracleExcludedObjects 部分显示了无法回填到目标位置的表和架构。

在本例中,系统将回填所有表和架构,schema3 中的 tableA 除外。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

有关使用 gcloud 修改直播的详细信息,请点击此处

启动数据流对象的回填

Datastream 中的数据流可以回填历史数据,并将正在进行的更改流式传输到目标位置。正在进行的更改将始终从来源流式传输到目标位置。不过,您可以指定是否要流式传输历史数据。

如果要将历史数据从来源流式传输到目标位置,请使用 backfillAll 参数。

借助 Datastream,您还可以仅流式传输特定数据库表的历史数据。为此,请使用 backfillAll 参数并排除不需要其历史数据的表。

如果您只希望将正在进行的更改流式传输到目标位置,请使用 backfillNone 参数。如果您希望 Dataflow 将所有现有数据的快照从来源流式传输到目标位置,则必须对包含此数据的对象手动启动回填。

为对象启动回填的另一个原因是来源与目标位置之间的数据不同步。例如,用户可以意外删除目标位置中的数据,而这些数据现在丢失。在这种情况下,为对象启动回填可用作“重置机制”,因为所有数据都一次性流式传输到目标位置。因此,数据会在来源和目标位置之间同步。

您必须先检索对象的相关信息,然后才能对数据流的对象启动回填。

每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 为数据流启动回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

如需详细了解如何使用 gcloud 为数据流对象启动回填,请参阅 Google Cloud SDK 文档

停止数据流对象的回填

启动数据流对象的回填后,您可停止该对象的回填。例如,如果用户修改数据库架构,则架构或数据可能会损坏。您不希望将此架构或数据流式传输到目标位置,因此需停止对象的回填。

您还可以停止对象的回填来实现负载均衡。Datastream 可以并行运行多个回填。这可能会给来源带来额外的负担。如果负载显著,请停止每个对象的回填,然后逐个启动对象的回填。

您必须先发出请求来检索数据流的所有对象的相关信息,然后才能停止数据流对象的回填。返回的每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 来停止数据流的回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

如需详细了解如何使用 gcloud 为数据流对象停止回填,请点击此处

更改并发 CDC 任务的数量上限

以下代码展示了如何将 MySQL 流的并发变更数据捕获 (CDC) 任务数量上限设置为 7。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentCdcTasks 字段。将其值设置为 7 即表示您会将并发 CDC 任务的数量上限从之前的值更改为 7。您可以使用 0 到 50(包括这两个数值)之间的值。如果您未指定此值,或将其定义为 0,系统会为数据流设置系统默认设置(5 个任务)。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请点击此处

更改并发回填任务的数量上限

以下代码展示了如何将 MySQL 数据流的并发回填任务数量上限设置为 25。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentBackfillTasks 字段。将其值设置为 25 后,您将并发回填任务的数量从之前的值更改为 25。可以使用 0 到 50(含)之间的值。如果您未指定此值,或将其定义为 0,系统会默认为数据流设置 16 个任务。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请点击此处

为 Oracle 来源启用大型对象流式传输

您可以启用大型对象的流式传输,例如二进制大型对象 (BLOB), 字符大型对象 (CLOB) 和国家字符大型对象 (NCLOB) (针对使用 Oracle 来源的数据流)。借助 streamLargeObjects 标志,您可以在新串流和现有串流中添加大型对象。该标记在视频流级别设置 因此您无需指定大型对象数据类型的列。

以下示例展示了如何创建支持在更大文件中进行流式传输的视频流 对象的操作。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

如需详细了解如何使用 gcloud 更新数据流,请参阅 Google Cloud SDK 文档

删除数据流

以下代码显示了一个删除数据流的请求。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

如需详细了解如何使用 gcloud 删除数据流,请点击此处

后续步骤