管理串流

本頁面說明如何使用 Datastream API 執行下列操作:

  • 建立串流
  • 取得串流和串流物件的相關資訊
  • 啟動、暫停、繼續及修改串流,以及啟動和停止串流物件的補充作業,藉此更新串流
  • 復原永久失敗的串流
  • 為 Oracle 串流啟用大型物件串流
  • 刪除串流

使用 Datastream API 的方法有兩種。您可以發出 REST API 呼叫,也可以使用 Google Cloud CLI (CLI)。

如要瞭解如何使用 Google Cloud CLI 管理 Datastream 串流,請參閱 gcloud CLI Datastream 串流

建立串流

本節說明如何建立串流,將資料從來源傳輸至目的地。以下範例並非完整說明,而是著重於 Datastream 的特定功能。如要解決特定用途,請搭配使用這些範例和 Datastream API 參考資料說明文件

本節涵蓋下列用途:

範例 1:將特定物件串流至 BigQuery

在本範例中,您將瞭解如何:

  • 將 MySQL 資料串流至 BigQuery
  • 在串流中加入一組物件
  • 將串流的寫入模式定義為僅限附加
  • 補充串流中包含的所有物件

以下要求會從 schema1 提取所有資料表,並從 schema2 提取兩個特定資料表:tableAtableC。這些事件會寫入 BigQuery 的資料集。

要求不含 customerManagedEncryptionKey 參數,因此系統會使用 Google Cloud 內部金鑰管理系統加密資料,而非 CMEK。

與執行歷史回填 (或快照) 相關聯的 backfillAll 參數會設為空白字典 ({}),這表示 Datastream 會回填串流中所有資料表的歷史資料。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 2:從具有 PostgreSQL 來源的串流中排除特定物件

在本範例中,您將瞭解如何:

  • 將 PostgreSQL 資料串流至 BigQuery
  • 從串流中排除物件
  • 從補充作業中排除物件

下列程式碼顯示建立串流的要求,該串流用於將資料從來源 PostgreSQL 資料庫移轉至 BigQuery。 從來源 PostgreSQL 資料庫建立串流時,您需要在要求中指定兩個額外的 PostgreSQL 專屬欄位:

  • replicationSlot:設定 PostgreSQL 資料庫進行複製作業時,必須先建立複製運算單元。您必須為每個串流建立複寫時段。
  • publication:發布作業是指您要複製變更的資料表群組。開始串流前,資料庫中必須有發布作業名稱。發布作業至少須包含串流 includeObjects 清單中指定的資料表。

與執行歷史回填 (或快照) 相關聯的 backfillAll 參數已設為排除一個資料表。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 3:為串流指定僅限附加的寫入模式

串流至 BigQuery 時,您可以定義寫入模式:mergeappendOnly。詳情請參閱「設定寫入模式」。

如果您在建立串流的要求中未指定寫入模式,系統會使用預設的 merge 模式。

下列要求說明如何建立 MySQL 到 BigQuery 的資料串流時,定義 appendOnly 模式。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 4:將串流傳輸至 Cloud Storage 目的地

在本範例中,您將瞭解如何:

  • 從 Oracle 串流至 Cloud Storage
  • 定義要納入串流的一組物件
  • 定義用於加密靜態資料的 CMEK

下列要求說明如何建立串流,將事件寫入 Cloud Storage 中的值區。

在這個範例要求中,事件會以 JSON 輸出格式寫入,且每 100 MB 或 30 秒就會建立新檔案 (覆寫 50 MB 和 60 秒的預設值)。

如果是 JSON 格式,您可以:

  • 在路徑中新增一致類型的結構定義檔案。因此,Datastream 會將兩個檔案寫入 Cloud Storage:JSON 資料檔案和 Avro 結構定義檔案。結構定義檔案的名稱與資料檔案相同,但副檔名為「.schema」。

  • 啟用 gzip 壓縮功能,讓 Datastream 壓縮寫入 Cloud Storage 的檔案。

要求使用 backfillNone 參數,指定只將持續變更串流至目的地,不回填資料。

要求會指定客戶代管的加密金鑰參數,讓您控管用於加密 Google Cloud 專案中靜態資料的金鑰。這個參數是指 Datastream 用來加密從來源串流至目標位置資料的 CMEK。並指定 CMEK 的金鑰環。

