分析のために Datastream と Dataflow を実装する

Datastream

Datastream では、Oracle、MySQL、PostgreSQL データベースから BigQuery データセットへの直接のデータのストリーミングをサポートしています。ただし、データ変換や論理主キーの手動設定など、ストリーム処理ロジックをより細かく制御する必要がある場合は、Datastream を Dataflow ジョブ テンプレートに統合できます。

このチュートリアルでは、Dataflow ジョブ テンプレートを使用して、分析用に BigQuery で最新のマテリアライズド ビューをストリーミングし、Datastream を Dataflow と統合する方法について説明します。

分離されたデータソースを多く使用している組織では、組織全体の企業データへのアクセスは、特にリアルタイムでは制限され、遅くなる可能性があります。これにより、組織の内省力が制限されます。

Datastream は、オンプレミスやクラウドベースのさまざまなデータソースから変更データにほぼリアルタイムでアクセスできるようにします。Datastream では、ストリーミング データの構成をほとんど行わずに、Datastream が自動的に構成を行う設定環境が用意されています。Datastream には、組織全体で最新のエンタープライズ データに誰でもアクセスできるようにする統合消費 API もあり、統合されたシナリオを構築できます。

そのようなシナリオの一つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送することです。Datastream がデータをストリーミングすると、データは他のアプリケーションやサービスが読み取れる形式に変換されます。このチュートリアルでは、Dataflow はストレージ サービスまたはメッセージング キューと通信して Google Cloud でデータをキャプチャして処理するウェブサービスです。

Datastream を使用して、変更(挿入、更新、削除されたデータ)をソースの MySQL データベースから Cloud Storage バケット内のフォルダにストリーミングする方法を学習します。次に、Dataflow が Datastream がソース データベースからストリーミングするデータ変更を含む新しいファイルについて学習するために使用する通知を送信するように Cloud Storage バケットを構成します。Dataflow ジョブがファイルを処理し、変更を BigQuery に転送します。

統合のユーザーフロー図

目標

このチュートリアルの内容は次のとおりです。

  • Cloud Storage にバケットを作成します。これは、Datastream がソース MySQL データベースからスキーマ、テーブル、データをストリーミングする宛先バケットです。
  • Cloud Storage バケットの Pub/Sub 通知を有効にします。これにより、Dataflow が処理の準備が整った新しいファイルについて学習するために使用する通知を送信するようにバケットを構成します。これらのファイルには、Datastream がソース データベースからバケットにストリーミングするデータへの変更が含まれています。
  • BigQuery でデータセットを作成します。 BigQuery は、Dataflow から受信したデータをデータセットに格納します。このデータは、Datastream が Cloud Storage バケットにストリーミングするソース データベースの変更を表します。
  • Cloud Storage のソースのデータベースと送信先バケットの接続プロファイルを作成および管理します。Datastream のストリームは、接続プロファイルの情報を使用して、ソース データベースからバケットにデータを転送します。
  • ストリームを作成して開始します。このストリームは、データ、スキーマ、テーブルをソース データベースからバケットに転送します。
  • Datastream が、ソース データベースのスキーマに関連付けられているデータとテーブルをバケットに転送していることを確認します。
  • Dataflow でジョブを作成します。Datastream がソース データベースから Cloud Storage バケットにデータ変更をストリーミングした後、変更を含む新しいファイルに関する通知が Dataflow に送信されます。Dataflow ジョブはファイルを処理し、変更を BigQuery に転送します。
  • Dataflow がこのデータに関連付けられた変更を含むファイルを処理し、変更を BigQuery に転送していることを確認します。この結果、Datastream と BigQuery がエンドツーエンドで統合されます。
  • Datastream、Cloud Storage、Pub/Sub、Dataflow、BigQuery で作成したリソースをクリーンアップして、今後割り当ての消費や課金が発生しないようにします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Datastream
  • Cloud Storage
  • Pub/Sub
  • Dataflow
  • BigQuery

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Datastream API を有効にします。

    API を有効にする

  7. ユーザー アカウントに Datastream 管理者のロールが割り当てられていることを確認します。

    IAM ページに移動

  8. Datastream がアクセスできるソース MySQL データベースがあることを確認します。また、データベースにデータ、テーブル、スキーマがあることを確認します。
  9. Datastream パブリック IP アドレスからの受信接続を許可するように MySQL データベースを構成します。すべての Datastream リージョンとそれらに関連付けられたパブリック IP アドレスの一覧については、IP 許可リストとリージョンをご覧ください。
  10. 移行元データベースの変更データ キャプチャ(CDC)を設定します。詳細については、ソースの MySQL データベースの構成をご覧ください。
  11. Cloud Storage の Pub/Sub 通知を有効にするためのすべての前提条件を満たしていることを確認します。

    このチュートリアルでは、Cloud Storage に宛先バケットを作成し、そのバケットの Pub/Sub 通知を有効にします。これにより、Dataflow は、Datastream がバケットに書き込む新しいファイルに関する通知を受信できます。これらのファイルには、Datastream がソース データベースからバケットにストリーミングするデータへの変更が含まれています。

