分析のために Datastream と Dataflow を実装する

Datastream

Datastream では、Oracle、MySQL、PostgreSQL データベースから BigQuery データセットへの直接のデータのストリーミングをサポートしています。ただし、データ変換や論理主キーの手動設定など、ストリーム処理ロジックをより細かく制御する必要がある場合は、Datastream を Dataflow ジョブ テンプレートと統合できます。

このチュートリアルでは、Dataflow ジョブ テンプレートを使用して最新のマテリアライズド ビューを BigQuery にストリーミングして分析することで、Datastream を Dataflow と統合する方法について説明します。

多くの分離されたデータソースを持つ組織では、組織全体の企業データへのアクセスは、特にリアルタイムで制限され、速度が低下する可能性があります。これにより、組織の内省力が制限されます。

Datastream は、オンプレミスやクラウドベースのさまざまなデータソースからの変更データに、ほぼリアルタイムでアクセスできます。Datastream により、ストリーミング データの多くの構成を行う必要のないセットアップ エクスペリエンスが提供されます。これは、Datastream によって自動的に行われます。Datastream には、統合された最新の API を提供するため、組織全体で最新のエンタープライズ データにアクセスできるようになっています。

そのようなシナリオの 1 つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送する方法です。Datastream がデータをストリーミングすると、データは他のアプリケーションやサービスが読み取ることができる形式に変換されます。このチュートリアルでは、Dataflow はストレージ サービスまたはメッセージング キューと通信して Google Cloud でデータをキャプチャし、処理するウェブサービスです。

Datastream を使用して、変更(挿入、更新、削除されたデータ)をソースの MySQL データベースから Cloud Storage バケット内のフォルダにストリーミングする方法を学習します。次に、Datastream がソース データベースからストリーミングするデータ変更を含む新しいファイルについて学習するために使用される通知を送信するように、Cloud Storage バケットを構成します。Dataflow ジョブがファイルを処理し、変更を BigQuery に転送します。

統合ユーザーのフロー図

目標

このチュートリアルの内容は次のとおりです。

  • Cloud Storage にバケットを作成します。これは、Datastream がソース MySQL データベースからスキーマ、テーブル、データをストリーミングする宛先バケットです。
  • Cloud Storage バケットの Pub/Sub 通知を有効にします。これを行うことで、処理の準備ができている新しいファイルについて Dataflow が使用する通知を送信するようにバケットを構成します。これらのファイルには、Datastream がソース データベースからバケットにストリーミングするデータへの変更が含まれています。
  • BigQuery でデータセットを作成します。 BigQuery では、Dataflow から受信したデータを含むデータセットを使用します。このデータは、Datastream が Cloud Storage バケットにストリーミングするソース データベースの変更を表します。
  • Cloud Storage のソースのデータベースと送信先バケットの接続プロファイルを作成および管理します。Datastream のストリームは、接続プロファイルの情報を使用して、ソースのデータベースからバケットにデータを転送します。
  • ストリームを作成して開始する。このストリームでは、ソースのデータベースからバケット、データ、スキーマ、テーブルを転送します。
  • Datastream が、ソース データベースのスキーマに関連付けられているデータとテーブルをバケットに転送していることを確認します。
  • Dataflow でジョブを作成します。Datastream がソース データベースから Cloud Storage バケットにデータ変更をストリーミングすると、変更を含む新しいファイルに関する通知が Dataflow に送信されます。Dataflow ジョブがファイルを処理し、変更を BigQuery に転送します。
  • Dataflow がこのデータに関連付けられた変更を含むファイルを処理し、変更を BigQuery に転送していることを確認します。この結果、Datastream と BigQuery がエンドツーエンドで統合されます。
  • Datastream、Cloud Storage、Pub/Sub、Dataflow、BigQuery で作成したリソースをクリーンアップして、今後に割り当ての消費や課金が発生しないようにします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Datastream
  • Cloud Storage
  • Pub/Sub
  • Dataflow
  • BigQuery

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Datastream API を有効にします。

    API を有効にする

  7. ユーザー アカウントに Datastream 管理者のロールが割り当てられていることを確認します。

    IAM ページに移動

  8. Datastream がアクセスできるソース MySQL データベースがあることを確認します。また、データベースにデータ、テーブル、スキーマがあることを確認します。
  9. Datastream パブリック IP アドレスからの受信接続を許可するように MySQL データベースを構成します。Datastream リージョンと、それらに関連付けられたパブリック IP アドレスの一覧については、IP 許可リストとリージョンをご覧ください。
  10. 移行元データベースの変更データ キャプチャ(CDC)を設定します。詳細については、ソース MySQL データベースの構成をご覧ください。
  11. Cloud Storage の Pub/Sub 通知を有効にするためのすべての前提条件を満たしていることを確認します。

    このチュートリアルでは、Cloud Storage に宛先バケットを作成し、そのバケットの Pub/Sub 通知を有効にします。これにより、Dataflow は、Datastream がバケットに書き込む新しいファイルに関する通知を受信できます。これらのファイルには、Datastream がソース データベースからバケットにストリーミングするデータへの変更が含まれています。