如要進一步瞭解金鑰環,請參閱 Cloud KMS 資源。如要進一步瞭解如何使用加密金鑰保護資料,請參閱 Cloud Key Management Service (KMS)

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 建立串流,請參閱 Google Cloud SDK 說明文件

範例 5:將資料串流至 BigLake 代管資料表

在本範例中,您將瞭解如何設定串流,以 append-only 模式將資料從 MySQL 資料庫複製到 BigLake Iceberg 資料表。建立要求前,請先完成下列步驟:

  • 擁有要用來儲存資料的 Cloud Storage bucket
  • 建立 Cloud 資源連線
  • 向 Cloud Storage bucket 授予 Cloud 資源連線存取權

接著,您可以使用下列要求建立串流:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

mysql_source_config.json 來源設定檔的內容:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

bl_config.json 設定檔的內容:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

驗證串流定義

建立串流前,您可以驗證其定義。這樣一來,您就能確保所有驗證檢查都通過,且串流在建立後能順利執行。

驗證串流時,系統會檢查下列項目:

  • 來源是否已正確設定,可讓 Datastream 從中串流資料。
  • 串流是否能同時連線至來源和目的地。
  • 串流的端對端設定。

如要驗證串流,請在要求主體前的網址中加入 &validate_only=true

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

提出要求後,您會看到 Datastream 對來源和目的地執行的驗證檢查,以及檢查通過或失敗的結果。如果驗證檢查未通過,系統會顯示失敗原因,以及修正問題的建議做法。

舉例來說,假設您有一個客戶自行管理的加密金鑰 (CMEK),並希望 Datastream 使用該金鑰,加密從來源串流至目的地的資料。為驗證串流,Datastream 會確認金鑰存在,且 Datastream 具備使用金鑰的權限。如果不符合上述任一條件,驗證串流時就會收到下列錯誤訊息:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

如要解決這個問題,請確認您提供的金鑰存在,且 Datastream 服務帳戶具備該金鑰的 cloudkms.cryptoKeys.get 權限。

修正錯誤後,請再次提出要求,確保通過所有驗證檢查。以上述範例來說,CMEK_VALIDATE_PERMISSIONS 檢查不會再傳回錯誤訊息,而是會顯示 PASSED 狀態。

取得串流的相關資訊

下列程式碼顯示擷取串流資訊的要求。這類資訊包括:

  • 串流名稱 (專屬 ID)
  • 串流的易記名稱 (顯示名稱)
  • 串流的建立和上次更新時間戳記
  • 與串流相關聯的來源和目的地連線設定檔資訊
  • 串流狀態

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

系統會顯示以下回應:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

如要進一步瞭解如何使用 gcloud 擷取串流資訊,請參閱 Google Cloud SDK 說明文件

列出串流

下列程式碼顯示要求,用於擷取指定專案和位置中的所有串流清單。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

如要進一步瞭解如何使用 gcloud 擷取所有串流的相關資訊,請參閱 Google Cloud SDK 說明文件

列出串流的物件

下列程式碼顯示要求,用於擷取串流中所有物件的相關資訊。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

如要進一步瞭解如何使用 gcloud 擷取串流中所有物件的相關資訊,請參閱 Google Cloud SDK 說明文件

傳回的物件清單可能如下所示:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

如要進一步瞭解如何使用 gcloud 列出串流物件,請參閱 Google Cloud SDK 說明文件

啟動串流

下列程式碼顯示啟動串流的要求。

在要求中使用 updateMask 參數時,要求主體中只須包含您指定的欄位。如要開始串流,請將「state」欄位中的值從 CREATED 變更為 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如要進一步瞭解如何使用 gcloud 啟動串流,請參閱 Google Cloud SDK 說明文件

暫停串流

下列程式碼顯示暫停執行中串流的要求。

在本例中,為 updateMask 參數指定的欄位是 state 欄位。暫停串流會將串流狀態從 RUNNING 變更為 PAUSED

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

如要進一步瞭解如何使用 gcloud 暫停串流,請參閱 Google Cloud SDK 說明文件

繼續執行串流

下列程式碼顯示要求,用於繼續播放已暫停的串流。

在本例中,為 updateMask 參數指定的欄位是 state 欄位。繼續串流後,串流狀態會從「PAUSED」變更為「RUNNING」。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如要進一步瞭解如何使用 gcloud 繼續串流,請參閱 Google Cloud SDK 說明文件