要件

Datastream には、さまざまな移行元オプション移行先オプションネットワーク接続方法が用意されています。

このチュートリアルでは、スタンドアロンの MySQL データベースと移行先の Cloud Storage サービスを使用していることを前提としています。移行元データベースでは、受信ファイアウォール ルールを追加できるようにネットワークを構成する必要があります。移行元データベースは、オンプレミスまたはクラウド プロバイダにできます。Cloud Storage の宛先には、接続構成は必要ありません。

ユーザーの具体的な環境を把握できないため、ネットワーク構成に関する詳細なステップは提供できません。

このチュートリアルでは、ネットワーク接続方法として [IP 許可リスト] を選択します。IP 許可リストは、移行元データベースのデータへのアクセスを信頼できるユーザーのみに制限、制御するためにしばしば使用されるセキュリティ機能です。IP 許可リストを使用すると、ユーザーや他の Google Cloud サービス(Datastream など)がこのデータにアクセスできる信頼できる IP アドレスまたは IP 範囲のリストを作成できます。IP 許可リストを使用するには、Datastream からの受信接続に対してソース データベースまたはファイアウォールを開く必要があります。

Cloud Storage にバケットを作成します。

Datastream がソース MySQL データベースからスキーマ、テーブル、データをストリーミングする移行先バケットを Cloud Storage に作成します。

  1. Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。

    ブラウザ ページに移動

  2. [バケットを作成] をクリックします。[バケットの作成] ページが表示されます。

  3. [バケットに名前を付ける] 領域のテキスト フィールドにバケットの一意の名前を入力し、[続行] をクリックします。

  4. ページの残りの各領域については、デフォルト設定を受け入れます。各リージョンの最後で [続行] をクリックします。

  5. [作成] をクリックします。

Cloud Storage バケットの Pub/Sub 通知を有効にする

このセクションでは、作成した Cloud Storage バケットの Pub/Sub 通知を有効にします。これにより、DataStream がバケットに書き込む新しいファイルを Dataflow に通知するようにバケットを構成します。これらのファイルには、Datastream がソース MySQL データベースからバケットにストリーミングするデータへの変更が含まれています。

  1. 作成した Cloud Storage バケットにアクセスします。[バケットの詳細] ページが表示されます。

  2. 「Cloud Shell をアクティブにする」をクリックします。

  3. プロンプトが表示されたら、次のコマンドを入力します。

    gcloud storage buckets notifications create gs://bucket-name --topic=my_integration_notifs --payload-format=json --object-prefix=integration/tutorial/

  4. [Cloud Shell の承認] ウィンドウが表示されたら、[承認] をクリックします。

  5. 次のコードの行が表示されていることを確認します。

    Created Cloud Pub/Sub topic projects/project-name/topics/my_integration_notifs
    Created notification config projects/_/buckets/bucket-name/notificationConfigs/1
    
  6. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] ページに移動

  7. 作成した my_integration_notifs トピックをクリックします。

  8. [my_integration_notifs] ページで、ページの一番下までスクロールします。[サブスクリプション] タブが有効で、[表示するサブスクリプションはありません] というメッセージが表示されていることを確認します。

  9. [サブスクリプションを作成] をクリックします。

  10. 表示されたメニューで [サブスクリプションを作成] を選択します。

  11. [サブスクリプションをトピックに追加] ページで次の情報を入力します。

    1. [サブスクリプション ID] フィールドに「my_integration_notifs_sub」と入力します。
    2. [確認応答期限] の値を 120 秒に設定します。これにより、Dataflow は処理したファイルを確認するのに十分な時間を確保でき、Dataflow ジョブの全体的なパフォーマンスが向上します。Pub/Sub サブスクリプション プロパティの詳細については、サブスクリプション プロパティをご覧ください。
    3. 他のデフォルト値はすべてそのままにします。
    4. [作成] をクリックします。

