ストリームの管理

概要

このセクションでは、Datastream API を使用して以下を行う方法を説明します。

  • ストリームを作成する
  • ストリームとストリーム オブジェクトに関する情報を取得する
  • ストリームの開始、一時停止、再開、変更、ストリーム オブジェクトのバックフィルの開始と停止を行うことで、ストリームを更新する
  • 恒久的な障害が発生したストリームの復元
  • Oracle ストリームのラージ オブジェクトのストリーミングを有効にする
  • ストリームの削除

Datastream API を使用する方法は 2 つあります。REST API 呼び出しを行うか、Google Cloud CLI(CLI)を使用できます。

gcloud を使用して Datastream ストリームを管理する大まかな情報については、gcloud Datastream ストリームをご覧ください。

ストリームの作成

このセクションでは、ソースから宛先にデータを転送するために使用するストリームを作成する方法を学習します。以下の例は包括的なものではなく、Datastream の特定の機能を強調するものです。特定のユースケースに対応するには、これらの例と Datastream API リファレンス ドキュメントを併せて使用してください。

このセクションでは、次のユースケースについて説明します。

例 1: 特定のオブジェクトを BigQuery にストリーミングする

この例では、以下の方法について学習します。

  • MySQL から BigQuery へのストリーミング
  • ストリームにオブジェクトのセットを含める
  • ストリームの書き込みモードを追記専用として定義する
  • ストリームに含まれるすべてのオブジェクトをバックフィルする

次のリクエストは、schema1 からすべてのテーブルを、schema2 から 2 つの特定のテーブル(tableAtableC)をそれぞれ pull するリクエストです。イベントは BigQuery のデータセットに書き込まれます。

リクエストに customerManagedEncryptionKey パラメータが含まれていないため、Google Cloud 内部の鍵管理システムを使用してデータを暗号化し、CMEK を使用します

履歴バックフィル(またはスナップショット)の実行に関連する backfillAll パラメータが空の辞書({})に設定されます。つまり、Datastream は、ストリームに含まれるすべてのテーブルの履歴データをバックフィルします。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 2: PostgreSQL ソースを含むストリームから特定のオブジェクトを除外する

この例では、以下の方法について学習します。

  • PostgreSQL から BigQuery へのストリーミング
  • ストリームからオブジェクトを除外する
  • バックフィルからオブジェクトを除外する

次のコードは、ソース PostgreSQL データベースから BigQuery にデータを転送するために使用するストリームの作成リクエストを示しています。 ソース PostgreSQL データベースからストリームを作成するときは、リクエストに PostgreSQL 固有の項目を 2 つ指定する必要があります。

  • replicationSlot: レプリケーション用に PostgreSQL データベースを構成するには、レプリケーション スロットが前提条件となります。ストリームごとにレプリケーション スロットを作成する必要があります。
  • publication: パブリケーションは、変更の複製元となるテーブルのグループです。ストリームを開始する前に、パブリケーション名がデータベースに存在している必要があります。少なくとも、ストリームの includeObjects リストで指定されたテーブルがパブリケーションに含まれている必要があります。

過去のバックフィル(またはスナップショット)の実行に関連付けられた backfillAll パラメータは、1 つのテーブルを除外するように設定されています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 3: ストリームの追記専用の書き込みモードを指定する

BigQuery にストリーミングするときに、書き込みモード(merge または appendOnly)を定義できます。詳細については、書き込みモードを構成するをご覧ください。

ストリームを作成するリクエストで書き込みモードを指定しない場合は、デフォルトの merge モードが使用されます。

次のリクエストは、MySQL から BigQuery へのストリームを作成するときに appendOnly モードを定義する方法を示しています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 4: Cloud Storage の宛先にストリーミングする

この例では、以下の方法について学習します。

  • Oracle から Cloud Storage へのストリーミング
  • ストリームに含めるオブジェクトのセットを定義する
  • 保存データを暗号化するための CMEK を定義する

次のリクエストは、イベントを Cloud Storage のバケットに書き込むストリームを作成する方法を示しています。

このリクエストの例では、イベントは JSON 出力形式で書き込まれ、100 MB または 30 秒ごとに新しいファイルが作成されます(デフォルト値の 50 MB と 60 秒をオーバーライドしています)。

