概要
多くの分離されたデータソースを持つ企業では、組織全体の企業データへのアクセスは、特にリアルタイムで困難になる可能性があります。その結果、データへのアクセスが制限されて遅くなり、組織の内省力を妨げることになります。
Datastream は、オンプレミスやクラウドベースのさまざまなデータソースから変更データにほぼリアルタイムでアクセスできるようにし、組織データへのアクセスを提供します。Datastream は、シンプルな設定エクスペリエンスと、組織全体で最新のエンタープライズ データに誰でもアクセスできるようにする統合消費 API を提供し、統合されたほぼリアルタイムのシナリオを実現します。
そのようなシナリオの一つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送し、ストレージ サービスまたはメッセージング キューと通信するその他のアプリケーションやサービスによって読み取り可能な形式にこのデータを変換することです。
このチュートリアルでは、Datastream を使用して、ソースの Oracle データベースから Cloud Storage バケット内のフォルダにスキーマ、テーブル、データを転送する方法を学習します。Cloud Storage は、Google Cloud 上にデータを保存して、アクセスできるウェブサービスです。このサービスは、Google のクラウドのパフォーマンスと拡張性に、高度なセキュリティと共有機能を兼ね備えたものです。
転送先の Cloud Storage バケット内のフォルダにこの情報を転送する操作の一環として、Datastream はこの情報を Avro に変換します。Avro は、JavaScript Object Notation(JSON)で記述されたスキーマで定義されます。これにより、異なるデータソース間でのデータの読み取りを統一された方法で行うことができます。
目標
このチュートリアルでは、以下の方法について学習します。- 環境変数を設定します。これらの変数は、Datastream に対してリクエストを行い、接続プロファイルとストリームの両方を作成して管理するときに使用します。
- Cloud Storage のソースのデータベースと送信先バケットの接続プロファイルを作成および管理します。これらの接続プロファイルを作成することで、ソースのデータベースと転送先の Cloud Storage バケットに関する情報を含むレコードを作成します。Datastream のストリームは、接続プロファイルの情報を使用して、ソースのデータベースから転送先バケットのフォルダにデータを転送します。
- ストリームを作成、管理する。Datastream は、このストリームを使用して、データ、スキーマ、テーブルをソース データベースから移行先バケットのフォルダに転送します。
- Datastream が、ソース Oracle データベースのスキーマに関連付けられているデータとテーブルを、転送先バケットのフォルダに転送し、このデータを Avro ファイル形式に変換していることを確認します。
- Datastream で作成したリソースをクリーンアップして、今後に割り当ての消費や課金が発生しないようにします。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
- Datastream API を有効にします。
- ユーザー アカウントに Datastream 管理者のロールが割り当てられていることを確認します。
- Datastream がアクセスできるソース データベースがあることを確認します。このチュートリアルでは、Oracle データベースをソースとして使用します。
- Datastream パブリック IP アドレスからの受信接続を許可するようにソース データベースを構成します。すべての Datastream リージョンのロケーションと、それらに関連付けられたパブリック IP アドレスを確認するには、IP 許可リストとリージョンをご覧ください。
- IP 許可リスト、転送 SSH トンネル、または VPC ピアリングの ネットワーク接続方法を使用して、Datastream がアクセスできる転送先の Cloud Storage バケットが構成されていることを確認します。
- Datastream が転送先の Cloud Storage バケット内のフォルダに転送できるデータ、テーブル、スキーマが、ソースのデータベース内にあることを確認してください。
- Cloud Shell をダウンロードしてインストールします。このクライアント アプリケーションを使用すると、コマンドラインを使用してクラウド リソース(Datastream を含む)にアクセスできます。
jq
ユーティリティをインストールして構成します。このユーティリティは、軽量で柔軟なコマンドライン JSON プロセッサです。このプロセッサを使用して、複雑なcURL
コマンドを読みやすいテキストで表示します。
環境変数の設定
この手順では、次の変数を設定します。
$PROJECT
: この変数は、Google Cloud プロジェクトに関連付けられています。Google Cloud のリソースを割り当てて使用するには、リソースがプロジェクトに含まれている必要があります。$TOKEN
: この変数はアクセス トークンに関連付けられています。アクセス トークンは、Cloud Shell が REST API を使用して Datastream でタスクを実行するために使用するセッションを提供します。
Cloud Shell アプリケーションを起動します。
Google アカウントを使用してアプリケーションを認証したら、
gcloud auth login
と入力します。Do you want to continue (Y/n)?
プロンプトで、Y
を入力します。ウェブブラウザを開き、URL をブラウザにコピーします。
Google アカウントを使用して Google Cloud SDK に対して認証します。[ログイン] ページにコードが表示されます。このコードがアクセス トークンです。
アクセス トークンをコピーして、Cloud Shell アプリケーションの
Enter verification code:
パラメータに貼り付け、Enter
を押します。プロンプトで「
PROJECT="YOUR_PROJECT_NAME"
」と入力して、$PROJECT
環境変数を Google Cloud プロジェクトに設定します。プロンプトで「
gcloud config set project YOUR_PROJECT_NAME
」を入力して、作業するプロジェクトを Google Cloud プロジェクトに設定します。コマンド プロンプトは、アクティブなプロジェクトを反映するように更新され、次の形式が適用されます:
USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$
プロンプトで「
TOKEN=$(gcloud auth print-access-token)
」と入力してアクセス トークンを取得し、変数として保存します。プロンプトで次のコマンドを入力して、
$PROJECT
変数と$TOKEN
変数が正しく設定されていることを確認します。echo $PROJECT
echo $TOKEN
変数を設定したので、Datastream 対してリクエストを行い、接続プロファイルとストリームの両方を作成して管理できます。
接続プロファイルの作成と管理
このセクションでは、Cloud Storage のソースの Oracle データベースと転送先バケットの接続プロファイルを作成して、管理します。
これらの接続プロファイルを作成すると、ソース データベースと転送先の Cloud Storage バケットに関する情報を含むレコードが作成されます。Datastream は、接続プロファイルの情報を使用して、ソースのデータベースから転送先バケットのフォルダにデータを転送します。
接続プロファイルの作成と管理には、次の作業が含まれます。
- ソースの Oracle データベースと Cloud Storage の転送先バケットの接続プロファイルの作成
- 接続プロファイルに関する情報の取得
- 接続プロファイルの変更
- ソース Oracle 接続プロファイルで Discover API 呼び出しを実行する。この呼び出しにより、データベース内部を覗いてそのオブジェクトに関連付けられているオブジェクトを確認できます。これらのオブジェクトには、データベースのデータを含むスキーマとテーブルが含まれます。Datastream を使用してストリームを構成する際には、すべてのオブジェクトをデータベースから pull するのではなく、オブジェクトのサブセット(たとえば、データベースの特定のテーブルとスキーマ)のみを pull することをおすすめします。Discover API を使用すると、 pull するデータベース オブジェクトのサブセットを検索(検出)できます。
接続プロファイルの作成
この手順では 2 つの接続プロファイル(移行元 Oracle データベースへの接続と、Cloud Storage 内の移行先バケットへの接続)を作成します。
- 移行元 Oracle データベースへの接続プロファイルを作成するプロンプトが表示されたら、次のコマンドを入力します。
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\" username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\" port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
次の表は、ソースの Oracle データベースのパラメータ値を理解する際に役立ちます。
パラメータ値 | 次のものに変更します |
---|---|
DISPLAY_NAME | ソースのデータベースへの接続プロファイルの表示名。 |
HOSTNAME | ソースのデータベース サーバーのホスト名。 |
USERNAME | ソースのデータベースのアカウントのユーザー名(例: ROOT)。 |
PASSWORD | ソースのデータベース用のアカウントのパスワード。 |
DATABASE_SERVICE | ソースのデータベースが確実に保護され、モニタリングされるサービス。Oracle データベースの場合、データベース サービスは通常 ORCL です。 |
PORT_NUMBER | ソース データベース用に予約されているポート番号。Oracle データベースの場合、ポート番号は通常 1521 です。 |
プロンプトで
echo $ORACLE | jq
コマンドを入力すると、作成したソースの接続プロファイルが読みやすいテキストで表示されます。{ "displayName": "DISPLAY_NAME", "oracle_profile": { "hostname": "HOSTNAME", "username": "USERNAME", "password": "PASSWORD", "database_service": "DATABASE_SERVICE", "port": PORT_NUMBER }, "no_connectivity": {} }
Oracle 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? connection_profile_id=SOURCE_CONNECTION_PROFILE_ID 次の表は、このコマンドのパラメータ値を理解する際に役立ちます。
パラメータ値 次のものに変更します DATASTREAM_API_VERSION Datastream API の現在のバージョン(例: v1
)。PROJECT_PATH Google Cloud プロジェクトのフルパス(例: projects/$PROJECT/locations/YOUR_PROJECT_LOCATION
)。SOURCE_CONNECTION_PROFILE_ID この接続プロファイル用に予約された一意の ID(cp-1 など)。 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/
SOURCE_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }Cloud Storage の宛先バケットへの接続プロファイルを作成します。プロンプトが表示されたら、次のコマンドを入力します。
GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\",
\"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}" 次の表は、転送先バケットのパラメータ値を理解する際に役立ちます。
パラメータ値 次のものに変更します DISPLAY_NAME 転送先バケットの接続プロファイルの表示名。 BUCKET_NAME 転送先バケットの名前。 FOLDER_PATH Datastream がソースのデータベースからデータを転送する転送先バケット内のフォルダ(例: /root/path)。 プロンプトで
echo $GOOGLECLOUDSTORAGE | jq
コマンドを入力すると、作成した転送先の接続プロファイルが読みやすいテキストで表示されます。{ "displayName": "DISPLAY_NAME", "gcs_profile": { "bucket_name": "BUCKET_NAME", "root_path": "/FOLDER_PATH" }, "no_connectivity": {} }
Cloud Storage 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.
OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }両方の接続プロファイルが作成されていることを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles ソースと送信先の両方の接続プロファイルで、2 つの結果が返されることを確認します。
{ "connectionProfiles": [ { "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} }, { "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "oracleProfile": { "hostname": "HOSTNAME", "port": PORT_NUMBER, "username": "USERNAME", "databaseService": "DATABASE_SERVICE" }, "noConnectivity": {} } ] }
接続プロファイルの管理
この手順では、ソースの Oracle データベース用と Cloud Storage 内の転送先のバケット用に作成した接続プロファイルを管理します。以下に例を示します。
- 転送先の Cloud Storage 接続プロファイルに関する情報を取得する
- この接続プロファイルを変更する。このチュートリアルでは、転送先の Cloud Storage バケットのフォルダを /root/tutorial に変更します。Datastream は、ソースのデータベースからこのフォルダにデータを転送します。
- ソースの Oracle 接続プロファイルで Discover API 呼び出しを実行する。
転送先の Cloud Storage 接続プロファイルに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID この接続プロファイルに関する情報が表示されていることを確認します。
{ "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} }
この接続プロファイルを変更する。そのためには、まず UPDATE 変数を設定します。この変数には、変更する接続プロファイルの値が含まれます。このチュートリアルでは、転送先バケットのフォルダを /root/tutorial に変更します。
変数を設定するには、プロンプトで次のコマンドを入力します。
UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
プロンプトが表示されたら、次のコマンドを入力します。
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
接続プロファイルが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID Cloud Storage 接続プロファイルの転送先バケットのフォルダが /root/tutorial になったことを確認します。
{ "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "/root/tutorial" }, "noConnectivity": {} }
Datastream の Discover API を使用して、ソースの Oracle データベースのスキーマとテーブルを検出します。Datastream は、ソースの接続プロファイルを介してこのデータベースにアクセスします。
Oracle データベースのスキーマを検出します。プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/
YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover データベースのすべてのスキーマが Datastream によって取得されていることを確認します。
データベース内のスキーマのテーブルを取得します。このチュートリアルでは、Discover API を使用して ROOT スキーマのテーブルを取得します。ただし、データベース内の任意のスキーマのテーブルを検出できます。
プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/
YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover - 指定したスキーマ(このチュートリアルでは ROOT スキーマ)のすべてのテーブルが Datastream によって取得されていることを確認します。
ソースの Oracle データベース用と Cloud Storage 内の転送先バケット用の接続プロファイルを作成して管理できたので、Datastream でストリームの作成と管理を行う準備ができました。
ストリームを作成、管理する
このセクションでは、ストリームを作成して管理します。Datastream は、このストリームを使用して、データ、スキーマ、テーブルをソース データベースから移行先の Cloud Storage バケットのフォルダに転送します。
ストリームの作成と管理には、以下が含まれます。
- ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認する。次のチェック項目があります:
- データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
- ストリームがソースと宛先の両方に接続できるかどうか
- ストリームのエンドツーエンド構成
- 次のリストを使用してストリームを作成します。
- 許可リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。このチュートリアルでは、/root/tutorial フォルダを指定します。
- 拒否リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダへの転送を制限するソース データベース内のテーブルとスキーマを指定します。
- ストリームに関する情報の取得
- ストリームの変更
- Datastream が、ソースのデータベースから転送先 Cloud Storage バケット内のフォルダにデータ、スキーマ、テーブルを転送することができるようにストリームを開始する
- Fetch Errors API を使用してストリームに関連するエラーを検出する
- ストリームを一時停止する。ストリームが一時停止されると、Datastream はソースのデータベースから転送先バケットに新しいデータを pull しなくなります。
- 一時停止したストリームを再開すると、Datastream は引き続き転送先バケットへのデータ転送を継続します。
ストリームの作成
この手順では、ソース Oracle データベースから転送先の Cloud Storage バケットのフォルダにストリームを作成します。作成するストリームには、許可リストと拒否リストの両方が含まれます。
SCHEMAS 変数を設定します。この変数は、Datastream がソース データベースから取得して Cloud Storage の転送先バケットの /root/tutorial フォルダに転送するデータとテーブルを含むスキーマを定義します。このチュートリアルでは、SCHEMAS 変数を ROOT スキーマに関連付けるように設定します。
プロンプトが表示されたら、次のコマンドを入力します。
SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}"
プロンプトで echo $SCHEMAS | jq コマンドを入力すると、この変数に定義した ROOT スキーマが読みやすいテキストで表示されます。
ストリームの作成プロンプトが表示されたら、次のコマンドを入力します。
STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\"
PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" ,\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\" :\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\": {\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}}, \"backfill_all\":{}}}" プロンプトで
echo $STREAM | jq
コマンドを入力して、作成したストリームを読みやすいテキストで表示します。{ "display_name": "DISPLAY_NAME", "source_config": { "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "oracle_source_config": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destination_config": { "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/
DESTINATION_CONNECTION_PROFILE_ID" , "gcs_destination_config": { "file_rotation_mb": 5, "file_rotation_interval": { "seconds": 15 }, "avro_file_format": {} } }, "backfill_all": {} }次の表は、ストリームの次のパラメータを理解するのに役立ちます。
パラメータ 説明 allowlist ソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送されるスキーマ(テーブルとデータを含む)。このチュートリアルでは、ROOT スキーマのすべてのテーブルとデータ(このスキーマのみ)が、転送先バケットの /root/tutorial フォルダに転送されます。 rejectlist Cloud Storage の転送先バケットのフォルダに転送されないスキーマ(テーブルとデータを含む)。このチュートリアルでは、{} 値は、ソースのデータベースから転送先バケットに転送されないテーブルとデータが 1 つもないことを意味しています。 file_rotation_mb ソース データベースから Cloud Storage の転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)。このチュートリアルでは、データはソース データベースから取得されているため、5 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 5 MB ファイルに分割されます。 file_rotation_interval Datastream が Cloud Storage の転送先バケットのフォルダ内の既存のファイルを閉じて、ソース データベースから転送されるデータを格納する別のファイルを開くまでの秒数。このチュートリアルでは、ファイル ローテーション間隔を 15 秒に設定します。 avro_file_format Datastream がソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送するファイルの形式。このチュートリアルでは、このファイル形式は Avro です。
backfill_all このパラメータは、過去のバックフィルに関連付けられます。このパラメータに空の辞書({})を設定すると、データストリームは次の情報のバックフィルを行います。
- ソース データベースから宛先への履歴データ(進行中のデータの変更を含む)
- ソースから宛先へのスキーマとテーブル
ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
"https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id= STREAM_ID&validate_only=true" {}
のコード行が表示されていることを確認します。これは、ストリームがすべての検証チェックに合格し、ストリームに関連するエラーがないことを示します。ストリームを送信して、ストリームを作成できるようにします。プロンプトで、次のコマンドを入力します。
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
ストリームが作成されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams 作成したストリームの結果が返されることを確認します。
{ "streams": [ { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} } ] }
ストリームを管理する
この手順では、ソースの Oracle データベースから Cloud Storage 転送先バケットのフォルダにデータを転送するために作成したストリームを使用します。以下に例を示します。
- ストリームに関する情報の取得
- ストリームの変更
- ストリームを開始する
- Fetch Errors API を使用してストリームに関連するエラーを検出する
- ストリームの一時停止と再開
ストリームに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID このストリームに関する情報が表示されていることを確認します。
{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }このストリームを変更します。そのためには、まず UPDATE 変数を設定します。この変数には、変更するストリームの値が含まれます。このチュートリアルでは、ソース データベースから Cloud Storage 転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)を 5 MB から 100 バイトに変更します。データはソース データベースから取得されているため、100 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 100 MB ファイルに分割されます。
変数を設定するには、プロンプトで次のコマンドを入力します。
UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
プロンプトが表示されたら、次のコマンドを入力します。
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/ ?update_mask=destination_config.gcs_destination_config.file_rotation_mb 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
ストリームが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Cloud Storage 接続プロファイルの fileRotationMb パラメータの値が、
100
になっていることを確認します。{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }ストリームを開始する手順は次のとおりです。
UPDATE
変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。UPDATE="{\"state\":\"RUNNING\"}"
次に、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
数分後、ストリームに関する情報を取得して、ストリームが開始されたことを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ストリームの状態が
CREATED
からRUNNING
に戻ったことを確認します。{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }Fetch Errors API を使用して、ストリームに関連するエラーを取得します。
プロンプトが表示されたら、次のコマンドを入力します。
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/ STREAM_ID:fetchErrors 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/ operation-FETCH_ERRORS_OPERATION_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION
.OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "endTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": true, "response": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.FetchErrorsResponse" } }
配信を一時停止する手順は次のとおりです。
UPDATE
変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。UPDATE="{\"state\":\"PAUSED\"}"
次に、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
ストリームに関する情報を取得して、ストリームが一時停止されていることを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ストリームの状態が
RUNNING
からPAUSED
に戻ったことを確認します。{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "PAUSED", "backfillAll": {} }一時停止したストリームを再開します。手順は次のとおりです。
UPDATE
変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。UPDATE="{\"state\":\"RUNNING\"}"
次に、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
数秒後にストリームに関する情報を取得して、ストリームが再度動作していることを確認します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ストリームの状態が
PAUSED
からRUNNING
に戻ったことを確認します。{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
ストリームを作成して管理し、ストリームに関連するエラーがなく、ストリームの状態が RUNNING
であることを確認したので、ソースのデータベースから Cloud Storage の転送先バケット内のフォルダにデータを転送できることを確認する準備ができました。
ストリームを確認する
この手順では、Datastream で以下のことができることを確認します。
- ソースの Oracle データベースの
ROOT
スキーマに関連付けられているすべてのテーブルからのデータを、Cloud Storage の転送先バケットの/root/tutorial
フォルダに転送します。 - データを Avro ファイル形式に変換します。
Cloud Storage の [ストレージ ブラウザ] ページに移動します。
バケットを含むリンクをクリックします。
[オブジェクト] タブが有効になっていない場合は、クリックします。
root フォルダをクリックしてから、tutorial フォルダをクリックします。
ソースの Oracle データベースの
ROOT
スキーマのテーブルを表すフォルダが表示されていることを確認します。いずれかのテーブル フォルダをクリックして、テーブルに関連付けられているデータを表示します。
データを示すファイルをクリックしてから、[ダウンロード] をクリックします。
このファイルを Avro ツール(Avro Viewer など)で開き、コンテンツが読めることを確認します。これにより、Datastream がデータを Avro ファイル形式に変換したことを確認できます。
クリーンアップ
このチュートリアルが終了したら、Datastream で作成したリソースをクリーンアップして、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Cloud Storage の宛先バケットの削除
Cloud Storage の左側のナビゲーション ドロワーで、[ブラウザ] アイテムをクリックします。
バケットの左側にあるチェックボックスをオンにして、[削除] をクリックします。
[バケットを削除しますか?] ウィンドウで、テキスト フィールドにバケットの名前を入力して、[確認] をクリックします。
ストリームの削除
Cloud Shell アプリケーションがアクティブであることを確認します。
プロンプトが表示されたら、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
ストリームが削除されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams null
{}
値が返されることを確認します。これは、Datastream にストリームがなくなり、作成したストリームが削除されたことを示します。
接続プロファイルを削除する
ソース Oracle データベースへの接続プロファイルを削除します。プロンプトが表示されたら、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ SOURCE_CONNECTION_PROFILE_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Cloud Storage の送信先バケットへの接続プロファイルを削除します。プロンプトが表示されたら、次のコマンドを入力します。
curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID 次のコードの行が表示されていることを確認します。
{ "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
両方の接続プロファイルが削除されていることを確認します。プロンプトが表示されたら、次のコマンドを入力します。
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles null
{}
値が返されることを確認します。これは、Datastream に接続プロファイルがなくなり、作成したプロファイルが削除されたことを示します。
次のステップ
- Datastream の詳細を確認する。
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。