ストリームの管理

概要

このセクションでは、Datastream API を使用して以下を行う方法を説明します。

  • ストリームの作成
  • ストリームとストリーム オブジェクトに関する情報の取得
  • ストリームの開始、一時停止、再開、変更や、ストリーム オブジェクトのバックフィルの開始と停止によって、ストリームを更新する
  • 恒久的な障害が発生したストリームの復元
  • Oracle ストリームで大きなオブジェクトのストリーミングを有効にする
  • ストリームの削除

Datastream API を使用する方法は 2 つあります。REST API 呼び出し、または Google Cloud CLI(CLI)を使用できます。

gcloud を使用した Datastream ストリームの管理の概要については、gcloud Datastream ストリームをご覧ください。

ストリームの作成

このセクションでは、ソースから転送先へのデータ転送に使用されるストリームを作成する方法について説明します。以下の例は包括的なものではなく、Datastream の特定の機能をハイライトしたものです。特定のユースケースに対処するには、Datastream API リファレンス ドキュメントと一緒にこれらの例を使用してください。

このセクションでは、次のユースケースについて説明します。

例 1: 特定のオブジェクトを BigQuery にストリーミングする

この例では、以下の方法について学習します。

  • MySQL から BigQuery へのストリーミング
  • ストリームに一連のオブジェクトを含める
  • ストリームに含まれるすべてのオブジェクトをバックフィルする

次のリクエストは、schema1 からすべてのテーブルを、schema2 から 2 つの特定のテーブル(tableAtableC)をそれぞれ pull するリクエストです。イベントが BigQuery のデータセットに書き込まれます。

リクエストには customerManagedEncryptionKey パラメータが含まれていないため、CMEK の代わりに Google Cloud の内部鍵管理システムがデータを暗号化するために使用されます。

履歴バックフィル(またはスナップショット)の実行に関連する backfillAll パラメータが空の辞書({})に設定されます。つまり、Datastream は、ストリームに含まれるすべてのテーブルの履歴データをバックフィルします。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームを作成する方法の詳細については、Google Cloud SDK のドキュメントをご覧ください。

例 2: PostgreSQL ソースを使用してストリームから特定のオブジェクトを除外する

この例では、以下の方法について学習します。

  • PostgreSQL から BigQuery へのストリーミング
  • ストリームからオブジェクトを除外する
  • バックフィルからオブジェクトを除外する

次のコードでは、移行元の PostgreSQL データベースから BigQuery にデータを転送するために使用されるストリームを作成するリクエストを示します。ソース PostgreSQL データベースからストリームを作成するときは、リクエストに PostgreSQL 固有の項目を 2 つ指定する必要があります。

  • replicationSlot: レプリケーション スロットは、レプリケーション用の PostgreSQL データベースを構成するための前提条件です。ストリームごとにレプリケーション スロットを作成する必要があります。
  • publication: パブリケーションは、変更を複製するテーブルのグループです。ストリームを開始する前に、パブリケーション名がデータベースに存在する必要があります。パブリケーションには、少なくともストリームの includeObjects リストで指定されたテーブルが含まれている必要があります。

過去のバックフィル(またはスナップショット)の実行に関連する backfillAll パラメータは、1 つのテーブルを除外するように設定されています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

gcloud を使用してストリームを作成する方法の詳細については、Google Cloud SDK のドキュメントをご覧ください。

例 3: Cloud Storage の宛先にストリーミングする

この例では、以下の方法について学習します。

  • Oracle から Cloud Storage へのストリーミング
  • ストリームに含める一連のオブジェクトを定義する
  • 保存データを暗号化するための CMEK を定義する

次のリクエストは、Cloud Storage のバケットにイベントを書き込むストリームを作成する方法を示しています。

このリクエスト例では、イベントが JSON 出力形式で書き込まれ、100 MB または 30 秒ごとに新しいファイルが作成されます(デフォルト値の 50 MB と 60 秒をオーバーライドしています)。

JSON 形式では、次のことが可能です。

  • パスに統合型スキーマ ファイルを含めるその結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが Cloud Storage に書き込まれます。スキーマ ファイルは、データファイルと同じ名前で、拡張子は .schema です。

  • gzip 圧縮を有効にする。これによって、Cloud Storage に書き込まれたファイルをデータストリームが圧縮するようにします。

