ストリームの管理

をご覧ください。

概要

このセクションでは、Datastream API を使用して以下を行う方法を説明します。

  • ストリームの検証と作成
  • ストリームとストリームのオブジェクトに関する情報の取得
  • ストリームの開始、一時停止、再開、変更と、ストリームのオブジェクトのバックフィルの開始と停止によるストリームの更新
  • ストリームの削除

Datastream API を使用する方法は 2 つあります。REST API 呼び出しを行うか、gcloud コマンドライン ツール(CLI)を使用できます。

gcloud を使用したデータストリームのストリームの管理の概要については、こちらをご覧ください。

ストリームを検証する

ストリームは、作成する前にそれを検証できます。これにより、ストリームが正常に実行され、すべての検証チェックに合格することが確実になります。

ストリームの検証では、次のことを確認します。

  • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか。
  • ストリームがソースと宛先の両方に接続できるかどうか。
  • ストリームのエンドツーエンド構成。

次のコードでは、ソースの Oracle データベースから Cloud Storage の宛先バケットまでのデータ転送に使用されるストリームを検証するリクエストを示します。

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

&validate_only=true の値は、ストリームの検証のみ行い、ストリームを作成しないことを示しています。また、このリクエストでは URL 全体を引用符で囲みます。これにより、データストリームが確実に &validate_only=true の値を受け取り、ストリームを検証します。

リクエストを行うと、ソースと宛先に対してデータストリームが実行する検証チェックと、チェックに合格したかどうかが表示されます。検証に合格しなかった場合は、失敗した理由と問題の修正方法に関する情報が表示されます。

適切な修正を行った後、リクエストを再度実行して、すべての検証チェックに合格することを確認します。

ストリームの作成

次のコードでは、ソースの Oracle データベースから Cloud Storage の宛先バケットまでのデータ転送に使用されるストリームを作成するリクエストを示します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

たとえば、schema1 からすべてのテーブルを取得し、schema3 から 2 つの特定のテーブル(tableA と tableC)を取得するリクエストは下のようになります。

その結果は Avro 形式で Cloud Storage のバケットに書き込まれ、100 MB または 30 秒ごとに新しいファイルが作成されます(デフォルト値の 50 MB と 60 秒をオーバーライドしています)。

backfillAll のパラメータは、過去のバックフィルに関連付けられます。このパラメータに空の辞書({})を設定すると、データストリームは次の情報のバックフィルを行います。

  • ソース データベースから宛先への履歴データ(進行中のデータの変更を含む)。
  • ソースから宛先へのスキーマとテーブル。
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}

ストリームに関する情報を取得する

次のコードでは、ストリームに関する情報を取得するリクエストを示します。これには以下の情報が含まれます。

  • データストリームが認識するストリームの名前
  • ストリームのわかりやすい名前(表示名)
  • ストリームの作成および更新の日時スタンプ
  • ストリームに関連付けられたソースと宛先の接続プロファイルに関する情報
  • ストリームの状態

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

次に例を示します。

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

レスポンスは次のように表示されます。

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームに関する情報を取得する方法については、こちらをご覧ください。

ストリームを一覧表示する

次のコードでは、すべてのストリームに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

gcloud を使用してすべてのストリームに関する情報を取得する方法については、こちらをご覧ください。

ストリームのオブジェクトを一覧表示する

次のコードでは、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

次に例を示します。

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

返されるオブジェクトのリストは、次のようになります。

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

ストリームを更新する

ストリームを開始する

次のコードでは、ストリームを開始するリクエストを示します。

リクエストで updateMask パラメータを使用することによって、指定したフィールドのみをリクエストの本文に含めば済むようになります。

この例では、指定されているフィールドは state フィールドであり、ストリームのステータス(状態)を表します。ストリームを開始すると、状態が CREATED から RUNNING まで変わっていきます。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

ストリームを一時停止する

次のコードでは、実行中のストリームを一時停止するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを一時停止することで、その状態は RUNNING から PAUSED に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