要件

Datastream には、さまざまな移行元オプション移行先オプションネットワーク接続方法が用意されています。

このチュートリアルでは、スタンドアロンの MySQL データベースと宛先の Cloud Storage サービスを使用していることを前提としています。移行元データベースでは、受信ファイアウォール ルールを追加できるようにネットワークを構成する必要があります。移行元データベースは、オンプレミスまたはクラウド プロバイダにできます。Cloud Storage のエクスポート先の場合は、接続構成は必要ありません。

ユーザーの具体的な環境を把握できないため、ネットワーク構成に関する詳細なステップは提供できません。

このチュートリアルでは、ネットワーク接続方法として [IP 許可リスト] を選択します。IP 許可リストは、ソース データベースのデータへのアクセスを信頼できるユーザーのみに制限し、アクセスを制御する際によく使われるセキュリティ機能です。IP 許可リストを使用して、ユーザーと、Datastream などの他の Google Cloud サービスがこのデータにアクセスできる、信頼できる IP アドレスまたは IP 範囲のリストを作成できます。IP 許可リストを使用するには、データストリームからの受信接続に対してソース データベースまたはファイアウォールを開く必要があります。

Cloud Storage にバケットを作成します。

Datastream がソース MySQL データベースからスキーマ、テーブル、データをストリーミングする宛先バケットを Cloud Storage に作成します。

  1. Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。

    ブラウザ ページに移動

  2. [バケットを作成] をクリックします。[バケットの作成] ページが表示されます。

  3. [バケットに名前を付ける] 領域のテキスト フィールドに「my-integration-bucket」と入力し、[続行] をクリックします。

  4. ページの残りの各領域については、デフォルト設定を受け入れます。各リージョンの最後にある [続行] をクリックします。

  5. [作成] をクリックします。

Cloud Storage バケットの Pub/Sub 通知を有効にする

このセクションでは、作成した Cloud Storage バケットに対して Pub/Sub 通知を有効にします。これにより、Datastream がバケットに書き込む新しいファイルを Dataflow に通知するようにバケットを構成できます。これらのファイルには、Datastream がソース MySQL データベースからバケットにストリーミングするデータへの変更が含まれています。

  1. 作成した Cloud Storage バケットにアクセスします。[バケットの詳細] ページが表示されます。

  2. 「Cloud Shell をアクティブにする」をクリックします。

  3. プロンプトが表示されたら、次のコマンドを入力します。

    gsutil notification create -t my_integration_notifs -f json -p integration/tutorial/ gs://my-integration-bucket

  4. [Cloud Shell の承認] ウィンドウが表示されたら、[承認] をクリックします。

  5. 次のコード行を確認します。

    Created Cloud Pub/Sub topic projects/project-name/topics/my_integration_notifs
    Created notification config projects/_/buckets/my-integration-bucket/notificationConfigs/1
    
  6. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] ページに移動

  7. 作成した my_integration_notifs トピックをクリックします。

  8. my_integration_notifs ページで、ページの一番下までスクロールします。[サブスクリプション] タブがアクティブで、[表示するサブスクリプションがありません] というメッセージが表示されます。

  9. [サブスクリプションを作成] をクリックします。

  10. 表示されるメニューで [サブスクリプションを作成] を選択します。

  11. [サブスクリプションをトピックに追加] ページで次の情報を入力します。

    1. [サブスクリプション ID] フィールドに「my_integration_notifs_sub」と入力します。
    2. [確認応答期限] の値を 120 秒に設定します。これにより、Dataflow は処理されたファイルの確認に十分な時間を確保し、Dataflow ジョブの全体的なパフォーマンスを向上できます。Pub/Sub サブスクリプション プロパティの詳細については、サブスクリプション プロパティをご覧ください。
    3. 他のデフォルト値はすべてそのままにします。
    4. [作成] をクリックします。