このチュートリアルの後半では、Dataflow ジョブを作成します。このジョブを作成する際に、Dataflow を my_integration_notifs_sub サブスクリプションのサブスクライバーとして割り当てます。これにより、Dataflow は、Datastream が Cloud Storage に書き込む新しいファイルに関する通知を受信し、ファイルを処理して、データ変更を BigQuery に転送できます。

BigQuery でデータセットを作成します

このセクションでは、BigQuery でデータセットを作成します。BigQuery は、Dataflow から受信したデータをデータセットに格納します。このデータは、Datastream が Cloud Storage バケットにストリーミングするソース MySQL データベースの変更を表します。

  1. Google Cloud コンソールで、BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にある [アクションを表示] をクリックします。

  3. 表示されたメニューで、[データセットを作成] を選択します。

  4. [データセットを作成する] ウィンドウで、次の操作を行います。

    1. [データセット ID] フィールドに、データセットの ID を入力します。このチュートリアルでは、フィールドに My_integration_dataset_log と入力します。
    2. 他のデフォルト値はそべてそのままにします。
    3. [データセットを作成] をクリックします。
  5. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にある [ノードを展開] をクリックし、作成したデータセットが表示されていることを確認します。

  6. この手順に沿って、2 番目のデータセット My_integration_dataset_final を作成します。

  7. 各データセットの横にある [ ノードを展開] を展開します。

  8. 各データセットが空であることを確認します。

Datastream がソース データベースから Cloud Storage バケットにデータ変更をストリーミングした後、Dataflow ジョブは変更を含むファイルを処理し、変更を BigQuery データセットに転送します。

Datastream で接続プロファイルを作成する

このセクションでは、ソース データベースと宛先の接続プロファイルを Datastream で作成します。接続プロファイルの作成の一環として、移行元の接続プロファイルのプロファイル タイプとして MySQL を選択し、移行先の接続プロファイルのプロファイル タイプとして MySQL を選択します。

Datastream は、接続プロファイルで定義された情報を使用してソースと宛先の両方に接続し、ソース データベースから Cloud Storage の宛先バケットにデータをストリーミングします。

MySQL データベースのソース接続プロファイルを作成する

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. [プロファイルの作成] をクリックします。

  3. MySQL データベースのソース接続プロファイルを作成するには、[接続プロファイルを作成] ページで [MySQL] プロファイル タイプをクリックします。

  4. [Create MySQL profile] ページの [接続設定の定義] セクションに、次の情報を入力します。

    • [接続プロファイル名] フィールドに My Source Connection Profile と入力します。
    • 自動生成された接続プロファイル ID を保持します。
    • 接続プロファイルを保存するリージョンを選択します。

    • 接続の詳細を入力します。

      • [ホスト名または IP] フィールドに、Datastream がソース データベースへの接続に使用できるホスト名またはパブリック IP アドレスを入力します。このチュートリアルでは、IP 許可リストをネットワーク接続に使用するため、パブリック IP アドレスを指定します。
      • [ポート] フィールドに、ソース データベース用に予約されているポート番号を入力します。MySQL データベースの場合、デフォルト ポートは通常 3306 です。
      • ソース データベースへの認証用に、ユーザー名パスワードを入力します。
  5. [接続設定の定義] セクションで、[続行] をクリックします。[MySQL プロファイルの作成] ページの [ソースへの接続を保護する] セクションが有効になっています。

  6. [暗号化のタイプ] メニューから [なし] を選択します。このメニューの詳細については、MySQL データベースの接続プロファイルの作成をご覧ください。

  7. [ソースへの接続を保護する] セクションで、[続行] をクリックします。[MySQL プロファイルの作成] ページの [接続方法の定義] セクションはアクティブです。

  8. [接続方法] プルダウンで、Datastream とソース データベース間の接続を確立するために使用するネットワーク方式を選択します。このチュートリアルでは、接続方法として [IP 許可リスト] を選択します。

  9. 表示される Datastream パブリック IP アドレスからの受信接続を許可するように移行元データベースを構成します。

  10. [接続方法の定義] セクションで [続行] をクリックします。[MySQL プロファイルの作成] ページの [テスト接続プロファイル] セクションはアクティブです。

  11. [テストを実行] をクリックして、移行元データベースと Datastream が相互に通信できることを確認します。

  12. [テストに合格] ステータスが表示されていることを確認します。

  13. [作成] をクリックします。

