BigQuery で Oracle データからリアルタイムの分析情報を抽出する

Google Cloud Japan Team
※この投稿は米国時間 2022 年 5 月 20 日に、Google Cloud blog に投稿されたものの抄訳です。
リレーショナル データベースはトランザクションの処理には優れていますが、分析を大規模に実行するようには設計されていません。データ エンジニアやデータ アナリストであれば、業務データをリアルタイムでデータ ウェアハウスに継続的に複製し、データドリブンのビジネス上の意思決定をタイムリーに行いたいと考えるかもしれません。
このブログでは、Oracle データベースから Google Cloud の BigQuery に業務データを複製して処理することで、複数のシステムを同期させ、一括読み込みによる更新や不便なバッチ ウィンドウの必要性を排除する方法について、ステップごとのチュートリアルを提供します。


上の図に示されているオペレーション フローは次のとおりです。
Oracle のソースから受信したデータは、Datastream を通じて Cloud Storage にキャプチャされ、複製される。
このデータを Dataflow テンプレートによって処理し、拡充し、BigQuery に送り、分析し、可視化する。
Google では、Oracle ワークロードのライセンスを提供していません。Google Cloud 上で実行することを選択した Oracle ワークロードのライセンスを調達と、これらのライセンスの条件の遵守は、お客様の責任で行っていただく必要があります。
費用
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
はじめに
1. Google Cloud コンソールの [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
注: この手順で作成するリソースをそのまま保持する予定でない場合、既存のプロジェクトを選択するのではなく、新しいプロジェクトを作成してください。チュートリアルの終了後にそのプロジェクトを削除すれば、プロジェクトに関連するすべてのリソースを削除できます。
2. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
3. Compute Engine、Datastream、Dataflow および Pub/Sub API を有効にします。
4. また、プロジェクト オーナーまたはエディターのロールが付与されている必要があります。
ステップ 1: 環境を準備する
1. Cloud Shell で、以下の環境変数を定義します。
以下を置き換えます。
YOUR_PROJECT_NAME: プロジェクト名
YOUR_PROJECT_ID: プロジェクト ID
YOUR_PROJECT_NUMBER: プロジェクト番号
2. 次のように入力します。
3. このチュートリアルで使用するスクリプトやユーティリティを含む、GitHub のチュートリアル リポジトリのクローンを作成します。
4. Oracle に読み込むサンプル トランザクションを含んだカンマ区切りファイルを抽出します。
5. 以下のようにして、Compute Engine にサンプルの Oracle XE 11g の Docker インスタンスを作成します。
a.Cloud Shell で、ディレクトリを build_docker に変更します。
b.以下の build_orcl.sh script を実行します。
以下を置き換えます。
YOUR_PROJECT_ID: クラウド プロジェクト ID
GCP_ZONE: Compute インスタンスが作成されるゾーン
GCP_NETWORK_NAME= VM とファイアウォールのエントリを作成するネットワーク名
GCP_SUBNET_NAME= VM とファイアウォールのエントリを作成するネットワーク サブネット
Y or N= FastFresh スキーマと ORDERS テーブルを作成するかどうかの選択(Y または N)。このチュートリアルでは、Y を使用します。
Y or N= Oracle データベースを Datastream 用に構成するかどうかの選択(Y または N)。このチュートリアルでは、Y を使用します。
このスクリプトは次のことを行います。
Google Cloud Compute のインスタンスを新規作成する。
Oracle 11g XE の docker コンテナを構成する。
FastFresh スキーマと Datastream の前提条件をプリロードする。
スクリプトの実行後、build_orcl.sh スクリプトは、接続の詳細と認証情報 (DB Host、DB Port、SID) の要約を表示。このチュートリアルの後半で使用するため、これらの詳細のコピーを作成します。
スクリプトの実行後、build_orcl.sh スクリプトは、接続の詳細と認証情報(DB Host、DB Port、SID)の要約を表示します。このチュートリアルの後半で使用するため、これらの詳細のコピーを作成します。
6. 複製したデータを保存するための Cloud Storage バケットを作成します。
バケット名は後のステップで使用するため、コピーを作成します。
7. オブジェクトの変更に関する通知を Pub/Sub トピックに送信するようにバケットを構成します。この構成は、Dataflow テンプレートで必要となります。手順は次のとおりです。
a.oracle_retail という新しいトピックを作成します。
b.oracle_retail トピックに送信されるメッセージを受信する Pub/Sub サブスクリプションを作成します。
8. retail という名前の BigQuery データセットを作成します。
9. Compute Engine のサービス アカウントに BigQuery の Admin ロールを割り当てます。
ステップ 2: Datastream で Oracle のデータを Google Cloud に複製する
Datastream は、MySQL や Oracle などのソースから Google Cloud データベースやストレージ ソリューションへのデータ同期をサポートしています。
このセクションでは、Datastream を使用して Oracle FastFresh スキーマをバックフィルし、Oracle データベースから Cloud Storage にリアルタイムで更新を複製する方法を説明します。
ストリームの作成
1. Cloud コンソールで、Datastream に移動して [ストリームの作成] をクリックします。フォームが表示されます。次のようにフォームに入力し、[続行] をクリックします。
ストリーム名: oracle-cdc
ストリーム ID: oracle-cdc
ソースタイプ: Oracle
宛先の種類: Cloud Storage
他のすべてのフィールド: デフォルト値を保持する
2. [ソースの定義とテスト] セクションで、[新しい接続の作成] プロファイルを選択します。フォームが表示されます。次のようにフォームに入力し、[続行] をクリックします。
接続プロファイル名: orcl-retail-source
接続プロファイル ID: orcl-retail-source
ホスト名: <db_host>
ポート: 1521
ユーザー名: datastream
パスワード: tutorial_datastream
システム ID(SID): XE
接続方法: IP 許可リストを選択
3. [テストを実行] をクリックして、移行元データベースと Datastream が相互に通信できることを確認します。その後、[作成して続行] をクリックします。
複製するオブジェクト、特定のスキーマ、テーブル、列を定義し、包含または除外する「含めるオブジェクトを選択」ページが表示されます。
テストが失敗した場合は、フォームのパラメータに必要な変更を加えてから、再度テストを実行します。
4. 以下のイメージにあるように、FastFresh > Orders を選択します。

5. 既存のレコードを読み込むには、バックフィル モードを自動に設定し、[続行] をクリックします。
6. [宛先の定義] セクションで [新しい接続プロファイルの作成] を選択します。フォームが表示されます。以下のようにフォームに記入し、[作成して続行] をクリックします。
接続プロファイル名: oracle-retail-gcs
接続プロファイル ID: oracle-retail-gcs
バケット名: 「環境の準備」セクションで作成したバケット名。
7. ストリームのパス接頭辞は空白にしておき、出力フォーマットに、JSON を選択します。[続行] をクリックします。
8. 新しい接続プロファイルを作成ページで、[検証を実行] をクリックし、[作成] をクリックします。
出力は次のようになります。


ステップ 3: Datastream to BigQuery テンプレートを使用して Dataflow ジョブを作成する
このセクションでは、Dataflow の Datastream to BigQuery ストリーミング テンプレートをデプロイし、Datastream で取得した変更を BigQuery に複製します。
また、UDF を作成して使用することで、このテンプレートの機能を拡張します。
受信データを処理するための UDF を作成する
バックフィルされたデータと新しい受信データの両方に対して、以下の処理を実行する UDF を作成します。
お客様の支払方法などの機密情報を削除する。
データ系列と検出の目的で、Oracle ソーステーブルを BigQuery に追加する。
このロジックは、Datastream が生成する JSON ファイルを入力パラメータとする JavaScript ファイルにキャプチャされます。
1. Cloud Shell セッションで、以下のコードをコピーし、retail_transform.js という名前のファイルに保存します。
2. retail_transform.js ファイルを格納する Cloud Storage バケットを作成し、新しく作成したバケットに JavaScript ファイルをアップロードします。
Dataflow ジョブを作成する
1. Cloud Shell で、Dataflow が使用するデッドレター キュー(DLQ)バケットを作成します。
2. Dataflow 実行用のサービス アカウントを作成し、そのアカウントに Dataflow ワーカー、Dataflow 管理者、Pub/Sub 管理者、BigQuery データ編集者、BigQuery ジョブユーザー、Datastream 管理者、Storage 管理者を割り当てます。
3. 下りファイアウォール ルールを作成して、自動スケーリングが有効になっている場合に Dataflow VM が TCP ポート 12345 と 12346 でネットワーク トラフィックと通信、送信、受信を行うようにします。
4. Dataflow ジョブを作成して実行します。
Dataflow コンソールを確認し、新しいストリーミング ジョブが開始されたことを確認します。
5. Cloud Shell で、以下のコマンドを実行し、Datastream のストリームを開始します。
6. Datastream ストリームのステータスを確認します。
状態が「実行中」と表示されていることを確認します。新しい状態の値が反映されるまで、数秒かかる場合があります。
Datastream コンソールをチェックして、ORDERS テーブルのバックフィルの進捗を確認します。
出力は次のようになります。


このタスクは初期ロードであるため、Datastream は ORDERS オブジェクトから読み取ります。ストリーム作成時に指定した Cloud Storage バケットにある JSON ファイルに全レコードを書き込みます。バックフィル作業には 10 分程度かかります。
最終ステップ: BigQuery でデータを分析する
数分後、バックフィルされたデータが BigQuery に複製されます。新着データは、ほぼリアルタイムでデータセットにストリーミングされます。各レコードは、Dataflow テンプレートの一部として定義した UDF ロジックで処理されます。
Dataflow のジョブにより、データセット内に以下の 2 つのテーブルが新たに作成されます。
ORDERS: この出力テーブルは Oracle テーブルのレプリカで、Dataflow テンプレートの一部としてデータに適用された変換を含む。
ORDERS_log: このステージング テーブルには、Oracle ソースからのすべての変更が記録される。テーブルはパーティション分割され、更新されたレコードと、その変更が更新、挿入、削除のいずれなのかというメタデータの変更情報が格納される。
BigQuery を使えば、業務データをリアルタイムに見ることができます。また、特定の商品の売上を店舗間でリアルタイムに比較する、または、売上と顧客データを組み合わせて特定の店舗での顧客の消費傾向を分析するなどのクエリを実行できます。
業務データに対してクエリを実行する
1. BigQuery で、以下の SQL を実行し、売れ筋上位の 3 商品をクエリします。
出力は次のようになります。


2. BigQuery で、以下の SQL ステートメントを実行し、ORDERS テーブルと ORDERS_log テーブルの両方の行数のクエリを実行します。
バックフィルが完了し、最後のステートメントで 520217 という数字が返ってくるはずです。
これで、Oracle データの BigQuery への変更データ取り込みがリアルタイムで完了しました。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。プロジェクトを削除する場合
Cloud コンソールで、[リソースの管理] ページに移動します。
プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
今後の予定
この基礎の上にさらに、将来の需要をどのように予測するか、そして予測データをどのように可視化するか、次のチュートリアル「Datastream、Dataflow、BigQuery ML、Looker を使用して需要予測を構築、可視化する」でご確認ください。
- Google カスタマー エンジニア(大のデータ好き) Carlos Augusto
- プロダクト マーケティング マネージャー Frank Guan