このチュートリアルの後半で、Dataflow ジョブを作成します。このジョブの一部として、Dataflow を my_integration_notifs_sub サブスクリプションのサブスクライバーに割り当てます。これにより、Dataflow は、Datastream が Cloud Storage に書き込む新しいファイルに関する通知を受信し、ファイルを処理してデータの変更を BigQuery に転送できます。

BigQuery でデータセットを作成します

このセクションでは、BigQuery でデータセットを作成します。BigQuery では、Dataflow から受信したデータを含むデータセットを使用します。このデータは、Datastream が Cloud Storage バケットにストリーミングするソース MySQL データベースの変更を表します。

  1. Google Cloud コンソールで BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にある [アクションを表示] をクリックします。

  3. 表示されたメニューで、[データセットを作成] を選択します。

  4. [データセットを作成] ウィンドウで、次の操作を行います。

    1. [データセット ID] フィールドに、データセットの ID を入力します。このチュートリアルでは、フィールドに「My_integration_dataset_log」と入力します。
    2. 他のデフォルト値はそべてそのままにします。
    3. [データセットを作成] をクリックします。
  5. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にある [ノードを開く] をクリックし、作成したデータセットがあることを確認します。

  6. この手順に沿って、2 番目のデータセット My_integration_dataset_final を作成します。

  7. 各データセットの横にある [ ノードを展開] を展開します。

  8. 各データセットが空であることを確認します。

Datastream がソース データベースから Cloud Storage バケットにデータ変更をストリーミングした後、Dataflow ジョブは変更を含むファイルを処理し、変更を BigQuery データセットに転送します。

Datastream で接続プロファイルを作成する

このセクションでは、Datastream で移行元データベースと移行先用の接続プロファイルを作成します。接続プロファイルの作成の一環として、移行元の接続プロファイルのプロファイル タイプとして MySQL を選択し、移行先接続プロファイルのプロファイル タイプとして Cloud Storage を選択します。

Datastream は、接続プロファイルで定義された情報を使用して、移行元と移行先の両方に接続し、移行元データベースから Cloud Storage の移行先バケットにデータをストリーミングできるようにします。