backfillNone パラメータを使用すると、このリクエストでは、バックフィルなしに、進行中の変更のみが宛先にストリーミングされることが指定されます。

このリクエストでは、Google Cloud プロジェクト内で保存データを暗号化するために使用される鍵を制御できる、顧客管理の暗号鍵パラメータを指定します。このパラメータは、ソースから宛先にストリーミングされるデータを暗号化するために Datastream が使用する CMEK を参照します。また、CMEK のキーリングも指定します。

キーリングの詳細については、Cloud KMS リソースをご覧ください。暗号鍵を使用したデータの保護の詳細については、Cloud Key Management Service(KMS)をご覧ください。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

gcloud を使用してストリームを作成する方法の詳細については、Google Cloud SDK のドキュメントをご覧ください。

ストリームの定義を検証する

ストリームを作成する前に、その定義を検証できます。これにより、すべての検証チェックに合格し、作成時にストリームが正常に実行されることを確認できます。

ストリームの検証では、次のことを確認します。

  • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
  • ストリームがソースと宛先の両方に接続できるかどうか
  • ストリームのエンドツーエンド構成

ストリームを検証するには、リクエスト本文の前の URL に &validate_only=true を追加します。

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

このリクエストを送信すると、ソースと宛先でデータストリームが実行する検証チェックと、チェックに合格するか失敗したかが表示されます。検証で不合格だった場合は、失敗の理由と問題を解決する方法に関する情報が表示されます。

たとえば、ソースから宛先にストリーミングされるデータを暗号化するために Datastream で使用する顧客管理の暗号鍵(CMEK)があるとします。ストリームの検証の一環として、Datastream は鍵が存在し、Datastream に鍵を使用する権限があることを確認します。いずれかの条件が満たされない場合、ストリームを検証すると、次のエラー メッセージが返されます。

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

この問題を解決するには、指定した鍵が存在し、Datastream サービス アカウントにその鍵に対する cloudkms.cryptoKeys.get 権限があることを確認してください。

適切な修正を行った後、リクエストを再度実行して、すべての検証チェックに合格することを確認します。 上記の例では、CMEK_VALIDATE_PERMISSIONS チェックはエラー メッセージを返さなくなりますが、ステータスは PASSED になります。

ストリームに関する情報を取得する

次のコマンドでは、ストリームに関する情報を取得するリクエストを示します。これには以下の情報が含まれます。

  • ストリームの名前(一意の識別子)
  • ストリームのわかりやすい名前(表示名)
  • ストリームが最後に作成され、最後に更新されたときのタイムスタンプ
  • ストリームに関連付けられたソースと宛先の接続プロファイルに関する情報
  • ストリームの状態

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

レスポンスは次のようになります。

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームに関する情報を取得する方法については、こちらをクリックしてください。

ストリームを一覧表示する

次のコードでは、指定したプロジェクトとロケーション内のすべてのストリームのリストを取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

gcloud を使用してすべてのストリームに関する情報を取得する方法については、こちらをクリックしてください。

ストリームのオブジェクトを一覧表示する

次のコマンドでは、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

gcloud を使用してストリームのすべてのオブジェクトに関する情報を取得する方法については、こちらをクリックしてください。

返されるオブジェクトのリストは次のようになります。

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

gcloud を使用してストリームのオブジェクトを一覧表示する方法については、こちらをクリックしてください。

ストリームを開始する

次のコマンドでは、ストリームを開始するリクエストを示します。

リクエストで updateMask パラメータを使用することによって、指定したフィールドのみをリクエストの本文に含めば済むようになります。 ストリームを開始するには、state フィールドの値を CREATED から RUNNING に変更します。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用してストリームを開始する方法の詳細については、こちらをクリックしてください。

ストリームを特定の位置から開始する

たとえば、外部ツールを使用してバックフィルを実行する場合や、指定したログファイルから CDC を開始する場合、MySQL と Oracle のソースの特定の位置からストリームを開始できます。MySQL ソースの場合は、バイナリログの位置、Oracle ソースの場合は REDO ログファイルのシステム変更番号(SCN)を指定する必要があります。

次のコードは、すでに作成されたストリームを特定の位置から開始するリクエストを示しています。

