ストリームの管理

をご覧ください。

概要

このセクションでは、Datastream API を使用して以下を行う方法を説明します。

  • ストリームの検証と作成
  • ストリームとストリームのオブジェクトに関する情報の取得
  • ストリームの開始、一時停止、再開、変更と、ストリームのオブジェクトのバックフィルの開始と停止によるストリームの更新
  • ストリームの削除

Datastream API を使用する方法は 2 つあります。REST API 呼び出しを行うか、gcloud コマンドライン ツール(CLI)を使用できます。

gcloud を使用したデータストリームのストリームの管理の概要については、こちらをご覧ください。

ストリームを検証する

ストリームは、作成する前にそれを検証できます。これにより、ストリームが正常に実行され、すべての検証チェックに合格することが確実になります。

ストリームの検証では、次のことを確認します。

  • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか。
  • ストリームがソースと宛先の両方に接続できるかどうか。
  • ストリームのエンドツーエンド構成。

次のコードでは、ソースの Oracle データベースから Cloud Storage の宛先バケットまでのデータ転送に使用されるストリームを検証するリクエストを示します。

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {  "customerManagedEncryptionKey": "projects/datastream-con-ronteller/locations/us-central1/keyRings/ringy/cryptoKeys/nicekey",
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

&validate_only=true 値は、ストリームの検証のみで、作成はしていないことを示します。また、このリクエストでは URL 全体を引用符で囲みます。これにより、Datastream は &validate_only=true 値を取得してストリームを検証します。

リクエストを行うと、送信元と宛先に対して Datastream が実行する検証チェックと、チェックに合格したかどうかが表示されます。検証に合格しなかった場合は、失敗した理由と問題の修正方法に関する情報が表示されます。

たとえば、送信元から宛先にストリーミングされるデータを Datastream で暗号化するために使用する顧客管理の暗号鍵(CMEK)があるとします。

ストリームの検証の一環として、Datastream はキーが存在すること、および DataStream がキーを使用する権限を持っていることを確認します。

この条件が満たされない場合、ストリームを検証すると、次のエラー メッセージが返されます。

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

この問題を解決するには、指定したキーが存在し、Datastream サービス アカウントにそのキーに対する cloudkms.cryptoKeys.get 権限があることを確認します。

適切な修正を行った後、リクエストを再度実行して、すべての検証チェックに合格することを確認します。 上記の例では、CMEK_VALIDATE_PERMISSIONS のチェックはエラー メッセージを返さなくなりますが、ステータスは PASSED になります。

ストリームの作成

次のコードでは、ソースの Oracle データベースから Cloud Storage の宛先バケットまでのデータ転送に使用されるストリームを作成するリクエストを示します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

たとえば、schema1 からすべてのテーブルを取得し、schema3 から 2 つの特定のテーブル(tableA と tableC)を取得するリクエストは下のようになります。

その結果は Avro 形式で Cloud Storage のバケットに書き込まれ、100 MB または 30 秒ごとに新しいファイルが作成されます(デフォルト値の 50 MB と 60 秒をオーバーライドしています)。

Google Cloud に保存されているすべてのデータは、Google 独自の暗号化データに使用されているのと同じ強化鍵管理システムを使用して保存時に暗号化されます。 こうした鍵管理システムでは厳密な鍵のアクセス制御と監査が行われ、AES-256 暗号化標準を使用してデータが保存時に暗号化されます。設定、構成、管理は必要ありません。Google Cloud のデフォルトの保存時の暗号化は、暗号化マテリアルのコンプライアンスや地域区分に関連する特定の要件を持たないユーザーに最適です。

Google Cloud プロジェクト内で保存データの暗号化に使用する鍵をより詳細に制御する必要がある場合は、Cloud Key Management Service(KMS)内で管理されている暗号鍵を使用してデータを保護する機能が Datastream により提供されます。こうした暗号鍵は、「顧客管理の暗号鍵(CMEK)」と呼ばれます。CMEK を使用して Datastream のデータを保護すると、CMEK が制御されます。

customerManagedEncryptionKey パラメータは、送信元から宛先にストリーミングされるデータを暗号化するために Datastream が使用できる CMEK に関連付けられています。CMEK は、[customer-managed-encryption-key] プレースホルダで表されます。

[ring] プレースホルダは、CMEK のキーリングを表します。キーリングを使用すると、特定の Google Cloud ロケーション内の鍵を整理して、鍵のグループのアクセス制御を管理できます。キーリングの名前は Google Cloud プロジェクト全体にわたって一意である必要はありませんが、指定されたロケーション内では一意となる必要があります。キーリングの詳細については、Cloud KMS リソースをご覧ください。

ストリームの作成の一環として、Datastream は CMEK が存在すること、および DataStream が鍵を使用する権限を持っていることを確認します。これらのチェックの詳細については、ストリームの検証をご覧ください。

CMEK の代わりに Google Cloud の内部鍵管理システムを使用してデータを暗号化する場合は、API リクエストに customerManagedEncryptionKey パラメータと値を含めないでください。

backfillAll のパラメータは、過去のバックフィルに関連付けられます。このパラメータに空の辞書({})を設定すると、データストリームは次の情報のバックフィルを行います。

  • ソース データベースから宛先への履歴データ(進行中のデータの変更を含む)。
  • ソースから宛先へのスキーマとテーブル。
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

ストリームに関する情報を取得する

次のコードでは、ストリームに関する情報を取得するリクエストを示します。これには以下の情報が含まれます。

  • データストリームが認識するストリームの名前
  • ストリームのわかりやすい名前(表示名)
  • ストリームの作成および更新の日時スタンプ
  • ストリームに関連付けられたソースと宛先の接続プロファイルに関する情報
  • ストリームの状態

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

次に例を示します。

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

レスポンスは次のように表示されます。

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームに関する情報を取得する方法については、こちらをクリックしてください。

ストリームを一覧表示する

次のコードでは、すべてのストリームに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

gcloud を使用してすべてのストリームに関する情報を取得する方法については、こちらをクリックしてください。

ストリームのオブジェクトを一覧表示する

次のコードでは、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

次に例を示します。

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

返されるオブジェクトのリストは、次のようになります。

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

ストリームを更新する

ストリームを開始する

次のコードでは、ストリームを開始するリクエストを示します。

リクエストで updateMask パラメータを使用することによって、指定したフィールドのみをリクエストの本文に含めば済むようになります。

この例では、指定されているフィールドは state フィールドであり、ストリームのステータス(状態)を表します。ストリームを開始すると、状態が CREATED から RUNNING まで変わっていきます。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

ストリームを一時停止する

次のコードでは、実行中のストリームを一時停止するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを一時停止することで、その状態は RUNNING から PAUSED に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

ストリームを再開する

次のコードでは、一時停止されたストリームを再開するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを再開すると、状態は PAUSED から RUNNING に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

ストリームを変更する

次のコードでは、ファイルを 75 MB または 45 秒ごとにローテーションするようにストリームのファイル ローテーション構成を更新するリクエストを示します。

この例では、updateMask パラメータで指定されたフィールドに fileRotationMb フィールドと fileRotationInterval フィールドが含まれ、それぞれ destinationConfig.gcsDestinationConfig.fileRotationMb フラグと destinationConfig.gcsDestinationConfig.fileRotationInterval フラグで表されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次のコードでは、データストリームが Cloud Storage に書き込むファイルのパスに統合型スキーマ ファイルを含めるリクエストを示します。この結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが書き込まれます。

この例では、destinationConfig.gcsDestinationConfig.jsonFileFormat フラグにより表された jsonFileFormat フィールドが指定されます。

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

次のコードでは、進行中のデータに加え、既存のデータをソース データベースから宛先にレプリケートするデータストリームのリクエストを示します。

コードの oracleExcludedObjects セクションには、宛先へのバックフィルが制限されているテーブルとスキーマが示されます。

この例では、schema3 の tableA を除き、すべてのテーブルとスキーマがバックフィルされます。

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

ストリームのオブジェクトのバックフィルを開始する

データストリームのストリームでは、過去のデータをバックフィルすることや、進行中の変更を宛先にストリーミングすることが可能です。進行中の変更は常に、ソースから宛先にストリーミングされます。ただし、履歴データをストリーミングするかどうかは指定できます。

過去のデータをソースから宛先にストリーミングする場合は、backfillAll パラメータを使用します。

Datastream では、特定のデータベース テーブルのみの履歴データをストリーミングすることもできます。これを行うには、backfillAll パラメータを使用して、履歴データを必要としないテーブルを除外します。

進行中の変更のみを宛先にストリーミングする場合は、backfillNone パラメータを使用します。その後、データストリームですべてのソースのスナップショットをソースから宛先にストリーミングする場合は、そのデータを含むオブジェクトのバックフィルを手動で開始する必要があります。

オブジェクトのバックフィルを開始するもう一つの理由は、データがソースと宛先の間で同期していない場合です。たとえば、ユーザーが誤って宛先のデータを削除してしまうと、そのデータが失われています。この場合、オブジェクトのバックフィルを開始することが「リセットの仕組み」として機能します。これは、すべてのデータが一度に宛先へストリーミングされるためです。その結果、ソースと宛先の間でデータが同期します。

ストリームのオブジェクトのバックフィルを開始する前には、オブジェクトに関する情報を取得する必要があります。

各オブジェクトには、オブジェクトを一意に識別する [object-id] があります。[object-id] を使用してストリームのバックフィルを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

ストリームのオブジェクトのバックフィルを停止する

ストリームのオブジェクトのバックフィルを開始した後は、それを停止できます。たとえば、ユーザーがデータベース スキーマを変更すると、スキーマやデータが壊れる可能性があります。このスキーマやデータが宛先にストリーミングされないようにするには、オブジェクトのバックフィルを停止します。

また、ロード バランシングのためにオブジェクトのバックフィルを停止することもできます。データストリームは、複数のバックフィルを並行して実行できます。これにより、ソースの負荷が増加する可能性があります。負荷が著しく大きい場合は、各オブジェクトのバックフィルを停止してから、オブジェクトのバックフィルを 1 つずつ開始します。

ストリームのオブジェクトのバックフィルを停止する前に、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを行う必要があります。返される各オブジェクトには、オブジェクトを一意に識別する [object-id] が含まれます。ストリームのバックフィルを停止するには、[object-id] を使用します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

ストリームの削除

次のコードでは、ストリームを削除するリクエストを示します。

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

次に例を示します。

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

gcloud を使用してストリームを削除する方法の詳細については、こちらをご覧ください。