MySQL データベースのソース接続プロファイルの作成

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. [プロファイルの作成] をクリックします。

  3. MySQL データベースのソース接続プロファイルを作成するには、[接続プロファイルの作成] ページで [MySQL] プロファイル タイプをクリックします。

  4. [MySQL プロファイルの作成] ページの [接続設定の定義] セクションで、次の情報を入力します。

    • [接続プロファイルの名前] フィールドに「My Source Connection Profile」と入力します。
    • 自動生成された接続プロファイル ID を保持します。
    • 接続プロファイルを保存する [リージョン] を選択します。

    • 接続の詳細を入力します。

      • [ホスト名または IP] フィールドに、Datastream がソース データベースへの接続に使用できるホスト名またはパブリック IP アドレスを入力します。このチュートリアルでは、ネットワーク接続方法として IP 許可リストを使用しているため、パブリック IP アドレスを指定します。
      • [ポート] フィールドに、ソース データベース用に予約されているポート番号を入力します。MySQL データベースの場合、デフォルト ポートは通常 3306 です。
      • ソース データベースへの認証用に、ユーザー名パスワードを入力します。
  5. [接続設定の定義] セクションで、[続行] をクリックします。[MySQL プロファイルの作成] ページの [ソースへの接続を保護する] セクションはアクティブです。

  6. [暗号化のタイプ] メニューから [なし] を選択します。このメニューの詳細については、MySQL データベースの接続プロファイルの作成をご覧ください。

  7. [ソースへの接続を保護する] セクションで、[続行] をクリックします。[MySQL プロファイルの作成] ページの [接続方法の定義] セクションはアクティブです。

  8. [接続方法] プルダウンで、Datastream とソース データベースの間の接続を確立するために使用するネットワーク方式を選択します。このチュートリアルでは、接続方法として [IP 許可リスト] を選択します。

  9. 表示される Datastream パブリック IP アドレスからの受信接続を許可するように移行元データベースを構成します。

  10. [接続方法の定義] セクションで [続行] をクリックします。[MySQL プロファイルの作成] ページの [テスト接続プロファイル] セクションはアクティブです。

  11. [テストを実行] をクリックして、移行元データベースと Datastream が相互に通信できることを確認します。

  12. [テストに合格] ステータスが表示されていることを確認します。

  13. [作成] をクリックします。

Cloud Storage の宛先接続プロファイルの作成

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. [プロファイルの作成] をクリックします。

  3. Cloud Storage の移行先接続プロファイルを作成するには、[接続プロファイルの作成] ページで、[Cloud Storage] プロファイル タイプをクリックします。

  4. [Create Cloud Storage profile] ページで、次の情報を入力します。

    • [接続プロファイルの名前] フィールドに「My Destination Connection Profile」と入力します。
    • 自動生成された接続プロファイル ID を保持します。
    • 接続プロファイルを保存する [リージョン] を選択します。
    • In the接続の詳細ペインをクリックします。参照選択してください。統合バケット分配このチュートリアルの前半で作成した。これは、Datastream がソース データベースからデータを転送するバケットです。選択したら、[選択] をクリックします。

      バケットが、[接続の詳細] ペインの [バケット名] フィールドに表示されます。

    • [接続プロファイルのパス接頭辞] フィールドに、Datastream が宛先にデータをストリーミングするときにバケット名に追加するパスの接頭辞を指定します。Datastream がバケットのルートフォルダではなく、バケット内のパスにデータを書き込んでいることを確認します。このチュートリアルでは、Pub/Sub 通知の構成時に定義したパスを使用します。欄に「/integration/tutorial」と入力します。

  5. [作成] をクリックします。

MySQL データベースのソース接続プロファイルと Cloud Storage の移行先接続プロファイルを作成した後、それらを使用してストリームを作成できます。

Datastream でストリームを作成する

このセクションでは、ストリームを作成します。このストリームは、接続プロファイルの情報を使用して、ソース MySQL データベースから Cloud Storage の宛先バケットにデータを転送します。

ストリームの設定の定義

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. [ストリームを作成] をクリックします。

  3. [ストリームの作成] ページの [ストリームの詳細の定義] パネルで、次の情報を指定します。

    • [ストリーム名] フィールドに「My Stream」と入力します。
    • 自動生成されたストリーム ID を保持します。
    • [リージョン] メニューから、移行元と移行先の接続プロファイルを作成したリージョンを選択します。
    • [ソースタイプ] メニューから [MySQL] プロファイル タイプを選択します。
    • [宛先の種類] メニューから、[Cloud Storage] プロファイル タイプを選択します。
  4. ストリームに環境を準備する方法が反映されるように、自動的に生成される必須の前提条件を確認します。これらの前提条件には、移行元データベースの構成方法や Cloud Storage の移行先バケットに Datastream を接続する方法が含まれます。

  5. [続行] をクリックします。[ストリームの作成] ページの [Define MySQL connection profile] パネルが表示されます。

ソース接続プロファイルに関する情報の指定

