多くのデータソースが分離されている場合、組織全体の企業データへのアクセス、特にリアルタイムでの管理は困難な場合があります。その結果、データへのアクセスが限定的かつ低速になり、組織がイントロスペクションを行う能力が妨げられます。
Datastream は、オンプレミスやクラウドベースのさまざまなデータソースからデータを変更するためにほぼリアルタイムでアクセスし、組織データへのアクセスを実現します。Datastream は、シンプルなセットアップ エクスペリエンスと、統合された統合された API を提供し、組織全体で最新のエンタープライズ データに誰でもアクセスできるようにすることで、ほぼリアルタイムの統合シナリオを実現します。
このようなシナリオの 1 つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送し、このストレージ サービスまたはメッセージング キューと通信する他のアプリケーションやサービスから読み取り可能な形式にこのデータを変換することです。
このチュートリアルでは、Datastream を使用して、ソース Oracle データベースから Cloud Storage バケット内のフォルダにスキーマ、テーブル、データを転送する方法を学習します。Cloud Storage は、Google Cloud にデータを保存してアクセスするためのウェブサービスです。このサービスは、Google のクラウドのパフォーマンスと拡張性に、高度なセキュリティと共有機能を兼ね備えたものです。
この情報を転送先の Cloud Storage バケット内のフォルダに転送する一環として、Datastream はこの情報を Avro に変換します。Avro は、JavaScript Object Notation(JSON)で記述されたスキーマによって定義されます。この変換により、異なるデータソース間で均一な方法でデータを読み取ることができます。
目標
このチュートリアルでは、以下の方法について学習します。- 環境変数を設定します。これらの変数は、接続プロファイルとストリームの両方を作成および管理するために Datastream にリクエストを行うときに使用します。
- Cloud Storage のソース データベースと宛先バケットの接続プロファイルを作成、管理します。これらの接続プロファイルを作成すると、ソース データベースと宛先の Cloud Storage バケットに関する情報を含むレコードが作成されます。Datastream のストリームは、接続プロファイルの情報を使用して、ソース データベースから転送先バケットのフォルダにデータを転送します。
- ストリームを作成して管理します。Datastream は、このストリームを使用して、ソース データベースから転送先バケットのフォルダにデータ、スキーマ、テーブルを転送します。
- Datastream が、ソース Oracle データベースのスキーマに関連付けられているデータとテーブルを、移行先バケットのフォルダに転送し、このデータを Avro ファイル形式に変換していることを確認します。
- Datastream で作成したリソースをクリーンアップして、今後割り当ての消費や課金が発生しないようにします。
費用
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
- Datastream API を有効にします。
- ユーザー アカウントに Datastream 管理者のロールが割り当てられていることを確認します。
- Datastream がアクセスできるソース データベースがあることを確認します。このチュートリアルでは、ソースとして Oracle データベースを使用します。
- Datastream パブリック IP アドレスからの受信接続を許可するようにソース データベースを構成します。すべての Datastream リージョンおよび関連するパブリック IP アドレスのロケーションにアクセスするには、IP 許可リストとリージョンをご覧ください。
- IP 許可リスト、転送 SSH トンネル、または VPC ピアリングのネットワーク接続方法を使用して、Datastream がアクセスできる宛先 Cloud Storage バケットが構成されていることを確認します。
- コピー元データベースに、データストリームが転送先の Cloud Storage バケット内のフォルダに転送できるデータ、テーブル、スキーマがあることを確認してください。
- Cloud Shell をダウンロードしてインストールします。このクライアント アプリケーションでは、コマンドラインを使ってクラウド リソース(Datastream を含む)にアクセスできます。
jq
ユーティリティをインストールして構成します。このユーティリティは、軽量で柔軟なコマンドライン JSON プロセッサです。このプロセッサを使用して、複雑なcURL
コマンドを読みやすいテキストで表示します。
環境変数の設定
この手順では、次の変数を設定します。
$PROJECT
: この変数は Google Cloud プロジェクトに関連付けられています。Google Cloud のリソースを割り当てて使用するには、リソースがプロジェクトに含まれている必要があります。$TOKEN
: この変数はアクセス トークンに関連付けられています。アクセス トークンは、Cloud Shell が REST API を介して Datastream でタスクを実行するために使用するセッションです。
Cloud Shell アプリケーションを起動します。
Google アカウントを使用してアプリケーションを認証したら、
gcloud auth login
と入力します。Do you want to continue (Y/n)?
プロンプトで、「Y
」と入力します。ウェブブラウザを開いて、その URL をブラウザにコピーします。
Google アカウントを使用して Google Cloud SDK に対する認証を行います。[ログイン] ページにコードが表示されます。このコードはアクセス トークンです。
アクセス トークンをコピーし、Cloud Shell アプリケーションの
Enter verification code:
パラメータに貼り付けて、Enter
を押します。プロンプトで「
PROJECT="[YOUR_PROJECT_NAME]"
」と入力して、$PROJECT
環境変数を Google Cloud プロジェクトに設定します。プロンプトで
gcloud config set project [YOUR_PROJECT_NAME]
を入力して、作業するプロジェクトを Google Cloud プロジェクトに設定します。コマンド プロンプトは、現在アクティブなプロジェクトを反映するように更新され、次の形式が適用されます。
[USERNAME]@cloudshell:~ ([YOUR_PROJECT_NAME])$
プロンプトが表示されたら、「
TOKEN=$(gcloud auth print-access-token)
」と入力してアクセス トークンを取得し、変数として保存します。プロンプトで次のコマンドを入力して、
$PROJECT
変数と$TOKEN
変数が正しく設定されていることを確認します。echo $PROJECT
echo $TOKEN
変数を設定したので、Datastream にリクエストを発行して、接続プロファイルとストリームの両方を作成して管理できます。
接続プロファイルの作成と管理
このセクションでは、Cloud Storage のソース Oracle データベースと宛先バケットの接続プロファイルを作成し、管理します。
これらの接続プロファイルを作成すると、ソース データベースと宛先の Cloud Storage バケットに関する情報を含むレコードが作成されます。Datastream は、接続プロファイルの情報を使用して、ソース データベースから転送先バケットのフォルダにデータを転送します。
接続プロファイルの作成と管理には以下が含まれます。
- Cloud Storage のソース Oracle データベースと宛先バケットの接続プロファイルの作成
- 接続プロファイルに関する情報を取得する
- 接続プロファイルの変更
- ソース Oracle 接続プロファイルで API 呼び出しを実行するこの呼び出しでは、データベース内に関連付けられているオブジェクトを確認できます。これらのオブジェクトには、データベースのデータが含まれるスキーマとテーブルが含まれます。Datastream を使用してストリームを構成する場合は、データベースからすべてのオブジェクトを pull するのではなく、オブジェクトのサブセット(データベースの特定のテーブルやスキーマのみ)を取得することをおすすめします。detect API を使用すると、pull するデータベース オブジェクトのサブセットを検出(または検出)できます。
接続プロファイルの作成
この手順では、2 つの接続プロファイルを作成します。1 つはソース Oracle データベース用、もう 1 つは Cloud Storage の宛先バケット用です。
ソース Oracle データベースへの接続プロファイルを作成します。プロンプトで、次のコマンドを入力します。
ORACLE="{\"displayName\":\"[DISPLAY_NAME]\",\"oracle_profile\":{\"hostname\":\"[HOSTNAME],\"username\":\"[USERNAME]\",\"password\":\"[PASSWORD]\",\"database_service\":\"[DATABASE_SERVICE]\",\"port\":[PORT_NUMBER]},\"no_connectivity\":{}}"
次の表は、ソース Oracle データベースのパラメータ値を理解する際に役立ちます。
パラメータ値 次に変更 [DISPLAY_NAME] ソース データベースへの接続プロファイルの表示名。 [HOSTNAME] ソース データベース サーバーのホスト名。 [USERNAME] ソース データベースのアカウントのユーザー名(例: ROOT)。 [PASSWORD] ソース データベースのアカウントのパスワード。 [DATABASE_SERVICE] ソース データベースが確実に保護され、モニタリングされるサービス。通常、Oracle データベースの場合、データベース サービスは ORCL です。 [PORT_NUMBER] ソース データベース用に予約されたポート番号。Oracle データベースの場合、ポート番号は通常 1521 です。 プロンプトで
echo $ORACLE | jq
コマンドを入力すると、作成したソース接続プロファイルが読みやすいテキストで表示されます。{ "displayName": "[DISPLAY_NAME]", "oracle_profile": { "hostname": "[HOSTNAME]", "username": "[USERNAME]", "password": "[PASSWORD]", "database_service": "[DATABASE_SERVICE]", "port": [PORT_NUMBER] }, "no_connectivity": {} }
Oracle 接続プロファイルを送信して、作成できるようにします。プロンプトで、次のコマンドを入力します。
curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles?connection_profile_id=[SOURCE_CONNECTION_PROFILE_ID]
このコマンドのパラメータ値を理解するには、次の表を使用してください。
パラメータ値 次に変更 [DATASTREAM_API_VERSION] Datastream API の現在のバージョン(例: v1
)。[PROJECT_PATH] Google Cloud プロジェクトのフルパス(例: projects/$PROJECT/locations/[YOUR_PROJECT_LOCATION]))。 [SOURCE_CONNECTION_PROFILE_ID] この接続プロファイル用に予約された一意の識別子(cp-1 など)。 次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[SOURCE_CONNECTION_PROFILE_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "datastream.googleapis.com/[DATASREAM_VERSION]/[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "verb": "create", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
Cloud Storage の転送先バケットへの接続プロファイルを作成します。プロンプトで、次のコマンドを入力します。
GOOGLECLOUDSTORAGE="{\"displayName\":\"[DISPLAY_NAME]\",\"gcs_profile\":{\"bucket_name\":\"[BUCKET_NAME]\",\"root_path\":\"/[FOLDER_PATH]\"},\"no_connectivity\":{}}"
転送先バケットのパラメータ値を理解するには、次の表をご覧ください。
パラメータ値 次に変更 [DISPLAY_NAME] 転送先バケットの接続プロファイルの表示名。 [BUCKET_NAME] 転送先バケットの名前。 [FOLDER_PATH] Datastream がソース データベースからデータを転送する転送先バケット内のフォルダ(例: /root/path)。 プロンプトで
echo $GOOGLECLOUDSTORAGE | jq
コマンドを入力すると、作成した宛先接続プロファイルが読みやすいテキストで表示されます。{ "displayName": "[DISPLAY_NAME]", "gcs_profile": { "bucket_name": "[BUCKET_NAME]", "root_path": "/[FOLDER_PATH]" }, "no_connectivity": {} }
Cloud Storage 接続プロファイルを作成できるように、接続プロファイルを送信する。プロンプトで、次のコマンドを入力します。
curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles?connection_profile_id=[DESTINATION_CONNECTION_PROFILE_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[DESTINATION_CONNECTION_PROFILE_OPERATION_ID]", "metadata": { "@type": "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "datastream.googleapis.com/[DATASREAM_VERSION]/[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "verb": "create", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
両方の接続プロファイルが作成されていることを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles
送信元と宛先の両方の接続プロファイルで、2 つの結果が返されることを確認します。
{ "connectionProfiles": [ { "name": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "gcsProfile": { "bucketName": "[BUCKET_NAME]", "rootPath": "[FOLDER_PATH]" }, "noConnectivity": {} }, { "name": "[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "oracleProfile": { "hostname": "[HOSTNAME]", "port": [PORT_NUMBER], "username": "[USERNAME]", "databaseService": "[DATABASE_SERVICE]" }, "noConnectivity": {} } ] }
接続プロファイルの管理
この手順では、ソース Oracle データベースと宛先バケットに対して作成した接続プロファイルを Cloud Storage で管理します。次の操作が含まれます。
- 転送先の Cloud Storage 接続プロファイルに関する情報を取得する
- この接続プロファイルを変更します。このチュートリアルでは、転送先の Cloud Storage バケットのフォルダを /root/tutorial に変更します。Datastream は、ソース データベースからこのフォルダにデータを転送します。
- ソース Oracle 接続プロファイルで API 呼び出しを実行する
転送先の Cloud Storage 接続プロファイルに関する情報を取得します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]
この接続プロファイルに関する情報が表示されていることを確認します。
{ "name": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "gcsProfile": { "bucketName": "[BUCKET_NAME]", "rootPath": "[FOLDER_PATH]" }, "noConnectivity": {} }
この接続プロファイルを変更します。まず、UPDATE 変数を設定します。この変数には、変更する接続プロファイルの値が含まれます。このチュートリアルでは、転送先バケットのフォルダを /root/tutorial に変更します。
変数を設定するには、プロンプトで次のコマンドを入力します。
UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
プロンプトで、次のコマンドを入力します。
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]?update_mask=gcsProfile.rootPath
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[DESTINATION_CONNECTION_PROFILE_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "verb": "update", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
接続プロファイルが変更されていることを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]
Cloud Storage 接続プロファイルの宛先バケットのフォルダが /root/tutorial になったことを確認します。
{ "name": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "gcsProfile": { "bucketName": "[BUCKET_NAME]", "rootPath": "/root/tutorial" }, "noConnectivity": {} }
Datastream 検出 API を使用して、ソース Oracle データベースのスキーマとテーブルを検出します。Datastream では、ソース接続プロファイルを介してこのデータベースにアクセスできます。
Oracle データベースのスキーマを確認します。プロンプトで、次のコマンドを入力します。
curl -X POST -d "{"connection_profile_name":"projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]"}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles:discover
Datastream がデータベースのすべてのスキーマを取得することを確認します。
データベース内のスキーマのテーブルを取得します。このチュートリアルでは、Discover API を使用して ROOT スキーマのテーブルを取得します。ただし、データベース内の任意のスキーマのテーブルを検出できます。
プロンプトで、次のコマンドを入力します。
curl -X POST -d "{\"connection_profile_name\":\"projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schemaName\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles:discover
Datastream が、指定されたスキーマ(このチュートリアルでは ROOT スキーマ)のすべてのテーブルを取得することを確認します。
ソース Oracle データベースと宛先バケットの接続プロファイルを作成して Cloud Storage に管理できたので、Datastream でストリームの作成と管理を行う準備ができました。
ストリームの作成と管理
このセクションでは、ストリームを作成して管理します。Datastream は、このストリームを使用して、ソース データベースから転送先 Cloud Storage バケットのフォルダにデータ、スキーマ、テーブルを転送します。
ストリームの作成と管理には以下が含まれます。
- ストリームを検証し、ストリームが正常に実行され、すべての検証チェックに合格していることを確認します。次のチェック項目があります:
- Datastream がソースからデータをストリーミングできるようにソースが適切に構成されているかどうか。
- ストリームが送信元と宛先の両方に接続できるかどうか。
- ストリームのエンドツーエンド構成。
- 次のリストを使用してストリームを作成します。
- 許可リスト。このリストは、Datastream が Cloud Storage の転送先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。このチュートリアルでは /root/tutorial フォルダです。
- 拒否リスト。このリストは、Datastream が Cloud Storage の転送先バケット内のフォルダへの転送を制限するソース データベース内のテーブルとスキーマを指定します。
- ストリームに関する情報の取得
- ストリームの変更
- ストリームの開始。Datastream が、ソース データベースからコピー先の Cloud Storage バケット内のフォルダにデータ、スキーマ、テーブルを転送することができる。
- Fetch Errors API を使用してストリームに関連するエラーを検出する
- ストリームを一時停止します。ストリームが一時停止されると、Datastream はソース データベースから宛先バケットに新しいデータを pull しません。
- 一時停止したストリームを再開すると、DataStream は引き続き転送先バケットにデータを転送できます。
ストリームの作成
この手順では、ソース Oracle データベースから宛先 Cloud Storage バケットのフォルダにストリームを作成します。作成するストリームには、許可リストと拒否リストの両方が含まれます。
SCHEMAS 変数を設定します。この変数は、Datastream でソース データベースから取得して Cloud Storage の転送先バケットの /root/tutorial フォルダに転送するデータとテーブルを含むスキーマを定義します。このチュートリアルでは、SCHEMAS 変数を ROOT スキーマに関連付けます。
プロンプトで、次のコマンドを入力します。
SCHEMAS="{\"oracleSchemas\":[{\"schemaName\":\"ROOT\"}]}"
プロンプトで echo $SCHEMAS | jq コマンドを入力すると、この変数に定義した ROOT スキーマが読みやすいテキストで表示されます。
ストリームの作成プロンプトで、次のコマンドを入力します。
STREAM="{\"display_name\":\"[DISPLAY_NAME]\",\"source_config\":{\"source_connection_profile_name\":\"[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]",\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\":\"[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]\",\"gcs_destination_config\":{\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}},\"backfill_all\":{}}"
}プロンプトで echo $STREAM | jq コマンドを入力すると、作成したストリームが読みやすいテキストで表示されます。
{ "display_name": "[DISPLAY_NAME]", "source_config": { "source_connection_profile_name": "[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracle_source_config": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destination_config": { "destination_connection_profile_name": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcs_destination_config": { "file_rotation_mb": 5, "file_rotation_interval": { "seconds": 15 }, "avro_file_format": {} } }, "backfill_all": {} }
この表は、ストリームの次のパラメータを理解するのに役立ちます。
パラメータ 説明 allowlist テーブルとデータを含むスキーマ。ソース データベースから Cloud Storage の転送先バケットのフォルダに転送されます。このチュートリアルでは、ROOT スキーマのすべてのテーブルとデータ(このスキーマのみ)が、宛先バケットの /root/tutorial フォルダに転送されます。 rejectlist テーブルとデータを含むスキーマ。これらは Cloud Storage の転送先バケットのフォルダには転送されません。このチュートリアルでは、{} 値は、ソース データベースからのテーブルとデータが宛先バケットに転送されないように指定します。 file_rotation_mb ソース データベースから Cloud Storage 転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)。このチュートリアルでは、データがソース データベースから取得されるため、5 MB のファイルに書き込まれます。このサイズを超えるデータは複数の 5 MB ファイルに分割されます。 file_rotation_interval Datastream が Cloud Storage 転送先バケットのフォルダ内の既存のファイルをクローズし、ソース データベースから転送されるデータを含む別のファイルを開くまでの秒数。このチュートリアルでは、ファイル ローテーション間隔を 15 秒に設定します。 avro_file_format Datastream がソース データベースから Cloud Storage 転送先バケットのフォルダに転送するファイルの形式。このチュートリアルでは、Avro はファイル形式です。
backfill_all このパラメータは過去のバックフィルに関連付けられています。このパラメータに空の辞書({})を設定すると、Datastream はバックフィルを行います。
- ソース データベースから移行先への履歴データ、および継続的なデータ変更。
- 移行元から移行先へのスキーマとテーブル。
ストリームを検証し、ストリームが正常に実行されること、およびすべての検証チェックに合格していることを確認します。プロンプトで、次のコマンドを入力します。
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" "https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams?stream_id=[STREAM_ID]&validate_only=true"
コードの
{}
行が表示されていることを確認します。これは、ストリームがすべての検証チェックに合格し、ストリームに関連付けられたエラーがないことを示します。ストリームを作成できるように送信します。プロンプトで、次のコマンドを入力します。
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams?stream_id=[STREAM_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "create", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
ストリームが作成されたことを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams
作成したストリームの結果が返されることを確認します。
{ "streams": [ { "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} } ] }
ストリームの管理
この手順では、作成したストリームを使用して、ソース Oracle データベースから Cloud Storage の転送先バケットのフォルダにデータを転送します。次の操作が含まれます。
- ストリームに関する情報の取得
- ストリームの変更
- ストリームの開始
- Fetch Errors API を使用してストリームに関連するエラーを検出する
- ストリームの一時停止と再開
ストリームに関する情報を取得します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
このストリームに関する情報が表示されていることを確認します。
{ "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }
このストリームを変更します。まず、UPDATE 変数を設定します。この変数には、変更するストリームの値が含まれます。このチュートリアルでは、ソース データベースから Cloud Storage 転送先バケットのフォルダに転送されるデータを含むファイルのサイズを(5 バイトから 100 バイト)に変更します。データがソース データベースから取得されると、100 MB ファイルに書き込まれます。このサイズを超えるデータは複数の 100 MB ファイルに分割されます。
変数を設定するには、プロンプトで次のコマンドを入力します。
UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
プロンプトで、次のコマンドを入力します。
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]/?update_mask=destination_config.gcs_destination_config.file_rotation_mb
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "update", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
ストリームが変更されたことを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
Cloud Storage 接続プロファイルの fileRotationMb パラメータの値が
100
になったことを確認します。{ "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }
ストリームを開始する手順は次のとおりです。
UPDATE
変数を変更します。プロンプトで、次のコマンドを入力します。UPDATE="{\"state\":\"RUNNING\"}"
次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]?updateMask=state
次のコード行が表示されていることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "start", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
数分後、ストリームに関する情報を取得して、ストリームが開始されたことを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
ストリームの状態が
CREATED
からRUNNING
に変更されたことを確認します。{ "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
Fetch Errors API を使用して、ストリームに関連付けられているエラーを取得します。
プロンプトで、次のコマンドを入力します。
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]:fetchErrors
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[FETCH_ERRORS_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/operations/operation-[FETCH_ERRORS_OPERATION_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[FETCH_ERRORS_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "endTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": true, "response": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].FetchErrorsResponse" } }
配信を一時停止する手順は次のとおりです。
UPDATE
変数を変更します。プロンプトで、次のコマンドを入力します。UPDATE="{\"state\":\"PAUSED\"}"
次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]?updateMask=state
次のコード行が表示されていることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "start", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
ストリームに関する情報を取得して、ストリームが一時停止されていることを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
ストリームの状態が
RUNNING
からPAUSED
に変更されたことを確認します。{ "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "PAUSED", "backfillAll": {} }
一時停止したストリームを再開します。手順は次のとおりです。
UPDATE
変数を変更します。プロンプトで、次のコマンドを入力します。UPDATE="{\"state\":\"RUNNING\"}"
次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]?updateMask=state
次のコード行が表示されていることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "start", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
数秒後、ストリームに関する情報を取得して、再度実行されていることを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
ストリームの状態が
PAUSED
からRUNNING
に戻されたことを確認します。{ "name": "[PROJECT_PATH]/streams/[STREAM_ID]", "createTime": "[DATE_AND_TIME_STAMP]", "updateTime": "[DATE_AND_TIME_STAMP]", "displayName": "[DISPLAY_NAME]", "sourceConfig": { "sourceConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schemaName": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/[YOUR_PROJECT_NUMBER]/locations/[YOUR_PROJECT_LOCATION]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
ストリームを作成して管理できたので、ストリームに関連するエラーがなく、ストリームの状態が RUNNING
であることを確認しました。ソース データベースからフォルダに移動します。
ストリームを確認する
この手順では、Datastream を確認します。
- ソース Oracle データベースの
ROOT
スキーマに関連付けられているすべてのテーブルから、Cloud Storage の転送先バケットの/root/tutorial
フォルダにデータを転送します。 - データを Avro ファイル形式に変換します。
Cloud Storage の [ストレージ ブラウザ] ページに移動します。
バケットを含むリンクをクリックします。
[OBJECTS] タブが有効になっていない場合は、クリックします。
root フォルダをクリックしてから、[tutorial] フォルダをクリックします。
ソース Oracle データベースの
ROOT
スキーマのテーブルを表すフォルダが表示されていることを確認します。いずれかのテーブル フォルダをクリックして、テーブルに関連付けられているデータを表示します。
データを表すファイルをクリックし、[ダウンロード] をクリックします。
このファイルを Avro ツール(Avro Viewer など)で開き、コンテンツが読み取り可能であることを確認します。これにより、Datastream がデータを Avro ファイル形式に翻訳したことも確認されました。
クリーンアップ
このチュートリアルが終了したら、Datastream で作成したリソースをクリーンアップして、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- Cloud Console で [リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
Cloud Storage 転送先バケットを削除する
Cloud Storage の左側のナビゲーション ドロワーで、[ブラウザ] 項目をクリックします。
バケットの左側にあるチェックボックスをオンにして、[削除] をクリックします。
[バケットを削除しますか?]ウィンドウのテキスト フィールドにバケットの名前を入力し、[確認] をクリックします。
ストリームの削除
Cloud Shell アプリケーションがアクティブであることを確認します。
プロンプトで、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams/[STREAM_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[STREAM_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/streams/[STREAM_ID]", "verb": "delete", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
ストリームが削除されたことを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/streams
null
{}
値が返されることを確認します。これは、Datastream にストリームがなく、作成したストリームが削除されたことを示します。
接続プロファイルの削除
ソース Oracle データベースへの接続プロファイルを削除します。プロンプトで、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[SOURCE_CONNECTION_PROFILE_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/connectionProfiles/[SOURCE_CONNECTION_PROFILE_ID]", "verb": "delete", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
Cloud Storage の転送先バケットへの接続プロファイルを削除します。プロンプトで、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]
次のコード行が表示されることを確認します。
{ "name": "[PROJECT_PATH]/operations/operation-[DESTINATION_CONNECTION_PROFILE_OPERATION_ID]", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.[DATASTREAM_API_VERSION].OperationMetadata", "createTime": "[DATE_AND_TIME_STAMP]", "target": "[PROJECT_PATH]/connectionProfiles/[DESTINATION_CONNECTION_PROFILE_ID]", "verb": "delete", "requestedCancellation": false, "apiVersion": "[DATASTREAM_API_VERSION]" }, "done": false }
両方の接続プロファイルが削除されていることを確認します。プロンプトで、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/[DATASTREAM_API_VERSION]/[PROJECT_PATH]/connectionProfiles
null
{}
値が返されることを確認します。これは、Datastream に接続プロファイルがなくなり、作成したプロファイルが削除されることを意味します。
次のステップ
- Datastream の詳細を確認する。
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。