JSON 形式では、次のことが可能です。

  • パスに統合型スキーマ ファイルを含めるその結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが Cloud Storage に書き込まれます。スキーマ ファイルは、データファイルと同じ名前で、拡張子は .schema です。

  • gzip 圧縮を有効にする。これによって、Cloud Storage に書き込まれたファイルをデータストリームが圧縮するようにします。

backfillNone パラメータを使用すると、このリクエストでは、バックフィルなしに、進行中の変更のみが宛先にストリーミングされることが指定されます。

このリクエストでは、顧客管理の暗号鍵パラメータを指定します。このパラメータを使用すると、Google Cloud プロジェクト内の保存データの暗号化に使用する鍵を制御できます。このパラメータは、ソースから宛先にストリーミングされるデータを暗号化するために Datastream が使用する CMEK を指します。また、CMEK のキーリングも指定します。

キーリングの詳細については、Cloud KMS リソースをご覧ください。暗号鍵を使用したデータの保護の詳細については、Cloud Key Management Service(KMS)をご覧ください。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームの定義を検証する

ストリームを作成する前に、その定義を検証できます。これにより、すべての検証チェックに合格し、作成時にストリームが正常に実行されることが確実になります。

ストリームの検証では、次のことを確認します。

  • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
  • ストリームがソースと宛先の両方に接続できるかどうか
  • ストリームのエンドツーエンド構成

ストリームを検証するには、リクエストの本文の前の URL に &validate_only=true を追加します。

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

このリクエストを行うと、Datastream が送信元と宛先に対して実行した検証チェックと、そのチェックに合格したかどうかが表示されます。検証で不合格となった場合は、失敗の理由と問題を解決する方法に関する情報が表示されます。

たとえば、ソースから宛先にストリーミングされるデータの暗号化に Datastream が使用する顧客管理の暗号鍵(CMEK)があるとします。ストリームの検証の一環として、Datastream は鍵が存在することと、その鍵を使用する権限が Datastream に付与されていることを確認します。これらの条件のいずれかが満たされない場合、ストリームを検証すると、次のエラー メッセージが返されます。

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

この問題を解決するには、指定した鍵が存在し、その鍵に対する cloudkms.cryptoKeys.get 権限が Datastream サービス アカウントにあることを確認してください。

適切な修正を行った後、リクエストを再度実行して、すべての検証チェックに合格することを確認します。上記の例では、CMEK_VALIDATE_PERMISSIONS チェックはエラー メッセージを返さなくなり、ステータスが PASSED になります。

ストリームに関する情報を取得する

次のコマンドでは、ストリームに関する情報を取得するリクエストを示します。これには以下の情報が含まれます。

  • ストリームの名前(固有識別子)
  • ストリームのユーザー フレンドリーな名前(表示名)
  • ストリームの作成日時と最終更新日時のタイムスタンプ
  • ストリームに関連付けられたソースと宛先の接続プロファイルに関する情報
  • ストリームの状態

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

次のようなレスポンスが表示されます。

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームに関する情報を取得する方法について詳しくは、こちらをご覧ください。

ストリームを一覧表示する

次のコードは、指定したプロジェクトと場所内の全ストリームのリストを取得するリクエストを示しています。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

gcloud を使用してすべてのストリームに関する情報を取得する方法について詳しくは、こちらをクリックしてください。

ストリームのオブジェクトを一覧表示する

次のコマンドでは、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

gcloud を使用してストリームのすべてのオブジェクトに関する情報を取得する方法について詳しくは、こちらをクリックしてください。

返されるオブジェクトのリストは、次のようになります。

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

gcloud を使用してストリームのオブジェクトを一覧表示する方法の詳細については、こちらをクリックしてください。

ストリームを開始する

次のコマンドでは、ストリームを開始するリクエストを示します。

リクエストで updateMask パラメータを使用することによって、指定したフィールドのみをリクエストの本文に含めば済むようになります。ストリームを開始するには、state フィールドの値を CREATED から RUNNING に変更します。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用して配信を開始する方法について詳しくは、こちらをご覧ください。

ストリームを一時停止する

次のコマンドでは、実行中のストリームを一時停止するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを一時停止することで、その状態は RUNNING から PAUSED に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

gcloud を使用してストリームを一時停止する方法について詳しくは、こちらをご覧ください。

ストリームを再開する

次のコマンドでは、一時停止されたストリームを再開するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを再開すると、状態は PAUSED から RUNNING に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用して配信を再開する方法について詳しくは、こちらをご覧ください。