復原串流

您可以使用 RunStream 方法復原永久失敗的串流。每種來源資料庫類型都有自己的定義,說明可進行哪些串流復原作業。詳情請參閱「復原串流」。

復原 MySQL 或 Oracle 來源的串流

下列程式碼範例顯示從各種記錄檔位置復原 MySQL 或 Oracle 來源串流的要求:

REST

從目前位置恢復串流。這是預設選項:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

從下一個可用位置恢復串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

從最新位置恢復串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

從特定位置復原串流 (以 MySQL 二進位記錄檔為基礎的複製作業):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

更改下列內容:

  • NAME_OF_THE_LOG_FILE:要從中還原串流的記錄檔名稱
  • POSITION:您要從記錄檔中恢復串流的位置。如未提供值,Datastream 會從檔案開頭復原串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

從特定位置還原串流 (以 MySQL GTID 為基礎的複製模式):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,您要從中還原串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

從特定位置復原串流 (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替換為重做記錄檔中的系統變更編號 (SCN),您要從該編號還原串流。這是必填欄位。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

如要進一步瞭解可用的復原選項,請參閱「復原串流」。

gcloud

系統不支援使用 gcloud 復原串流。

復原 PostgreSQL 來源的串流

下列程式碼範例顯示從 PostgreSQL 來源復原串流的要求。在復原期間,串流會從為串流設定的複製運算單元中,第一個記錄檔序號 (LSN) 開始讀取內容。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

如要變更複製運算單元,請先使用新的複製運算單元名稱更新串流:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

系統不支援使用 gcloud 復原串流。

復原 SQL Server 來源的串流

下列程式碼範例顯示如何要求復原 SQL Server 來源的串流。

REST

從第一個可用位置繼續串流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

從偏好的記錄序號恢復串流:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 復原串流。

從特定位置開始或繼續串流

您可以從特定位置開始串流,或繼續串流 MySQL 和 Oracle 來源的資料。如果您想使用外部工具執行回填作業,或從您指定的位址啟動 CDC,這項功能就非常實用。如果是 MySQL 來源,您需要指出二進位記錄檔位置或 GTID 集;如果是 Oracle 來源,則需要指出重做記錄檔中的系統變更編號 (SCN)。

下列程式碼顯示要求,從特定位置開始或繼續播放已建立的串流。

從特定二進位記錄檔位置開始或繼續串流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

更改下列內容:

  • NAME_OF_THE_LOG_FILE:要開始串流的記錄檔名稱。
  • POSITION:您要開始串流的記錄檔位置。如未提供值,Datastream 會從檔案開頭開始讀取。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動或繼續串流,請參閱 Cloud SDK 說明文件

從特定 GTID 集開始或繼續串流 (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,您要從這些 GTID 開始或繼續串流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動或繼續串流,請參閱 Cloud SDK 說明文件

從重做記錄檔 (Oracle) 中的特定系統變更編號開始或繼續串流:

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替換為重做記錄檔中的系統變更編號 (SCN),串流會從該編號開始。這是必填欄位。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

系統不支援使用 gcloud 從特定位置開始或繼續串流。如要瞭解如何使用 gcloud 啟動串流,請參閱 Cloud SDK 說明文件

修改串流

下列程式碼顯示更新串流檔案輪替設定的請求,將檔案輪替間隔設為每 75 MB 或 45 秒。

在本範例中,為 updateMask 參數指定的欄位包括 fileRotationMbfileRotationInterval 欄位,分別以 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 旗標表示。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

下列程式碼顯示要求,在 Datastream 寫入 Cloud Storage 的檔案路徑中加入一致類型的結構定義檔案。因此,Datastream 會寫入兩個檔案:JSON 資料檔案和 Avro 結構定義檔案。

在本範例中,指定的欄位是 jsonFileFormat 欄位,以 destinationConfig.gcsDestinationConfig.jsonFileFormat 旗標表示。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

下列程式碼顯示要求,請 Datastream 將來源資料庫中的現有資料和資料變更複製到目的地。

程式碼的 oracleExcludedObjects 區段會顯示無法回填至目的地的表格和結構定義。

在本範例中,除了 schema3 中的 tableA 以外,所有資料表和結構定義都會回填。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

如要進一步瞭解如何使用 gcloud 修改串流,請參閱 Google Cloud SDK 說明文件

為串流物件啟動補充作業

Datastream 中的串流可以補充歷來資料,並將持續變更串流至目的地。系統一律會將進行中的變更從來源串流至目的地。不過,您可以指定是否要串流歷來資料。

如要將來源的歷來資料串流至目的地,請使用 backfillAll 參數。

Datastream 也可讓您只串流特定資料庫資料表的歷來資料。如要這麼做,請使用 backfillAll 參數,並排除您不想要歷來資料的表格。

如果只想將持續變更串流至目的地,請使用 backfillNone 參數。如果想讓 Datastream 將來源的所有現有資料快照串流至目的地,您必須手動啟動含有這類資料的物件補充作業。

如果來源和目的地之間的資料不同步,也可以啟動物件補充作業。舉例來說,使用者可能會不小心刪除目的地中的資料,導致資料遺失。在這種情況下,啟動物件補充作業可做為「重設機制」,因為所有資料都會一次串流至目的地。因此,來源和目的地之間會同步處理資料。

如要對串流的物件啟動補充作業,必須先擷取物件的相關資訊

每個物件都有 OBJECT_ID,可做為物件的專屬 ID。您可以使用 OBJECT_ID 啟動串流的回填作業。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

如要進一步瞭解如何使用 gcloud 為串流物件啟動回填作業,請參閱 Google Cloud SDK 說明文件

停止串流物件的補充作業

為串流物件啟動補充作業後,即可停止該物件的補充作業。舉例來說,如果使用者修改資料庫結構定義,結構定義或資料可能會損毀。您不希望將這個結構定義或資料串流至目的地,因此停止回填物件。

您也可以基於負載平衡目的停止物件回填作業。Datastream 可以並行執行多個回填作業。這可能增加來源的負載。如果負載量很大,請停止每個物件的補充作業,然後逐一啟動物件的補充作業。

如要停止串流物件的補充作業,必須先提出要求,擷取串流中所有物件的相關資訊。傳回的每個物件都有 OBJECT_ID,可做為物件的專屬 ID。您可以使用 OBJECT_ID 停止串流的回填作業。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

如要進一步瞭解如何使用 gcloud 停止串流物件的回填作業,請參閱 Google Cloud SDK 說明文件

變更 CDC 並行工作數上限

下列程式碼顯示如何將 MySQL 串流的並行變更資料擷取 (CDC) 工作數上限設為 7。

在本例中,為 updateMask 參數指定的欄位是 maxConcurrentCdcTasks 欄位。將值設為 7,即可將並行 CDC 工作數上限從先前的值變更為 7。可使用的值介於 0 至 50 之間 (包括 0 與 50)。如未定義值或定義為 0,系統會為串流設定預設值 5 個工作。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

如要進一步瞭解如何使用 gcloud,請參閱 Google Cloud SDK 說明文件

變更並行補充作業工作數量上限

下列程式碼顯示如何將 MySQL 串流的並行回填工作數上限設為 25。

在本例中,為 updateMask 參數指定的欄位是 maxConcurrentBackfillTasks 欄位。將值設為 25,即可將並行回填工作數上限從先前的值變更為 25。可使用的值介於 0 至 50 之間 (包括 0 與 50)。如未定義值或將值定義為 0,系統會為串流設定 16 項工作的預設值。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

如要進一步瞭解如何使用 gcloud,請參閱 Google Cloud SDK 說明文件

為 Oracle 來源啟用大型物件串流

您可以為具有 Oracle 來源的串流啟用大型物件串流,例如二進位大型物件 (BLOB)、字元大型物件 (CLOB) 和國家字元大型物件 (NCLOB)。streamLargeObjects 旗標可讓您在新的和現有的串流中加入大型物件。這項旗標是在串流層級設定,因此您不需要指定大型物件資料類型的資料欄。

以下範例說明如何建立串流,以便串流大型物件。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

如要進一步瞭解如何使用 gcloud 更新串流,請參閱 Google Cloud SDK 說明文件

刪除串流

下列程式碼顯示刪除串流的要求。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

如要進一步瞭解如何使用 gcloud 刪除串流,請參閱 Google Cloud SDK 說明文件

後續步驟