Cloud Storage の宛先接続プロファイルの作成

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. [プロファイルの作成] をクリックします。

  3. Cloud Storage の宛先接続プロファイルを作成するには、[接続プロファイルを作成] ページで [Cloud Storage] プロファイル タイプをクリックします。

  4. [Create Cloud Storage profile] ページで、次の情報を入力します。

    • [接続プロファイル名] フィールドに My Destination Connection Profile と入力します。
    • 自動生成された接続プロファイル ID を保持します。
    • 接続プロファイルを保存するリージョンを選択します。
    • [接続の詳細] ペインで [参照] をクリックし、このチュートリアルの前半で作成した Cloud Storage バケットを選択します。これは、Datastream がソース データベースからデータを転送するバケットです。選択したら、[選択] をクリックします。

      バケットが、[接続の詳細] ペインの [バケット名] フィールドに表示されます。

    • [接続プロファイルのパス接頭辞] フィールドに、Datastream がデータを移行先にストリーミングする際にバケット名に追加するパスの接頭辞を指定します。Datastream がバケットのルートフォルダではなく、バケット内のパスにデータを書き込むようにしてください。このチュートリアルでは、Pub/Sub 通知の構成時に定義したパスを使用します。欄に「/integration/tutorial」と入力します。

  5. [作成] をクリックします。

MySQL データベースのソース接続プロファイルと Cloud Storage の移行先接続プロファイルを作成した後、それらを使用してストリームを作成できます。

Datastream でストリームを作成する

このセクションでは、ストリームを作成します。このストリームは、接続プロファイルの情報を使用して、ソース MySQL データベースから Cloud Storage のソースバケットにデータを転送します。

ストリームの設定の定義

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. [ストリームを作成] をクリックします。

  3. [ストリームの作成] ページの [ストリームの詳細の定義] パネルで、次の情報を指定します。

    • [ストリーム名] フィールドに「My Stream」と入力します。
    • 自動生成されたストリーム ID を保持します。
    • [リージョン] メニューから、ソースと送信先の接続プロファイルを作成したリージョンを選択します。
    • [ソースタイプ] メニューから [MySQL] プロファイル タイプを選択します。
    • [宛先の種類] メニューから、[Cloud Storage] プロファイル タイプを選択します。
  4. ストリームに環境を準備する方法が反映されるように、自動的に生成される必須の前提条件を確認します。これらの前提条件には、移行元データベースの構成方法や Cloud Storage の移行先バケットに Datastream を接続する方法が含まれます。

  5. [続行] をクリックします。[ストリームの作成] ページの [Define MySQL connection profile] パネルが表示されます。

ソース接続プロファイルに関する情報の指定

このセクションでは、ソースデータベース用に作成した接続プロファイル(ソース接続プロファイル)を選択します。このチュートリアルでは、My Source Connection Profile です。

  1. [ソース接続プロファイル] メニューから、MySQL データベースのソース接続プロファイルを選択します。

  2. [テストを実行] をクリックして、移行元データベースと Datastream が相互に通信できることを確認します。

    テストに失敗した場合、接続プロファイルに関連する問題が表示されます。トラブルシューティングの手順については、問題を診断するのページをご覧ください。必要な変更を行って問題を修正し、再度テストを行います。

  3. [続行] をクリックします。[ストリームの作成] ページの [ストリームのソースの構成] パネルが表示されます。