ストリームを復元する

RunStream メソッドを使用すると、恒久的に失敗したストリームを復元できます。ソース データベースの種類ごとに、可能なストリーム復元オペレーションの定義が異なります。詳細については、ストリームを復元するをご覧ください。

MySQL または Oracle ソースのストリームを復元する

次のコードサンプルは、さまざまなログファイル位置から MySQL または Oracle ソースのストリームを復元するリクエストを示しています。

REST

現在の位置からストリームを復元します。これはデフォルトのオプションです。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次の使用可能な位置からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

ストリームを最新の位置から復元します。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

ストリームを特定の位置から復元します(MySQL)。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

以下を置き換えます。

  • NAME_OF_THE_LOG_FILE: そこからストリームを復元するログファイルの名前
  • POSITION: ログファイル内の、ストリームを復元する位置。値を指定しない場合、Datastream はファイルの先頭からストリームを復元します。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

ストリームを特定の位置から復元します(Oracle)。

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn を、そこからストリームを復元する redo ログファイル内のシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

利用可能な復元オプションについて詳しくは、ストリームを復元するをご覧ください。

gcloud

gcloud を使用したストリームの復元はサポートされていません。

PostgreSQL ソースのストリームを復元する

次のコードサンプルは、PostgreSQL ソースのストリームを復元するリクエストを示しています。復元中、ストリームはストリーム用に構成されたレプリケーション スロットの最初のログ シーケンス番号(LSN)から読み取りを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

レプリケーション スロットを変更する場合は、最初に新しいレプリケーション スロット名を更新します。

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

gcloud を使用したストリームの復元はサポートされていません。

SQL Server ソースのストリームを復元する

次のコードサンプルは、SQL Server ソースのストリームを復元するリクエストの例を示しています。

REST

使用可能な最初の位置からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

指定のログシーケンス番号からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

gcloud を使用したストリームの復元はサポートされていません。

特定の位置からストリームを開始または再開する

MySQL ソースと Oracle ソースでは、特定の位置からストリームを開始したり、一時停止したストリームを再開したりできます。これは、外部ツールを使用してバックフィルを実行する場合や、指定した位置から CDC を開始する場合に便利です。MySQL ソースの場合は binlog 位置を、Oracle ソースの場合は redo ログファイル内のシステム変更番号(SCN)を指定する必要があります。

次のコードは、特定の位置から作成済みのストリームを開始または再開するリクエストを示しています。