特定のバイナリログの位置からストリームを開始します(MySQL)。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

次のように置き換えます。

  • NAME_OF_THE_LOG_FILE: ストリームを開始するログファイルの名前。
  • POSITION: ストリームを開始するログファイル内の位置。値を指定しない場合、Datastream はファイルの先頭からの読み取りを開始します。

例:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始することはサポートされていません。gcloud を使用してストリームを開始する方法については、Cloud SDK のドキュメントをご覧ください。

REDO ログファイル(Oracle)の特定のシステム変更番号からストリームを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn は、ストリームを開始する REDO ログファイルのシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始することはサポートされていません。gcloud を使用してストリームを開始する方法については、Cloud SDK のドキュメントをご覧ください。

ストリームを一時停止する

次のコマンドでは、実行中のストリームを一時停止するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを一時停止することで、その状態は RUNNING から PAUSED に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

gcloud を使用してストリームを一時停止する方法について詳しくは、こちらをご覧ください。

ストリームを再開する

次のコマンドでは、一時停止されたストリームを再開するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを再開すると、状態は PAUSED から RUNNING に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用してストリームを再開する方法の詳細については、こちらをクリックしてください。

ストリームを復元する

RunStream メソッドを使用すると、恒久的に失敗する MySQL、Oracle または PostgreSQL ソースのストリームを復元できます。ソース データベース タイプごとに、可能なストリーム復元オペレーションの独自の定義があります。詳細については、ストリームを復元するをご覧ください。

MySQL または Oracle ソースのストリームを復元する

次のコードサンプルは、ログファイルのさまざまな位置から MySQL または Oracle ソースのストリームを復元するリクエストを示しています。

REST

ストリームを現在の位置から復元します。これはデフォルトのオプションです。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

ストリームを次の使用可能な位置から復元します。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

ストリームを最新の位置から復元します。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

ストリームを特定の位置から復元します(MySQL)。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

次のように置き換えます。

  • NAME_OF_THE_LOG_FILE: ストリームを復元するログファイルの名前
  • POSITION: ストリームを復元するログファイル内の位置。値を指定しない場合、Datastream ではファイルの先頭からストリームが復元されます。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

ストリームを特定の位置から復元します(Oracle)。

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn は、ストリームを復元する REDO ログファイルのシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

使用可能な復元オプションの詳細については、ストリームの復元をご覧ください。

gcloud

gcloud を使用したストリームの復元はサポートされていません。

PostgreSQL ソースのストリームを復元する

次のコードサンプルは、PostgreSQL ソースのストリームを復元するリクエストを示しています。復元の際、ストリームは、ストリーム用に構成されたレプリケーション スロットの最初のログシーケンス番号(LSN)から読み込みを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

レプリケーション スロットを変更する場合は、まず新しいレプリケーション スロット名でストリームを更新します。

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

gcloud を使用したストリームの復元はサポートされていません。

ストリームを変更する

次のコマンドでは、ファイルを 75 MB または 45 秒ごとにローテーションするようにストリームのファイル ローテーション構成を更新するリクエストを示します。

この例では、updateMask パラメータで指定されたフィールドに fileRotationMb フィールドと fileRotationInterval フィールドが含まれ、それぞれ destinationConfig.gcsDestinationConfig.fileRotationMb フラグと destinationConfig.gcsDestinationConfig.fileRotationInterval フラグで表されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次のコマンドでは、データストリームが Cloud Storage に書き込むファイルのパスに統合型スキーマ ファイルを含めるリクエストを示します。この結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが書き込まれます。

この例では、destinationConfig.gcsDestinationConfig.jsonFileFormat フラグにより表された jsonFileFormat フィールドが指定されます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

次のコマンドでは、進行中のデータに加え、既存のデータをソース データベースから宛先にレプリケートするデータストリームのリクエストを示します。

コードの oracleExcludedObjects セクションには、宛先へのバックフィルが制限されているテーブルとスキーマが示されます。

この例では、schema3 の tableA を除くすべてのテーブルとスキーマがバックフィルされます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}

gcloud

gcloud を使用してストリームを変更する方法の詳細については、こちらをクリックしてください。

ストリームのオブジェクトのバックフィルを開始する