ストリームのソース データベースに関する情報の構成

このセクションでは、Datastream のソース データベースでテーブルとスキーマを指定して、ストリームのソース データベースに関する情報を構成します。

  • 宛先への転送ができる。
  • 宛先への転送が制限されている。

また、Datastream が過去のデータをバックフィルするのか、進行中の変更を送信先にストリーミングするのか、データへの変更のみをストリーミングするのかを決定します。

  1. [含めるオブジェクト] メニューを使用して、Datastream が Cloud Storage の宛先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。 メニューは、データベースに最大 5,000 個のオブジェクトが存在する場合にのみ読み込まれます。

    このチュートリアルでは、Datastream ですべてのテーブルとスキーマを転送します。そのため、メニューから [すべてのテーブル] を選択します。

  2. [除外するオブジェクトを選択] パネルが [なし] に設定されていることを確認します。Datastream によるソース データベースのテーブルやスキーマの Cloud Storage への転送は制限しません。

  3. [過去のデータ用にバックフィル モードを選択] パネルが [自動] に設定されていることを確認します。Datastream は、データへの変更に加えて、既存のすべてのデータを送信先にストリーミングします。

  4. [続行] をクリックします。[ストリームの作成] ページの [Define Cloud Storage connection profile] パネルが表示されます。

宛先接続プロファイルの選択

このセクションでは、Cloud Storage 用に作成した接続プロファイル(送信先の接続プロファイル)を選択します。このチュートリアルでは、My Destination Connection Profile です。

  1. [宛先接続プロファイル] メニューから、Cloud Storage の宛先接続プロファイルを選択します。

  2. [続行] をクリックします。[ストリームの作成] ページの [ストリームの移行先の構成] パネルが表示されます。

ストリームの転送先に関する情報の構成

このセクションでは、ストリームの送信先バケットに関する情報を構成します。これには以下の情報が含まれます。

  • Cloud Storage に書き込まれるファイルの出力形式。
  • Datastream がソース データベースからスキーマ、テーブル、データを転送する送信先バケットのフォルダ。
  1. [出力形式] フィールドで、Cloud Storage に書き込まれるファイルの形式を選択します。Datastream は現在、Avro と JSON の 2 つの出力形式をサポートしています。 このチュートリアルでは、このファイル形式はAvroです。

  2. [続行] をクリックします。[ストリームの作成] ページの [ストリームの詳細の確認と作成] パネルが表示されます。

ストリームの作成

  1. ストリームの詳細と、ストリームがソース MySQL データベースから Cloud Storage の宛先バケットへのデータ転送に使用するソース接続と宛先接続のプロファイルを確認します。

  2. ストリームを検証するには、[検証を実行] をクリックします。ストリームを検証すると、Datastream は移行元が適切に構成されていることを確認し、ストリームが移行元と移行先の両方に接続できること、ストリームのエンドツーエンド構成を検証します。

  3. すべての検証チェックに合格したら、[作成] をクリックします。

  4. [ストリームを作成しますか?] ダイアログ ボックスで、[作成] をクリックします。

ストリームの開始

このチュートリアルでは、移行元データベースの負荷が増大した場合にストリームを個別に作成して開始します。負荷を軽減するには、ストリームを開始せずに作成し、データベースが負荷を処理できるときにストリームを開始します。

ストリームを開始すると、Datastream は移行元データベースから移行先にデータ、スキーマ、テーブルを転送できます。

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 開始するストリームの横にあるチェックボックスをオンにします。このチュートリアルでは、My Stream を使用します。

  3. [開始] をクリックします。

  4. ダイアログで [開始] をクリックします。ストリームのステータスが Not started から Starting、そして Running に変わります。

ストリームを開始すると、Datastream が移行元データベースから移行先にデータを転送することを確認できます。

