多くの分離されたデータソースを持つ企業では、組織全体の企業データへのアクセスは、特にリアルタイムで困難になる可能性があります。その結果、データへのアクセスが制限されて遅くなり、組織の内省力を妨げることになります。
Datastream は、オンプレミスやクラウドベースのさまざまなデータソースから変更データにほぼリアルタイムでアクセスできるようにし、組織データへのアクセスを提供します。Datastream は、組織全体で最新のエンタープライズ データに誰でもアクセスできるようにする統合消費 API を提供し、統合されたほぼリアルタイムのシナリオを実現します。
そのようなシナリオの一つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送し、ストレージ サービスまたはメッセージング キューと通信するその他のアプリケーションやサービスによって読み取り可能な形式にこのデータを変換することです。
このチュートリアルでは、Datastream を使用して、ソースの Oracle データベースから Cloud Storage バケット内のフォルダにスキーマ、テーブル、データを転送する方法を学習します。Cloud Storage は、 Google Cloud上にデータを保存してアクセスできるウェブサービスです。このサービスは、Google のクラウドのパフォーマンスとスケーラビリティに、高度なセキュリティ機能と共有機能を組み合わせたものです。
転送先の Cloud Storage バケット内のフォルダにこの情報を転送する操作の一環として、Datastream はこの情報を Avro に変換します。Avro は、JavaScript Object Notation(JSON)で記述されたスキーマによって定義されます。これにより、異なるデータソース間でのデータの読み取りを統一された方法で行うことができます。
目標
このチュートリアルでは、以下の方法について学習します。- 環境変数を設定します。これらの変数は、Datastream に対してリクエストを行い、接続プロファイルとストリームの両方を作成して管理するときに使用します。
- Cloud Storage のソースのデータベースと送信先バケットの接続プロファイルを作成および管理します。これらの接続プロファイルを作成することで、ソースのデータベースと転送先の Cloud Storage バケットに関する情報を含むレコードを作成します。Datastream のストリームは、接続プロファイルの情報を使用して、ソースのデータベースから転送先バケットのフォルダにデータを転送します。
- ストリームを作成、管理する。Datastream は、このストリームを使用して、データ、スキーマ、テーブルをソース データベースから移行先バケットのフォルダに転送します。
- Datastream が、ソース Oracle データベースのスキーマに関連付けられているデータとテーブルを、転送先バケットのフォルダに転送し、このデータを Avro ファイル形式に変換していることを確認します。
- Datastream で作成したリソースをクリーンアップして、今後に割り当ての消費や課金が発生しないようにします。
料金
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
- Cloud Storage
 
  
 
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- 
    
    
      In the Google Cloud console, on the project selector page, select or create a Google Cloud project. Roles required to select or create a project - Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- 
      Create a project: To create a project, you need the Project Creator
      (roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
 
- 
  
    Verify that billing is enabled for your Google Cloud project. 
- 
    
    
      In the Google Cloud console, on the project selector page, select or create a Google Cloud project. Roles required to select or create a project - Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- 
      Create a project: To create a project, you need the Project Creator
      (roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
 
- 
  
    Verify that billing is enabled for your Google Cloud project. 
- Datastream API を有効にします。
- ユーザー アカウントに Datastream 管理者のロールが割り当てられていることを確認します。
- Datastream がアクセスできるソース データベースがあることを確認します。このチュートリアルでは、移行元として Oracle データベースを使用します。
- Datastream パブリック IP アドレスからの受信接続を許可するようにソース データベースを構成します。すべての Datastream リージョンのロケーションと、それらに関連付けられたパブリック IP アドレスにアクセスするには、IP 許可リストとリージョンをご覧ください。
- IP 許可リスト、転送 SSH トンネル、または VPC ピアリングの ネットワーク接続方法を使用して、Datastream がアクセスできる転送先の Cloud Storage バケットが構成されていることを確認します。
- Datastream が転送先の Cloud Storage バケット内のフォルダに転送できるデータ、テーブル、スキーマが、ソースのデータベース内にあることを確認してください。
- Cloud Shell をダウンロードしてインストールします。このクライアント アプリケーションを使用すると、コマンドラインを使用してクラウド リソース(Datastream を含む)にアクセスできます。
- jqユーティリティをインストールして構成します。このユーティリティは、軽量で柔軟なコマンドライン JSON プロセッサです。このプロセッサを使用して、複雑な- cURLコマンドを読みやすいテキストで表示します。
環境変数の設定
この手順では、次の変数を設定します。
- $PROJECT: この変数は Google Cloud プロジェクトに関連付けられています。割り当てて使用するGoogle Cloud リソースはすべて、プロジェクトに含まれている必要があります。
- $TOKEN: この変数はアクセス トークンに関連付けられています。アクセス トークンは、Cloud Shell が REST API を使用して Datastream でタスクを実行するために使用するセッションを提供します。
- Cloud Shell アプリケーションを起動します。 
- Google アカウントを使用してアプリケーションを認証したら、次のコマンドを入力します。 - gcloud auth login
- Do you want to continue (Y/n)?プロンプトで、- Yを入力します。
- ウェブブラウザを開き、URL をブラウザにコピーします。 
- Google アカウントを使用して Google Cloud SDK に対して認証します。[ログイン] ページにコードが表示されます。このコードがアクセス トークンです。 
- アクセス トークンをコピーして、Cloud Shell アプリケーションの - Enter verification code:パラメータに貼り付け、- Enterを押します。
- プロンプトで「 - PROJECT=\"YOUR_PROJECT_NAME\"」と入力して、- $PROJECT環境変数を Google Cloudプロジェクトに設定します。
- プロンプトで「 - gcloud config set project YOUR_PROJECT_NAME」を入力して、作業するプロジェクトを Google Cloudプロジェクトに設定します。- コマンド プロンプトは、アクティブなプロジェクトを反映するように更新され、次の形式が適用されます: - USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$
- プロンプトで「 - TOKEN=$(gcloud auth print-access-token)」と入力してアクセス トークンを取得し、変数として保存します。
- プロンプトで次のコマンドを入力して、 - $PROJECT変数と- $TOKEN変数が正しく設定されていることを確認します。- echo $PROJECT
- echo $TOKEN
 
変数を設定したので、Datastream に対してリクエストを行い、接続プロファイルとストリームの両方を作成して管理できます。
接続プロファイルを作成して管理する
このセクションでは、Cloud Storage のソースの Oracle データベースと転送先バケットの接続プロファイルを作成して、管理します。
これらの接続プロファイルを作成すると、ソースのデータベースと転送先の Cloud Storage バケットに関する情報を含むレコードが作成されます。Datastream は、接続プロファイルの情報を使用して、ソースのデータベースから転送先バケットのフォルダにデータを転送します。
接続プロファイルの作成と管理には、次のものが含まれます。
- ソースの Oracle データベースと Cloud Storage の転送先バケットの接続プロファイルの作成
- 接続プロファイルに関する情報を取得する
- 接続プロファイルの変更
- ソース Oracle 接続プロファイルで Discover API 呼び出しを実行する。この呼び出しにより、データベース内部を覗いてそのオブジェクトに関連付けられているオブジェクトを確認できます。これらのオブジェクトには、データベースのデータを含むスキーマとテーブルが含まれます。Datastream を使用してストリームを構成する際には、すべてのオブジェクトをデータベースから pull するのではなく、オブジェクトのサブセット(たとえば、データベースの特定のテーブルとスキーマ)のみを pull することをおすすめします。Discover API を使用して、取得するデータベース オブジェクトのサブセットを見つけます。
接続プロファイルの作成
この手順では 2 つの接続プロファイル(移行元 Oracle データベースへの接続と、Cloud Storage 内の移行先バケットへの接続)を作成します。
- 移行元 Oracle データベースへの接続プロファイルを作成するプロンプトが表示されたら、次のコマンドを入力します。
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\" username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\" port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
次の表は、ソースの Oracle データベースのパラメータ値を理解する際に役立ちます。
| パラメータ値 | 次のものに変更します | 
|---|---|
| DISPLAY_NAME | ソースのデータベースへの接続プロファイルの表示名。 | 
| HOSTNAME | ソースのデータベース サーバーのホスト名。 | 
| USERNAME | ソースのデータベースのアカウントのユーザー名(例: ROOT)。 | 
| PASSWORD | ソースのデータベース用のアカウントのパスワード。 | 
| DATABASE_SERVICE | ソースのデータベースが確実に保護され、モニタリングされるサービス。Oracle データベースの場合、データベース サービスは通常 ORCL です。 | 
| PORT_NUMBER | ソース データベース用に予約されているポート番号。Oracle データベースの場合、通常、ポート番号は 1521 です。 | 
- プロンプトで - echo $ORACLE | jqコマンドを入力すると、作成したソースの接続プロファイルが読みやすいテキストで表示されます。- { "displayName": "DISPLAY_NAME", "oracle_profile": { "hostname": "HOSTNAME", "username": "USERNAME", "password": "PASSWORD", "database_service": "DATABASE_SERVICE", "port": PORT_NUMBER }, "no_connectivity": {} } 
- Oracle 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? - connection_profile_id=SOURCE_CONNECTION_PROFILE_ID - 次の表は、このコマンドのパラメータ値を理解する際に役立ちます。 - パラメータ値 - 次のものに変更します - DATASTREAM_API_VERSION - Datastream API の現在のバージョン(例: - v1)。- PROJECT_PATH - Google Cloud プロジェクトのフルパス(例: - projects/$PROJECT/locations/YOUR_PROJECT_LOCATION)。- SOURCE_CONNECTION_PROFILE_ID - この接続プロファイル用に予約された一意の ID(cp-1 など)。 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/ - SOURCE_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
- Cloud Storage の宛先バケットへの接続プロファイルを作成します。プロンプトが表示されたら、次のコマンドを入力します。 - GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\", - \"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}" - 次の表は、転送先バケットのパラメータ値を理解する際に役立ちます。 - パラメータ値 - 次のものに変更します - DISPLAY_NAME - 転送先バケットの接続プロファイルの表示名。 - BUCKET_NAME - 転送先バケットの名前。 - FOLDER_PATH - Datastream がソースのデータベースからデータを転送する転送先バケット内のフォルダ(例: /root/path)。 
- プロンプトで - echo $GOOGLECLOUDSTORAGE | jqコマンドを入力すると、作成した転送先の接続プロファイルが読みやすいテキストで表示されます。- { "displayName": "DISPLAY_NAME", "gcs_profile": { "bucket_name": "BUCKET_NAME", "root_path": "/FOLDER_PATH" }, "no_connectivity": {} } 
- Cloud Storage 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? - connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION. - OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/- DESTINATION_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
- 両方の接続プロファイルが作成されていることを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles 
- ソースと送信先の両方の接続プロファイルで、2 つの結果が返されることを確認します。 - { "connectionProfiles": [ { "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} }, { "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "oracleProfile": { "hostname": "HOSTNAME", "port": PORT_NUMBER, "username": "USERNAME", "databaseService": "DATABASE_SERVICE" }, "noConnectivity": {} } ] } 
接続プロファイルの管理
この手順では、ソースの Oracle データベース用と Cloud Storage 内の転送先のバケット用に作成した接続プロファイルを管理します。以下に例を示します。
- 移行先の Cloud Storage 接続プロファイルに関する情報を取得する
- この接続プロファイルを変更する。このチュートリアルでは、転送先の Cloud Storage バケットのフォルダを /root/tutorial に変更します。Datastream は、ソースのデータベースからこのフォルダにデータを転送します。
- ソースの Oracle 接続プロファイルで Discover API 呼び出しを実行する。
- 転送先の Cloud Storage 接続プロファイルに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ - DESTINATION_CONNECTION_PROFILE_ID 
- この接続プロファイルに関する情報が表示されることを確認します。 - { "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} } 
- この接続プロファイルを変更する。これを行うには、まず UPDATE 変数を設定します。この変数には、変更する接続プロファイルの値が含まれます。このチュートリアルでは、転送先のバケットのフォルダを /root/tutorial に変更します。 - 変数を設定するには、プロンプトで次のコマンドを入力します。 - UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}" 
- プロンプトが表示されたら、次のコマンドを入力します。 - curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ - DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- 接続プロファイルが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ - DESTINATION_CONNECTION_PROFILE_ID 
- Cloud Storage 接続プロファイルの転送先バケットのフォルダが /root/tutorial になったことを確認します。 - { "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "/root/tutorial" }, "noConnectivity": {} } 
- Datastream の Discover API を使用して、ソースの Oracle データベースのスキーマとテーブルを検出します。Datastream は、ソースの接続プロファイルを介してこのデータベースへのアクセスを提供します。 - Oracle データベースのスキーマを検出します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/- YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", - \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" - -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover 
- データベースのすべてのスキーマが Datastream によって取得されていることを確認します。 
- データベース内のスキーマのテーブルを取得します。このチュートリアルでは、Discover API を使用して ROOT スキーマのテーブルを取得します。ただし、データベース内の任意のスキーマのテーブルを検出できます。 - プロンプトが表示されたら、次のコマンドを入力します。 
 - curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/- YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", - \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" - -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover - 指定したスキーマ(このチュートリアルでは ROOT スキーマ)のすべてのテーブルが Datastream によって取得されていることを確認します。
 
ソースの Oracle データベース用と Cloud Storage 内の転送先バケット用の接続プロファイルを作成して管理できたので、Datastream でストリームの作成と管理を行う準備ができました。
ストリームを作成、管理する
このセクションでは、ストリームを作成して管理します。Datastream は、このストリームを使用して、データ、スキーマ、テーブルをソース データベースから移行先の Cloud Storage バケットのフォルダに転送します。
ストリームの作成と管理には以下が含まれます。
- ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認する。次のチェック項目があります:- データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
- ストリームがソースと宛先の両方に接続できるかどうか
- ストリームのエンドツーエンド構成
 
- 次のリストを使用してストリームを作成します。- 許可リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。このチュートリアルでは、/root/tutorial フォルダを指定します。
- 拒否リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダへの転送を制限するソース データベース内のテーブルとスキーマを指定します。
 
- ストリームに関する情報の取得
- ストリームの変更
- Datastream が、ソースのデータベースから転送先 Cloud Storage バケット内のフォルダにデータ、スキーマ、テーブルを転送することができるようにストリームを開始する
- Fetch Errors API を使用して、ストリームに関連付けられているエラーを検出する
- ストリームを一時停止する。ストリームが一時停止されると、Datastream はソースのデータベースから転送先バケットに新しいデータを pull しなくなります。
- 一時停止したストリームを再開すると、Datastream は引き続き転送先バケットへのデータ転送を継続します。
ストリームの作成
この手順では、ソース Oracle データベースから転送先の Cloud Storage バケットのフォルダにストリームを作成します。作成するストリームには、許可リストと拒否リストの両方が含まれます。
- SCHEMAS 変数を設定します。この変数は、Datastream がソース データベースから取得して Cloud Storage の転送先バケットの /root/tutorial フォルダに転送するデータとテーブルを含むスキーマを定義します。このチュートリアルでは、SCHEMAS 変数を ROOT スキーマに関連付けるように設定します。 - プロンプトが表示されたら、次のコマンドを入力します。 - SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}" 
- プロンプトで echo $SCHEMAS | jq コマンドを入力すると、この変数に定義した ROOT スキーマが読みやすいテキストで表示されます。 
- ストリームの作成プロンプトが表示されたら、次のコマンドを入力します。 - STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\" - PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" ,\"oracle_source_config\":- {\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\" - :\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\": - {\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}}, - \"backfill_all\":{}}}" 
- プロンプトで - echo $STREAM | jqコマンドを入力して、作成したストリームを読みやすいテキストで表示します。- { "display_name": "DISPLAY_NAME", "source_config": { "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "oracle_source_config": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destination_config": { "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/ - DESTINATION_CONNECTION_PROFILE_ID" , "gcs_destination_config": { "file_rotation_mb": 5, "file_rotation_interval": { "seconds": 15 }, "avro_file_format": {} } }, "backfill_all": {} }- 次の表は、ストリームの次のパラメータを理解するのに役立ちます。 - パラメータ - 説明 - allowlist - ソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送されるスキーマ(テーブルとデータを含む)。このチュートリアルでは、ROOT スキーマのすべてのテーブルとデータ(このスキーマのみ)が、転送先バケットの /root/tutorial フォルダに転送されます。 - rejectlist - Cloud Storage の転送先バケットのフォルダに転送されないスキーマ(テーブルとデータを含む)。このチュートリアルでは、{} 値は、ソースのデータベースから転送先バケットに転送されないテーブルとデータが 1 つもないことを意味しています。 - file_rotation_mb - ソース データベースから Cloud Storage の転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)。このチュートリアルでは、データはソース データベースから取得されているため、5 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 5 MB ファイルに分割されます。 - file_rotation_interval - Datastream が Cloud Storage の転送先バケットのフォルダ内の既存のファイルを閉じて、ソース データベースから転送されるデータを含む別のファイルを開くまでの秒数。このチュートリアルでは、ファイル ローテーション間隔を 15 秒に設定します。 - avro_file_format - Datastream がソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送するファイルの形式。このチュートリアルでは、このファイル形式は Avro です。 - backfill_all - このパラメータは、過去のバックフィルに関連付けられます。このパラメータに空の辞書({})を設定すると、データストリームは次の情報のバックフィルを行います。 - ソース データベースから宛先への履歴データ(進行中のデータの変更を含む)
- ソースから宛先へのスキーマとテーブル
 
- ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - "https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id= - STREAM_ID&validate_only=true" 
- {}コード行が表示されていることを確認します。これは、ストリームがすべての検証チェックに合格し、ストリームに関連するエラーがないことを示します。
- ストリームを送信して、ストリームを作成できるようにします。プロンプトで、次のコマンドを入力します。 - curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- ストリームが作成されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams 
- 作成したストリームの結果が返されることを確認します。 - { "streams": [ { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} } ] }
ストリームを管理する
この手順では、ソースの Oracle データベースから Cloud Storage 転送先バケットのフォルダにデータを転送するために作成したストリームを使用します。以下に例を示します。
- ストリームに関する情報の取得
- ストリームの変更
- ストリームを開始する
- Fetch Errors API を使用して、ストリームに関連付けられているエラーを検出する
- ストリームの一時停止と再開
- ストリームに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- このストリームに関する情報が表示されていることを確認します。 - { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }
- このストリームを変更します。これを行うには、まず UPDATE 変数を設定します。この変数には、変更するストリームの値が含まれます。このチュートリアルでは、ソース データベースから Cloud Storage 転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)を 5 MB から 100 バイトに変更します。データはソース データベースから取得されているため、100 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 100 MB ファイルに分割されます。 - 変数を設定するには、プロンプトで次のコマンドを入力します。 - UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
- プロンプトが表示されたら、次のコマンドを入力します。 - curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/ - ?update_mask=destination_config.gcs_destination_config.file_rotation_mb 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- ストリームが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- Cloud Storage 接続プロファイルの fileRotationMb パラメータの値が、 - 100になっていることを確認します。- { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }
- ストリームを開始する手順は次のとおりです。 - UPDATE変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。- UPDATE="{\"state\":\"RUNNING\"}"
- 次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID - ?updateMask=state 
 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- 数分後、ストリームに関する情報を取得して、ストリームが開始されたことを確認します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- ストリームの状態が - CREATEDから- RUNNINGに戻ったことを確認します。- { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
- Fetch Errors API を使用して、ストリームに関連付けられたエラーを取得します。 - プロンプトが表示されたら、次のコマンドを入力します。 - curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/ - STREAM_ID:fetchErrors 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/ - operation-FETCH_ERRORS_OPERATION_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION - .OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "endTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": true, "response": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION- .FetchErrorsResponse" } }
 
- 配信を一時停止する手順は次のとおりです。 - UPDATE変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。- UPDATE="{\"state\":\"PAUSED\"}"
- 次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID - ?updateMask=state 
 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- ストリームに関する情報を取得して、ストリームが一時停止していることを確認します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- ストリームの状態が - RUNNINGから- PAUSEDに戻ったことを確認します。- { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "PAUSED", "backfillAll": {} }
- 一時停止したストリームを再開します。手順は次のとおりです。 - UPDATE変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。- UPDATE="{\"state\":\"RUNNING\"}"
- 次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID - ?updateMask=state 
 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- 数秒後に、ストリームに関する情報を取得して、ストリームが再度動作していることを確認します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- ストリームの状態が - PAUSEDから- RUNNINGに戻ったことを確認します。- { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION - /connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION- /connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
ストリームを作成して管理し、ストリームに関連するエラーがなく、ストリームの状態が RUNNING であることを確認したので、ソースのデータベースから Cloud Storage の転送先バケット内のフォルダにデータを転送できることを確認する準備ができました。
ストリームを確認する
この手順では、Datastream で以下のことができることを確認します。
- ソースの Oracle データベースの ROOTスキーマに関連付けられているすべてのテーブルからのデータを、Cloud Storage の転送先バケットの/root/tutorialフォルダに転送します。
- データを Avro ファイル形式に変換します。
- Cloud Storage の [ストレージ ブラウザ] ページに移動します。 
- バケットを含むリンクをクリックします。 
- [オブジェクト] タブが有効になっていない場合は、クリックします。 
- root フォルダをクリックしてから、tutorial フォルダをクリックします。 
- ソースの Oracle データベースの - ROOTスキーマのテーブルを表すフォルダが表示されていることを確認します。
- いずれかのテーブル フォルダをクリックして、テーブルに関連付けられているデータを表示します。 
- データを示すファイルをクリックしてから、[ダウンロード] をクリックします。 
- このファイルを Avro ツール(Avro Viewer など)で開き、コンテンツが読めることを確認します。これにより、Datastream がデータを Avro ファイル形式に変換したことも確認できます。 
クリーンアップ
このチュートリアルが終了したら、Datastream で作成したリソースをクリーンアップして、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。
プロジェクトの削除
課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Cloud Storage の宛先バケットの削除
- Cloud Storage の左側のナビゲーション ドロワーで、[ブラウザ] 項目をクリックします。 
- バケットの左側にあるチェックボックスをオンにして、[削除] をクリックします。 
- [バケットを削除しますか?] ウィンドウで、テキスト フィールドにバケットの名前を入力して、[確認] をクリックします。 
ストリームの削除
- Cloud Shell アプリケーションがアクティブであることを確認します。 
- プロンプトが表示されたら、次のコマンドを入力します。 - curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- ストリームが削除されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams 
- null - {}値が返されることを確認します。これは、Datastream にストリームがなくなり、作成したストリームが削除されたことを示します。
接続プロファイルを削除する
- ソース Oracle データベースへの接続プロファイルを削除します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ - SOURCE_CONNECTION_PROFILE_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- Cloud Storage の送信先バケットへの接続プロファイルを削除します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ - DESTINATION_CONNECTION_PROFILE_ID 
- 次のコードの行が表示されていることを確認します。 - { "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "verb": "delete", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false } 
- 両方の接続プロファイルが削除されていることを確認します。プロンプトが表示されたら、次のコマンドを入力します。 - curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" - https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles 
- null - {}値が返されることを確認します。これは、Datastream に接続プロファイルがなくなり、作成したプロファイルが削除されたことを示します。
次のステップ
- Datastream の詳細を確認する。
- その他の Google Cloud 機能を試す。チュートリアルをご覧ください。