特定のバイナリログ位置からストリームを開始または再開する(MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

以下を置き換えます。

  • NAME_OF_THE_LOG_FILE: そこからストリームを開始するログファイルの名前。
  • POSITION: ログファイル内のストリームの開始位置。値を指定しない場合、Datastream はファイルの先頭から読み取りを開始します。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始または再開することはできません。gcloud を使用してストリームを開始または再開する方法については、Cloud SDK のドキュメントをご覧ください。

redo ログファイル内の特定のシステム変更番号からストリームを開始または再開する(Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn を、そこからストリームを開始する redo ログファイル内のシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始または再開することはできません。gcloud を使用してストリームを開始する方法については、Cloud SDK のドキュメントをご覧ください。

ストリームを変更する

次のコマンドでは、ファイルを 75 MB または 45 秒ごとにローテーションするようにストリームのファイル ローテーション構成を更新するリクエストを示します。

この例では、updateMask パラメータで指定されたフィールドに fileRotationMb フィールドと fileRotationInterval フィールドが含まれ、それぞれ destinationConfig.gcsDestinationConfig.fileRotationMb フラグと destinationConfig.gcsDestinationConfig.fileRotationInterval フラグで表されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次のコマンドでは、データストリームが Cloud Storage に書き込むファイルのパスに統合型スキーマ ファイルを含めるリクエストを示します。この結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが書き込まれます。

この例では、destinationConfig.gcsDestinationConfig.jsonFileFormat フラグにより表された jsonFileFormat フィールドが指定されます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

次のコマンドでは、進行中のデータに加え、既存のデータをソース データベースから宛先にレプリケートするデータストリームのリクエストを示します。

コードの oracleExcludedObjects セクションには、宛先へのバックフィルが制限されているテーブルとスキーマが示されます。

この例では、schema3 の tableA を除くすべてのテーブルとスキーマがバックフィルされます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

gcloud を使用してストリームを変更する方法の詳細については、こちらをクリックしてください。

ストリームのオブジェクトのバックフィルを開始する

データストリームのストリームでは、過去のデータをバックフィルすることや、進行中の変更を宛先にストリーミングすることが可能です。進行中の変更は常に、ソースから宛先にストリーミングされます。ただし、履歴データをストリーミングするかどうかは指定できます。

過去のデータをソースから宛先にストリーミングする場合は、backfillAll パラメータを使用します。

Datastream では、特定のデータベース テーブルの履歴データのみをストリーミングすることもできます。これを行うには、backfillAll パラメータを使用して、履歴データを不要なテーブルを除外します。

進行中の変更のみを宛先にストリーミングする場合は、backfillNone パラメータを使用します。その後、データストリームですべてのソースのスナップショットをソースから宛先にストリーミングする場合は、そのデータを含むオブジェクトのバックフィルを手動で開始する必要があります。

オブジェクトのバックフィルを開始するもう一つの理由は、データがソースと宛先の間で同期していない場合です。たとえば、ユーザーが誤って宛先のデータを削除してしまうと、そのデータが失われています。この場合、オブジェクトのバックフィルを開始することが「リセットの仕組み」として機能します。これは、すべてのデータが一度に宛先へストリーミングされるためです。その結果、ソースと宛先の間でデータが同期します。

ストリームのオブジェクトのバックフィルを開始する前には、オブジェクトに関する情報を取得する必要があります。

各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID があります。OBJECT_ID を使用して、ストリームのバックフィルを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトのバックフィルを開始する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームのオブジェクトのバックフィルを停止する

ストリームのオブジェクトのバックフィルを開始した後は、それを停止できます。たとえば、ユーザーがデータベース スキーマを変更すると、スキーマやデータが壊れる可能性があります。このスキーマやデータが宛先にストリーミングされないようにするには、オブジェクトのバックフィルを停止します。

また、ロード バランシングのためにオブジェクトのバックフィルを停止することもできます。データストリームは、複数のバックフィルを並行して実行できます。これにより、ソースの負荷が増加する可能性があります。負荷が著しく大きい場合は、各オブジェクトのバックフィルを停止してから、オブジェクトのバックフィルを 1 つずつ開始します。

ストリームのオブジェクトのバックフィルを停止する前に、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを行う必要があります。返される各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID が含まれます。OBJECT_ID を使用して、ストリームのバックフィルを停止します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトのバックフィルを停止する方法について詳しくは、こちらをクリックしてください。

同時実行 CDC タスクの最大数を変更する

次のコードは、MySQL ストリームの同時実行変更データ キャプチャ(CDC)タスクの最大数を 7 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentCdcTasks フィールドを指定します。この値を 7 に設定すると、同時実行 CDC タスク数が以前の値から 7 に変更されます。0~50(50を含む) の値を使用できます。値を定義しないか 0 と定義した場合、そのストリームにはシステムのデフォルトである 5 つのタスクが設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

gcloud の使用方法について詳しくは、こちらをクリックしてください。

同時実行バックフィル タスクの最大数を変更する

次のコードは、MySQL ストリームの同時実行バックフィル タスクの最大数を 25 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentBackfillTasks フィールドを指定します。この値を 25 に設定すると、最大同時バックフィル タスク数が以前の値から 25 に変更されます。0~50(50を含む) の値を使用できます。値を定義しないか 0 と定義した場合、システムのデフォルトである 16 個のタスクがストリームに設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

gcloud の使用方法について詳しくは、こちらをクリックしてください。

Oracle ソースのラージ オブジェクトのストリーミングを有効にする

Oracle ソースでのストリーミングでは、バイナリラージ オブジェクト(BLOB)、文字ラージ オブジェクト(CLOB)、国別文字ラージ オブジェクト(NCLOB)などの大規模オブジェクトのストリーミングを有効にできます。streamLargeObjects フラグを使用すると、新しいストリームでも既存のストリームでもラージ オブジェクトを作成できますこのフラグはストリームレベルで設定されラージ オブジェクト データ型の列を指定する必要はありません。

次の例は、大きなオブジェクトをストリーミングできるストリームを作成する方法を示しています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

gcloud を使用してストリームを更新する方法の詳細については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを削除する

次のコマンドでは、ストリームを削除するリクエストを示します。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

gcloud を使用してストリームを削除する方法の詳細については、こちらをクリックしてください。

次のステップ