ストリームの検証

このセクションでは、Datastream がソース MySQL データベースのすべてのテーブルから、Cloud Storage の移行先バケットの /integration/tutorial フォルダにデータを転送することを確認します。

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 作成したストリームをクリックします。このチュートリアルでは、My Stream を使用します。

  3. [ストリームの詳細] ページで、bucket-name/integration/tutorial リンクをクリックします。ここで、bucket-name は Cloud Storage バケットに付けた名前です。このリンクは、[書き込み先パス] フィールドの後に表示されます。Cloud Storage の [バケットの詳細] ページが別のタブで開きます。

  4. ソース データベースのテーブルを表すフォルダが表示されていることを確認します。

  5. いずれかのテーブル フォルダをクリックし、テーブルに関連付けられているデータが表示されるまで各サブフォルダをクリックします。

Dataflow ジョブを作成する

このセクションでは、Dataflow でジョブを作成します。Datastream がソース MySQL データベースから Cloud Storage バケットにデータ変更をストリーミングした後、Pub/Sub は変更を含む新しいファイルに関する通知を Dataflow に送信します。Dataflow ジョブはファイルを処理し、変更を BigQuery に転送します。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] ページに移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. [テンプレートからジョブを作成] ページの [ジョブ名] フィールドに、作成する Dataflow ジョブの名前を入力します。このチュートリアルでは、このフィールドに my-dataflow-integration-job と入力します。

  4. [リージョン エンドポイント] メニューで、ジョブを保存するリージョンを選択します。これは、作成したソース接続プロファイル宛先接続プロファイルストリームで選択したリージョンと同じです。

  5. [Dataflow テンプレート] メニューから、ジョブの作成に使用するテンプレートを選択します。このチュートリアルでは、[Datastream to BigQuery] を選択します。

    こ選択すると、このテンプレートに関連する追加フィールドが表示されます。

  6. [Cloud Storage 内の Datastream ファイル出力のファイルの場所] フィールドに、gs://bucket-name という形式で Cloud Storage バケットの名前を入力します。

  7. [Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション] フィールドに、Pub/Sub サブスクリプションの名前を含むパスを入力します。このチュートリアルでは、projects/project-name/subscriptions/my_integration_notifs_sub と入力します。

  8. [Datastream output file format (avro/json)] フィールドに「avro」と入力します。このチュートリアルでは、Avro が Datastream が Cloud Storage に書き込むファイル形式です。

  9. [Name or template for the dataset to contain staging tables.] フィールドに「My_integration_dataset_log」と入力します。Dataflow は、このデータセットを使用して Datastream から受信するデータの変更をステージングします。

  10. [Template for the dataset to contain replica tables.] フィールドに「My_integration_dataset_final」と入力します。これは、My_integration_dataset_log データセットにステージングされた変更を統合してソース データベース内のテーブルの 1 対 1 のレプリカが作成されるデータセットです。

  11. [デッドレター キューのディレクトリ] フィールドに、Cloud Storage バケットの名前とデッドレター キューのフォルダを含むパスを入力します。ルートフォルダのパスを使用しないこと、およびパスが Datastream がデータを書き込むパスとは異なることを確認してください。Dataflow が BigQuery に転送できなかったデータ変更は、キューに保存されます。キュー内のコンテンツを修正して、Dataflow で再処理できるようにすることができます。

    このチュートリアルでは、[Dead letter queue directory.] フィールドに「gs://bucket-name/dlq」と入力します(bucket-name はバケットの名前、dlq はデッドレター キューのフォルダです)。

  12. [ジョブを実行] をクリックします。

統合を確認する

このチュートリアルのストリームを確認するセクションで、Datastream がソース MySQL データベースのすべてのテーブルから、Cloud Storage の宛先バケットの /integration/tutorial フォルダにデータを転送することを確認しました。