このセクションでは、移行元データベース用に作成した接続プロファイル(ソース接続プロファイル)を選択します。このチュートリアルでは、My Source Connection Profile です。

  1. [ソース接続プロファイル] メニューから、MySQL データベースのソース接続プロファイルを選択します。

  2. [テストを実行] をクリックして、移行元データベースと Datastream が相互に通信できることを確認します。

    テストに失敗した場合、接続プロファイルに関連する問題が表示されます。トラブルシューティングの手順については、問題の診断ページをご覧ください。必要な変更を行って問題を修正し、再度テストを行います。

  3. [続行] をクリックします。[ストリームの作成] ページの [ストリームのソースの構成] パネルが表示されます。

ストリームのソース データベースに関する情報の構成

このセクションでは、Datastream の移行元データベースでテーブルとスキーマを指定して、ストリームのソース データベースに関する情報を構成します。

  • 宛先への転送ができる。
  • 宛先への転送が制限されている。

また、Datastream が過去のデータをバックフィルするのか、進行中の変更を宛先にストリーミングするのか、データへの変更のみをストリーミングするのかも決定します。

  1. [含めるオブジェクト] メニューを使用して、Datastream が Cloud Storage の宛先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。 メニューが読み込まれるのは、データベースに 5,000 個のオブジェクトがある場合のみです。

    このチュートリアルでは、Datastream ですべてのテーブルとスキーマを転送します。そのため、メニューから [すべてのテーブル] を選択します。

  2. この除外するオブジェクトを選択パネルがなし。Datastream によるソース データベースのテーブルやスキーマの Cloud Storage への転送は制限しません。

  3. この履歴データのバックフィル モードを選択する パネルが自動。 Datastream は、データへの変更に加えて、既存のデータをすべて移行先にストリーミングします。

  4. [続行] をクリックします。[ストリームの作成] ページの [Define Cloud Storage connection profile] パネルが表示されます。

宛先接続プロファイルの選択

このセクションでは、Cloud Storage 用に作成した接続プロファイル(移行先の接続プロファイル)を選択します。このチュートリアルでは、My Destination Connection Profile です。

  1. [宛先接続プロファイル] メニューから、Cloud Storage の宛先接続プロファイルを選択します。

  2. [続行] をクリックします。[ストリームの作成] ページの [ストリームの移行先の構成] パネルが表示されます。

ストリームの転送先に関する情報の構成

このセクションでは、ストリームの移行先バケットに関する情報を構成します。これには以下の情報が含まれます。

  • Cloud Storage に書き込まれるファイルの出力形式。
  • Datastream がソース データベースからスキーマ、テーブル、データを転送する宛先バケットのフォルダ。
  1. [出力形式] フィールドで、Cloud Storage に書き込まれるファイルの形式を選択します。Datastream は現在、Avro と JSON の 2 つの出力形式をサポートしています。 このチュートリアルでは、このファイル形式は Avro です。

  2. [続行] をクリックします。[ストリームの作成] ページの [ストリームの詳細の確認と作成] パネルが表示されます。

ストリームの作成

  1. ストリームの詳細と、ストリームがソース MySQL データベースから Cloud Storage の宛先バケットへのデータ転送に使用するソース接続と宛先接続のプロファイルを確認します。

  2. ストリームを検証するには、[検証を実行] をクリックします。ストリームを検証すると、Datastream は移行元が適切に構成されていることを確認し、ストリームが移行元と移行先の両方に接続できること、ストリームのエンドツーエンド構成を検証します。

  3. すべての検証チェックに合格したら、[作成] をクリックします。

  4. [ストリームを作成しますか?] ダイアログ ボックスで、[作成] をクリックします。

ストリームの開始

このチュートリアルでは、移行元データベースの負荷が増大した場合にストリームを個別に作成して開始します。負荷を軽減するには、ストリームを開始せずに作成し、データベースが負荷を処理できるときにストリームを開始します。

ストリームを開始すると、Datastream は移行元データベースから移行先にデータ、スキーマ、テーブルを転送できます。

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 開始するストリームの横にあるチェックボックスをオンにします。このチュートリアルでは、My Stream を使用します。

  3. [開始] をクリックします。

  4. ダイアログで [開始] をクリックします。ストリームのステータスが Not started から Starting、そして Running に変わります。