ストリームを再開する

次のコードでは、一時停止されたストリームを再開するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを再開すると、状態は PAUSED から RUNNING に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

ストリームを変更する

次のコードでは、ファイルを 75 MB または 45 秒ごとにローテーションするようにストリームのファイル ローテーション構成を更新するリクエストを示します。

この例では、updateMask パラメータで指定されたフィールドに fileRotationMb フィールドと fileRotationInterval フィールドが含まれ、それぞれ destinationConfig.gcsDestinationConfig.fileRotationMb フラグと destinationConfig.gcsDestinationConfig.fileRotationInterval フラグで表されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次のコードでは、データストリームが Cloud Storage に書き込むファイルのパスに統合型スキーマ ファイルを含めるリクエストを示します。この結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが書き込まれます。

この例では、destinationConfig.gcsDestinationConfig.jsonFileFormat フラグにより表された jsonFileFormat フィールドが指定されます。

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

次のコードでは、進行中のデータに加え、既存のデータをソース データベースから宛先にレプリケートするデータストリームのリクエストを示します。

コードの oracleExcludedObjects セクションには、宛先へのバックフィルが制限されているテーブルとスキーマが示されます。

この例では、schema3 の tableA を除き、すべてのテーブルとスキーマがバックフィルされます。

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

次に例を示します。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

ストリームのオブジェクトのバックフィルを開始する

データストリームのストリームでは、過去のデータをバックフィルすることや、進行中の変更を宛先にストリーミングすることが可能です。進行中の変更は常に、ソースから宛先にストリーミングされます。ただし、履歴データをストリーミングするかどうかは指定できます。

過去のデータをソースから宛先にストリーミングする場合は、backfillAll パラメータを使用します。

データストリームでは、特定のデータベース テーブルのみの履歴データをストリーミングすることもできます。これを行うには、backfillAll パラメータを使用して、履歴データを必要としないテーブルを除外します。

進行中の変更のみを宛先にストリーミングする場合は、backfillNone パラメータを使用します。その後、データストリームですべてのソースのスナップショットをソースから宛先にストリーミングする場合は、そのデータを含むオブジェクトのバックフィルを手動で開始する必要があります。

オブジェクトのバックフィルを開始するもう一つの理由は、データがソースと宛先の間で同期していない場合です。たとえば、ユーザーが誤って宛先のデータを削除してしまうと、そのデータが失われています。この場合、オブジェクトのバックフィルを開始することが「リセットの仕組み」として機能します。これは、すべてのデータが一度に宛先へストリーミングされるためです。その結果、ソースと宛先の間でデータが同期します。

ストリームのオブジェクトのバックフィルを開始する前には、オブジェクトに関する情報を取得する必要があります。

各オブジェクトには、オブジェクトを一意に識別する [object-id] があります。ストリームのバックフィルは、この [object-id] を使用して開始します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

ストリームのオブジェクトのバックフィルを停止する

ストリームのオブジェクトのバックフィルを開始した後は、それを停止できます。たとえば、ユーザーがデータベース スキーマを変更すると、スキーマやデータが壊れる可能性があります。このスキーマやデータが宛先にストリーミングされないようにするには、オブジェクトのバックフィルを停止します。

また、ロード バランシングのためにオブジェクトのバックフィルを停止することもできます。データストリームは、複数のバックフィルを並行して実行できます。これにより、ソースの負荷が増加する可能性があります。負荷が著しく大きい場合は、各オブジェクトのバックフィルを停止してから、オブジェクトのバックフィルを 1 つずつ開始します。

ストリームのオブジェクトのバックフィルを停止する前には、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを行う必要があります。返される各オブジェクトには、オブジェクトを一意に識別する [object-id] が含まれます。ストリームのバックフィルは、[object-id] を使用して停止します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

ストリームの削除

次のコードでは、ストリームを削除するリクエストを示します。

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

次に例を示します。

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

gcloud を使用してストリームを削除する方法の詳細については、こちらをご覧ください。