このセクションでは、Dataflow がこのデータに関連付けられた変更を含むファイルを処理し、変更を BigQuery に転送することを確認します。この結果、Datastream と BigQuery がエンドツーエンドで統合されます。

  1. Google Cloud コンソールで、BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にあるノードを展開します。

  3. My_integration_dataset_log データセットと My_integration_dataset_log データセットの横にあるノードを展開します。

  4. 各データセットにデータが含まれていることを確認します。これにより、Datastream が Cloud Storage にストリーミングしたデータに関連付けられた変更を含むファイルを Dataflow が処理し、これらの変更を BigQuery に転送したことが確認されます。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud コンソールを使用して次の操作を行います。

  • プロジェクト、Datastream ストリーム、Datastream 接続プロファイルを削除します。
  • Dataflow ジョブを停止する。
  • BigQuery データセット、Pub/Sub トピックとサブスクリプション、Cloud Storage バケットを削除します。

Datastream、Dataflow、BigQuery、Pub/Sub、Cloud Storage で作成したリソースをクリーンアップすることで、リソースが割り当てを使い果たしたり、今後料金が発生しないようにします。

プロジェクトの削除

課金をなくす最も簡単な方法は、このチュートリアル用に作成したプロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。

  3. プロジェクトを削除するには、ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックします。

ストリームの削除

  1. Google Cloud コンソールで、Datastream の [ストリーム] ページに移動します。

    [ストリーム] ページに移動

  2. 変更するストリームをクリックします。このチュートリアルでは、My Stream を使用します。

  3. [一時停止] をクリックします。

  4. ダイアログで [一時停止] をクリックします。

  5. [ストリームの詳細] ページの [ストリームのステータス] ペインで、ストリームのステータスが Paused であることを確認します。

  6. [削除] をクリックします。

  7. ダイアログのテキスト フィールドに Delete と入力して、[削除] をクリックします。

接続プロファイルの削除

  1. Google Cloud コンソールで、Datastream の [接続プロファイル] ページに移動します。

    [接続プロファイル] ページに移動

  2. 削除する各接続プロファイル([My Source Connection Profile] と [My Destination Connection Profile])のチェックボックスをオンにします。

  3. [削除] をクリックします。

  4. ダイアログで [削除] をクリックします。

Dataflow ジョブを停止する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] ページに移動

  2. 停止するジョブをクリックします。 このチュートリアルでは、my-dataflow-integration-job です。

  3. [停止] をクリックします。

  4. [ジョブを停止] ダイアログで [ドレイン] オプションを選択し、[ジョブの停止] をクリックします。

BigQuery データセットを削除する

  1. Google Cloud コンソールで、BigQuery の [SQL ワークスペース] ページに移動します。

    [SQL ワークスペース] ページに移動

  2. [エクスプローラ] ペインで、Google Cloud プロジェクト名の横にあるノードを展開します。

  3. BigQuery でデータセットを作成するで作成したデータセットのいずれかの右側にある [アクションを表示] ボタンをクリックします。このボタンは省略記号のようなアイコンです。

    このチュートリアルでは、My_integration_dataset_log の右側にある [アクションを表示] ボタンをクリックします。

  4. 表示されたプルダウン メニューから [削除] を選択します。

  5. [データセットを削除しますか?] ダイアログで、テキスト フィールドに delete と入力して [削除] をクリックします。

  6. この手順を繰り返して、作成した 2 番目のデータセット My_integration_dataset_final を削除します。

Pub/Sub サブスクリプションとトピックを削除します。

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] ページに移動

  2. 削除する定期購入の横にあるチェックボックスをクリックします。このチュートリアルでは、my_integration_notifs_sub サブスクリプションの横にあるチェックボックスをオンにします。

  3. [削除] をクリックします。

  4. [Delete subscription] ダイアログで、[削除] をクリックします。

  5. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] ページに移動

  6. my_integration_notifs トピックの横にあるチェックボックスをオンにします。

  7. [削除] をクリックします。

  8. [Delete topic] ダイアログで、テキスト フィールドに delete と入力して [削除] をクリックします。

Cloud Storage バケットの削除

  1. Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。

    ブラウザ ページに移動

  2. バケットの横にあるチェックボックスをオンにします。

  3. [削除] をクリックします。

  4. ダイアログ ボックスで、テキスト フィールドに Delete と入力して [削除] をクリックします。

次のステップ