ストリームを開始すると、Datastream が移行元データベースから移行先にデータを転送することを確認できます。

ストリームの検証

このセクションでは、Datastream がソース MySQL データベースのすべてのテーブルから、Cloud Storage の宛先バケットの /integration/tutorial フォルダにデータを転送することを確認します。このチュートリアルでは、バケットの名前は my-integration-bucket です。

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 作成したストリームをクリックします。このチュートリアルでは、My Stream を使用します。

  3. In theストリームの詳細ページで、my-integration-bucket/integration/tutorialリンク。このリンクは [Destination write path] フィールドの後に表示されます。Cloud Storage の [バケットの詳細] ページが別のタブで開きます。

  4. ソース データベースのテーブルを表すフォルダが表示されていることを確認します。

  5. いずれかのテーブル フォルダをクリックしてから、テーブルに関連付けられたデータが表示されるまで各サブフォルダをクリックします。

Dataflow ジョブを作成する

このセクションでは、Dataflow でジョブを作成します。Datastream がソースの MySQL データベースから Cloud Storage バケットにデータ変更をストリーミングした後、Pub/Sub は変更を含む新しいファイルに関する通知を Dataflow に送信します。Dataflow ジョブがファイルを処理し、変更を BigQuery に転送します。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] ページに移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. [テンプレートからジョブを作成] ページの [ジョブ名] フィールドに、作成する Dataflow ジョブの名前を入力します。このチュートリアルでは、フィールドに「my-dataflow-integration-job」と入力します。

  4. [リージョン エンドポイント] メニューから、ジョブを保存するリージョンを選択します。これは、選択したリージョンと同じリージョンですソース接続プロファイル宛先接続プロファイルおよびストリーム

  5. [Dataflow テンプレート] メニューから、ジョブの作成に使用するテンプレートを選択します。このチュートリアルでは、[Datastream to BigQuery] を選択します。

    こ選択すると、このテンプレートに関連する追加フィールドが表示されます。

  6. [ファイルの場所の Cloud Storage 出力ファイルの場所] フィールドに、Cloud Storage バケットの名前を含むパスを入力します。このチュートリアルでは、gs://my-integration-bucket と入力します。

  7. [Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション] フィールドに、Pub/Sub サブスクリプションの名前を含むパスを入力します。このチュートリアルでは、projects/project-name/subscriptions/my_integration_notifs_sub と入力します。

  8. [Datastream output file format (avro/json)] フィールドに「avro」と入力します。このチュートリアルでは、Avro が Datastream が Cloud Storage に書き込むファイル形式です。

  9. [Name or template for the dataset to contain staging tables.] フィールドに「My_integration_dataset_log」と入力します。Dataflow は、このデータセットを使用して Datastream から受信するデータの変更をステージングします。

  10. [Template for the dataset to contain replica tables.] フィールドに「My_integration_dataset_final」と入力します。これは、My_integration_dataset_log データセットにステージングされた変更を統合してソース データベース内のテーブルの 1 対 1 のレプリカが作成されるデータセットです。

  11. [デッドレター キュー ディレクトリ] フィールドに、Cloud Storage バケットの名前とデッドレター キューのフォルダを含むパスを入力します。ルートフォルダ内のパスを使用せず、Datastream がデータを書き込むパスとは異なるパスであることを確認してください。Dataflow が BigQuery に転送できなかったデータの変更は、キューに保存されます。Dataflow が再処理できるように、キュー内のコンテンツを修正できます。

    このチュートリアルでは、[Dead letter queue directory.] フィールドに「gs://my-integration-bucket/dlq」と入力します(dlq はデッドレター キューのフォルダです)。

  12. [ジョブを実行] をクリックします。

統合を確認する

このチュートリアルのストリームを確認するセクションで、Datastream がソース MySQL データベースのすべてのテーブルから、Cloud Storage の宛先バケットの /integration/tutorial フォルダにデータを転送することを確認しました。