データストリームのストリームでは、過去のデータをバックフィルすることや、進行中の変更を宛先にストリーミングすることが可能です。進行中の変更は常に、ソースから宛先にストリーミングされます。ただし、履歴データをストリーミングするかどうかは指定できます。

過去のデータをソースから宛先にストリーミングする場合は、backfillAll パラメータを使用します。

Datastream では、特定のデータベース テーブルに対してのみ履歴データをストリーミングすることもできます。これを行うには、backfillAll パラメータを使用して、履歴データが不要なテーブルを除外します。

進行中の変更のみを宛先にストリーミングする場合は、backfillNone パラメータを使用します。その後、データストリームですべてのソースのスナップショットをソースから宛先にストリーミングする場合は、そのデータを含むオブジェクトのバックフィルを手動で開始する必要があります。

オブジェクトのバックフィルを開始するもう一つの理由は、データがソースと宛先の間で同期していない場合です。たとえば、ユーザーが誤って宛先のデータを削除してしまうと、そのデータが失われています。この場合、オブジェクトのバックフィルを開始することが「リセットの仕組み」として機能します。これは、すべてのデータが一度に宛先へストリーミングされるためです。その結果、ソースと宛先の間でデータが同期します。

ストリームのオブジェクトのバックフィルを開始する前には、オブジェクトに関する情報を取得する必要があります。

各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID があります。OBJECT_ID を使用して、ストリームのバックフィルを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトの最初のバックフィルの詳細については、こちらをクリックしてください。

ストリームのオブジェクトのバックフィルを停止する

ストリームのオブジェクトのバックフィルを開始した後は、それを停止できます。たとえば、ユーザーがデータベース スキーマを変更すると、スキーマやデータが壊れる可能性があります。このスキーマやデータが宛先にストリーミングされないようにするには、オブジェクトのバックフィルを停止します。

また、ロード バランシングのためにオブジェクトのバックフィルを停止することもできます。データストリームは、複数のバックフィルを並行して実行できます。これにより、ソースの負荷が増加する可能性があります。負荷が著しく大きい場合は、各オブジェクトのバックフィルを停止してから、オブジェクトのバックフィルを 1 つずつ開始します。

ストリームのオブジェクトのバックフィルを停止するには、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを行う必要があります。返される各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID があります。OBJECT_ID を使用して、ストリームのバックフィルを停止します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトのバックフィルを停止する方法については、こちらをクリックしてください。

同時 CDC タスクの最大数を変更する

次のコードでは、MySQL ストリームの同時変更データ キャプチャ(CDC)タスクの最大数を 7 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentCdcTasks フィールドを指定します。値を 7 に設定すると、CDC タスクの同時実行の最大数が前の値から 7 に変更されます。0 ~ 50 の値を使用できます。値を定義しない場合、または 0 として定義すると、ストリームに対して 5 つのシステムデフォルトが設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }
}

gcloud

gcloud の使用について詳しくは、こちらをクリックしてください。

同時バックフィル タスクの最大数を変更する

次のコードは、MySQL ストリームの同時バックフィル タスクの最大数を 25 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentBackfillTasks フィールドを指定します。値を 25 に設定すると、バックフィルの同時タスクの最大数が前の値から 25 に変更されます。0 ~ 50 の値を使用できます。値を定義しない場合、または 0 として定義すると、ストリームに対して 16 個のタスクがデフォルトに設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }
}

gcloud

gcloud の使用について詳しくは、こちらをクリックしてください。

Oracle ソースの大規模オブジェクトのストリーミングを有効にする

Oracle ソースでのストリーミングでは、バイナリラージ オブジェクト(BLOB)、文字ラージ オブジェクト(CLOB)、国別文字ラージ オブジェクト(NCLOB)などの大規模オブジェクトのストリーミングを有効にできます。streamLargeObjects フラグを使用すると、新しいストリームと既存のストリームの両方に大きなオブジェクトを含めることができます。このフラグはストリーム レベルで設定されるため、大規模オブジェクト データ型の列を指定する必要はありません。

次の例は、大きなオブジェクトをストリーミングできるストリームを作成する方法を示しています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

gcloud を使用してストリームを更新する方法の詳細については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを削除する

次のコマンドでは、ストリームを削除するリクエストを示します。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

gcloud を使用してストリームを削除する方法の詳細については、こちらをご覧ください。