このセクションでは、Dataflow がこのデータに関連付けられた変更を含むファイルを処理し、変更を BigQuery に転送することを確認します。この結果、Datastream と BigQuery がエンドツーエンドで統合されます。

  1. Google Cloud で、BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクトの名前の横にあるノードを展開します。

  3. My_integration_dataset_log データセットと My_integration_dataset_final データセットの横にあるノードを展開します。

  4. 各データセットにデータが含まれていることを確認します。これにより、Dataflow が Datastream が Cloud Storage にストリーミングしたデータを含む変更を含むファイルを処理し、これらの変更を BigQuery に転送したことが確認されます。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud コンソールを使用して次の操作を行います。

  • プロジェクト、Datastream ストリーム、Datastream 接続プロファイルを削除します。
  • Dataflow ジョブを停止する。
  • BigQuery データセット、Pub/Sub トピックとサブスクリプション、Cloud Storage バケットを削除します。

Datastream、Dataflow、BigQuery、Pub/Sub、Cloud Storage で作成したリソースをクリーンアップして、今後割り当ての消費を回避し、課金されないようにします。

プロジェクトの削除

課金をなくす最も簡単な方法は、このチュートリアル用に作成したプロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。

  3. プロジェクトを削除するには、ダイアログにプロジェクト ID を入力し、[シャットダウン] をクリックします。

ストリームの削除

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 変更するストリームをクリックします。このチュートリアルでは、My Stream を使用します。

  3. [一時停止] をクリックします。

  4. ダイアログで [一時停止] をクリックします。

  5. [ストリームの詳細] ページの [ストリームのステータス] ペインで、ストリームのステータスが Paused であることを確認します。

  6. [削除] をクリックします。

  7. ダイアログのテキストで「Delete」と入力し、[削除] をクリックします。

接続プロファイルの削除

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. 削除する各接続プロファイル(My Source Connection ProfileMy Destination Connection Profile)のチェックボックスをオンにします。

  3. [削除] をクリックします。

  4. ダイアログで [削除] をクリックします。

Dataflow ジョブを停止する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] ページに移動

  2. 停止するジョブをクリックします。 このチュートリアルでは、my-dataflow-integration-job です。

  3. [Stop] をクリックします。

  4. [ジョブを停止] ダイアログで [ドレイン] オプションを選択し、[ジョブの停止] をクリックします。

BigQuery データセットを削除する

  1. Google Cloud で、BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にあるノードを展開します。

  3. BigQuery でデータセットを作成するで作成したデータセットの右側にある [アクションを表示] ボタンをクリックします。このボタンは省略記号のようなアイコンです。

    このチュートリアルでは、My_integration_dataset_log の右側にある [アクションを表示] ボタンをクリックします。

  4. 表示されたプルダウン メニューから [削除] を選択します。

  5. [データセットを削除しますか?] ダイアログで、テキスト フィールドに delete と入力して [削除] をクリックします。

  6. この手順を繰り返して、作成した 2 番目のデータセット My_integration_dataset_final を削除します。

Pub/Sub サブスクリプションとトピックを削除します。

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] ページに移動

  2. 削除するサブスクリプションの横にあるチェックボックスをクリックします。このチュートリアルでは、my_integration_notifs_sub サブスクリプションの横にあるチェックボックスをオンにします。

  3. [削除] をクリックします。

  4. [Delete subscription] ダイアログで、[削除] をクリックします。

  5. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] ページに移動

  6. my_integration_notifs トピックの横にあるチェックボックスをオンにします。

  7. [削除] をクリックします。

  8. [Delete topic] ダイアログで、テキスト フィールドに delete と入力して [削除] をクリックします。

Cloud Storage バケットの削除

  1. Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。

    ブラウザ ページに移動

  2. バケットの横にあるチェックボックスをオンにします。このチュートリアルでは、my-integration-bucket です。

  3. [削除] をクリックします。

  4. ダイアログ ボックスで、テキスト フィールドに Delete と入力して [削除] をクリックします。

次のステップ