データ パイプラインの移行

このドキュメントでは、データ ウェアハウスにデータを読み込む、アップストリームのデータ パイプラインを移行する方法について説明します。このドキュメントを使用すると、データ パイプラインの概要、パイプラインが使用できる手順とパターン、データ ウェアハウスの移行に使用できる移行オプションとテクノロジーについて理解を深めることができます。

データ パイプラインとは

コンピューティングにおけるデータ パイプラインは、一連の接続された処理ステップを使用してデータを処理するタイプのアプリケーションです。一般的なコンセプトとして、データ パイプラインを適用できる処理には、情報システム間のデータ転送、抽出、変換、読み込み(ETL)、データ拡充、リアルタイムのデータ分析があります。通常、データ パイプラインはバッチプロセスまたはストリーミング プロセスとして運用されます。前者の場合、必要に応じて実行され、データを処理します。後者の場合は継続的に実行され、データがパイプラインに使用可能になると、その都度データを処理します。

データ ウェアハウスのコンセプトでは、データ パイプラインの一般的な用途は、トランザクション システムからデータを読み取り、そのデータに変換を適用してからデータ ウェアハウスに書き込むことです。それぞれの変換は 1 つの関数で記述されます。所定の関数への入力は、前の関数の出力です。このように関連付けられている一連の関数は、一般に有向非巡回グラフ(DAG)と呼ばれるグラフとして記述されます。有向とは、グラフが一定の方向(ソースから宛先)に従うことを意味します。非巡回とは、所定の関数の入力を、DAG のダウンストリームにある別の関数からは取得できないことを意味します。つまり、ループを使用することはできません。グラフの各ノードは関数です。各エッジは、ある関数から次の関数に渡されるデータを表します。パイプラインの最初にある関数はソース、つまりソースデータ システムへの接続です。最後の関数はシンク、つまり宛先データシステムへの接続です。

データ パイプラインのコンテキストでは、通常、ソースはトランザクション システム(たとえば、RDBMS)であり、シンクはデータ ウェアハウスに接続します。このタイプのグラフは「データフロー DAG」と呼ばれます。また、DAG を使用して、データ パイプラインと他のシステムとの間のデータ移動をオーケストレーションすることもできます。このように使用される DAG は、「オーケストレーション DAG」または「制御フロー DAG」と呼ばれます。

データ パイプラインに移行すべきとき

ユースケースを BigQuery に移行する際は、オフロードするか完全に移行するかを選択できます。

ユースケースをオフロードする場合、アップストリームのデータ パイプラインをあらかじめ移行しておく必要はありません。最初にユースケース スキーマとデータを既存のデータ ウェアハウスから BigQuery に移行します。次に、データの同期を維持するために、古いデータ ウェアハウスから新しいデータ ウェアハウスに増分コピーが行われるように設定します。最後に、ダウンストリーム プロセス(スクリプト、クエリ、ダッシュボード、ビジネス アプリケーションなど)を移行して検証します。

この時点で、アップストリームのデータ パイプラインは未変更のまま、引き続き既存のデータ ウェアハウスにデータを書き込んでいます。オフロードしたユースケースを移行バックログに再び追加すれば、以降のイテレーションで完全に移行できます。

一方、ユースケースを完全に移行する場合は、ユースケースに必要なアップストリームのデータ パイプラインを Google Cloud に移行します。完全な移行では、最初にユースケースをオフロードする必要があります。完全に移行した後は、データが BigQuery に直接取り込まれるため、オンプレミス データ ウェアハウスの対応するレガシー テーブルの使用を中止します。

イテレーションでは、次のいずれかのオプションを選択できます。

  • ユースケースのみをオフロードする。
  • 前にオフロードしたユースケースを完全に移行する。
  • ユースケースをゼロから完全に移行するために、同じイテレーションでユースケースを最初にオフロードする。

ユースケースのすべてを完全に移行すると、古いウェアハウスの電源を切ることができます。これは、オーバーヘッドとコストを削減するための重要なステップです。

データ パイプラインを移行する方法

このドキュメントの残りでは、データ パイプラインの移行方法を取り上げます。これには、どのアプローチとプロシージャを使用するか、どのテクノロジーを採用するかの選択も含まれます。オプションは、既存のデータ パイプラインを BigQuery にリダイレクトして再利用することから、Google Cloud のマネージド サービスを利用するためにデータ パイプラインを再作成することまで多岐にわたります。

データ パイプラインで使用するプロシージャとパターン

データ パイプラインを使用して、多数のプロシージャとパターンを実行できます。データ ウェアハウジングでは、こうしたパイプラインが一般的に使用されています。使用できるデータ パイプラインには、バッチデータ パイプラインとストリーミング データ パイプラインがあります。バッチデータ パイプラインは、一定の期間(たとえば、1 日)にわたって収集されたデータに対して実行されます。ストリーミング データ パプラインは、オペレーティング システムによって生成されるリアルタイムのイベントを処理します。たとえば、オンライン トランザクション処理(OLTP)データベースによって生成された、CDC の行変更イベントなどです。

抽出、変換、読み込み(ETL)

データ ウェアハウジングのコンテキストでは、多くの場合、データ パイプラインで抽出、変換、読み込み(ETL)プロシージャを実行します。ETL テクノロジーはデータ ウェアハウスの外部で実行されます。これは、データ ウェアハウスのリソースをデータの準備と変換に使用するのではなく、主に同時クエリに使用できることを意味します。データ ウェアハウスの外部で変換を実行する場合の 1 つの欠点は、変換を表現するためのツールと言語(SQL 以外)も学習しなければならないことです。

次の図は、典型的な ETL プロシージャを示しています。

ソース(抽出)が 1 つ以上の変換プロセスを経て(変換)、シンクに移動し、最終的にデータ ウェアハウスに送信される(読み込み)流れを表す図

図 1. 典型的な ETL プロシージャ。

典型的な ETL データ パイプラインは 1 つ以上のソースシステムからデータを pull します(利用不可のシステムといった問題による障害を回避するため、できるだけ少ないソースシステムからデータを pull する必要があります)。パイプラインは pull したデータに対し、一連の変換を実行します。変換には、データのクリーニング、ビジネスルールの適用、データ整合性のチェック、集計または非集計の作成が含まれます。詳しくは、実際の ETL サイクルをご覧ください。

通常は複数のデータ パイプラインを使用します。最初のパイプラインで、ソースシステムからデータ ウェアハウスへのデータのコピーを重点に処理します。以降のシステムでデータにビジネス ロジックを適用し、さまざまなデータマートで使用できるように変換します。データマートとは、特定の事業単位またはビジネス フォーカスに絞り込んだ、データ ウェアハウスのサブセットのことです。

複数のデータ パイプラインを使用する場合は、それらのデータ パイプラインのオーケストレーションが必要になります。次の図は、このオーケストレーション プロセスの概略を示しています。

2 つの ETL プロセス(サブ DAG)を管理するオーケストレーター(DAG)

図 2. 複数のデータ パイプラインのオーケストレーション プロセス。

この図に示されているデータ パイプラインのそれぞれが、オーケストレーション DAG のサブ DAG とみなされます。各オーケストレーション DAG には、全体的な目標に沿った複数のデータ パイプラインが含まれています。全体的な目標とは、たとえば特定の事業単位用にデータを準備して、その事業のビジネス アナリストがダッシュボードまたはレポートを実行できるようにするなどの目標です。

抽出、読み込み、変換(ELT)

ELT は ETL の代わりとなる手法です。ELT では、データ パイプラインは 2 つの部分に分割されます。最初の部分で、ETL テクノロジーによってソースシステムからデータを抽出し、データ ウェアハウスに読み込みます。2 番目の部分では、データ ウェアハウスに基づく SQL スクリプトによってデータの変換を行います。この手法の利点は、SQL を使用して変換を表現できることです。一方、欠点としては、同時クエリに必要なデータ ウェアハウスのリソースを消費する可能性が挙げられます。このことから、通常は、データ ウェアハウスのリソースの需要が低くなる夜間(またはオフピーク時)に ELT バッチ処理が実行されます。

次の図は、典型的な ELT プロシージャを示しています。

ソース(抽出)が 1 つ以上の変換プロセスを経て(変換)、シンクに移動し、最終的にデータ ウェアハウスに送信される(読み込み)流れを表す図。

図 3. 典型的な ELT プロシージャ。

ELT 手法を採用する場合、抽出と読み込みを 1 つの DAG にまとめ、変換にはそれとは別の固有の DAG を使用するのが一般的です。1 回データ ウェアハウスに読み込まれたデータを複数回変換して、ダウンストリームでレポート作成などに使用されるさまざまなテーブルを作成します。これらの DAG は、(ETL のセクションに示されている)オーケストレーション DAG のサブ DAG になります。

密集したオンプレミスのデータ ウェアハウスからクラウドにデータ パイプラインを移行する際は、BigQuery のようなクラウドのデータ ウェアハウスは超並列データ処理テクノロジーであることを念頭に置いてください。実際、BigQuery の場合は、ELT と同時クエリのどちらの需要の増加にも対応するために追加でリソースを購入できます。詳細については、クエリ パフォーマンスの最適化の概要をご覧ください。

抽出と読み込み(EL)

抽出と読み込み(EL)プロシージャは単独で使用することも、EL の後に変換が続く形で使用することもできます。後者の場合は ELT になります。EL と ELT が分かれているのは、独自の取り込みデータ パイプラインを作成する必要を軽減するために、このタスクを実行する複数の自動サービスが利用できるためです。詳しくは、BigQuery Data Transfer Service をご覧ください。

変更データ キャプチャ(CDC)

変更データ キャプチャ(CDC)は、データの変更追跡に適用されるさまざまなソフトウェア設計パターンのうちの 1 つです。データ ウェアハウジングでは、通常は CDC が使用されています。データ ウェアハウスは、各種のソースシステムから一定期間にわたるデータとその変更を照合して追跡するために使用されるためです。

次の図は、CDC が ELT と連動する場合の例を示しています。

個々のレコードに抽出時にバージョン情報が割り当てられ、読み込み時にタイムスタンプが追加される ETL フロー。

図 4. CDC と ELT が連動する仕組み。

CDC が ELT に役立つわけは、ダウンストリームでなんらかの変更を行う前に、元のレコードを保管することが望ましいためです。

EL の部分を機能させるには、Datastream などの CDC ソフトウェアや Debezium などのオープンソース ツールを使用してデータベース ログを処理し、Dataflow を使用して BigQuery にレコードを記録します。こうすれば、さらに変換を適用する前に SQL クエリを使用して最新バージョンを判断できます。次に例を示します。

WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY RECORD KEY
      ORDER BY EVENT TIMESTAMP DESC
    ) AS rank
  FROM TABLE NAME
)
SELECT *
FROM ranked
WHERE rank = 1

既存のデータ パイプラインのリファクタリングを行う場合も、新しいデータ パイプラインを作成する場合も、ELT プロシージャとして適用された CDC パターンの使用を検討してください。このアプローチにより、確実にアップストリームでのデータ変更の履歴が揃い、たとえば次のように明確に責任を分離できます。

  • ソースシステム チームは、ソースシステムのログの可用性とデータイベントの公開に責任を持ちます。
  • データ プラットフォーム チームは、元のレコードの集合をデータ ウェアハウスに取り込んでタイムスタンプを追加することに責任を持ちます。
  • データ エンジニアリング チームとアナリスト チームは、データマートに取り込む一連の変換をスケジュールすることに責任を持ちます。

運用データ パイプラインでのフィードバック ループ

運用データ パイプラインは、データ ウェアハウスからデータを取得して必要に応じて変換し、その結果をオペレーション システムに書き込むデータ処理パイプラインです。

オペレーション システムとは、組織の毎日のトランザクションを処理するシステムを指します。たとえば、OLTP データベース、顧客関係管理(CRM)システム、商品カタログ管理(PCM)システムなどがオペレーション システムに該当します。多くの場合、これらのシステムはデータソースとして機能することから、運用データ パイプラインはフィードバック ループパターンを実装します。

次の図は、運用データ パイプラインのパターンを示しています。

ETL パイプラインでは、データがデータ ウェアハウスに流れ、次に運用パイプラインに渡り、そこから ETL パイプラインにフィードした元のシステムに戻ります。

図 5. 運用データ パイプラインのパターン。

次の例は、サービスの価格を PCM システムに書き込む運用データ パイプラインを説明するものです。PCM システムは、販売関連の商品情報(商品の色、販売チャネル、価格、季節性など)のソースとして信頼できるシステムです。エンドツーエンドのデータフローは次のようになります。

  • 価格関連のデータは複数のソースから入手できます。たとえば、地域別の現行価格は PCM から入手できます。競合他社の価格はサードパーティ サービスから入手できます。また、需要の予測データとサプライヤーの信頼性に関するデータは内部システムから入手できるといった具合です。
  • ETL パイプラインはソースからデータを pull し、そのデータを変換してその結果をデータ ウェアハウスに書き込みます。この例での変換は、PCM 内の各サービスに最適な基準価格を導き出すことを目標とした、すべてのソースが関連する複雑な計算です。
  • 最後に、運用パイプラインはデータ ウェアハウスから基準価格を取得し、季節的なイベントに合わせて価格を調整するための軽微な変換を行います。その最終的な価格を PCM に再び書き込みます。

ETL システムにフィードする PCM システム。

図 6. サービスの価格を PCM システムに書き込む運用データ パイプライン。

運用データ パイプラインは一種のダウンストリーム プロセスです。一方、ETLELT、または CDC を実装するデータ パイプラインはアップストリーム プロセスです。ただし、それぞれのデータ パイプラインを実装するために使用するツールは重複する場合があります。たとえば、すべてのデータ処理 DAG を定義して実行するには Dataflow を使用できます。BigQuery 内で実行する変換を定義するには GoogleSQL を使用できます。また、エンドツーエンドのデータフローをオーケストレートするには Cloud Composer を使用できます。

移行のアプローチを選択する

このセクションでは、データ パイプラインを移行するために採用できるさまざまなアプローチについて説明します。

BigQuery に書き込まれるようにデータ パイプラインをリダイレクトする

次の状況では、使用する技術が組み込みの BigQuery シンク(書き込みコネクタ)を提供しているかどうかを考慮しても良いでしょう。

  • レガシーデータ ウェアハウスに、ETL プロシージャを実行するデータ パイプラインからデータがフィードされます。
  • 変換ロジックは、データがデータ ウェアハウスに保存される前に実行されます。

BigQuery コネクタを使用したデータ処理テクノロジーを提供している独立系ソフトウェア ベンダー(ISV)としては、たとえば次が挙げられます。

データ パイプライン テクノロジーが BigQuery へのデータ取り込みをサポートしていない場合、このアプローチのバリエーションとして、データを一時的にファイルに書き込んで、後でそれらのファイルを BigQuery に取り込むという方法を検討してください。

古いシステムからのフィードをブロックし、BigQuery にフィードするデータ パイプライン。

図 7. データを BigQuery に書き込むように、データ パイプラインの最後の関数を再作成(再構成)します。

大まかに言えば、必要となる作業は、データを BigQuery に書き込むようにデータ パイプラインの最後の関数を再作成(再構成)することです。ただし、追加の変更または新たな作業を必要とする可能性がある選択肢に直面します。たとえば、次の作業です。

機能面

  • データ マッピング: ターゲット データベースのテーブル スキーマが変更される可能性がある場合、それらのマッピングを再構成する必要があります。
  • 指標の検証: スキーマとクエリの両方が変更される可能性があるため、履歴レポートと新しいレポートをどちらも検証する必要があります。

機能以外の面

  • オンプレミスから BigQuery への送信データ転送を許可するようにファイアウォールを構成しなければならない場合があります。
  • 送信データ転送に対応できるように、追加の帯域幅を作成するためのネットワークの変更が必要になる場合があります。

ファイルを中間手段として使用してデータ パイプラインをリダイレクトする

既存のオンプレミス データ パイプライン テクノロジーが Google API をサポートしていない場合、または Google API を使用できない場合は、データを BigQuery に到達させるための中間手段としてファイルを使用できます。

このアプローチはリダイレクトのアプローチと似ていますが、BigQuery に書き込み可能なネイティブ シンクを使用するのではなく、オンプレミスのファイル システムに書き込むことができるシンクを使用します。データがファイル システム内にあるときに、それらのファイルを Cloud Storage にコピーします。詳しくは、Cloud Storage の取り込みオプションの概要と、取り込みオプションの選択に関連する基準をご覧ください。

最後のステップでは、データのバッチ読み込みのガイドラインに従って、Cloud Storage から BigQuery にデータを読み込みます。

次の図は、このセクションで概説したアプローチを示しています。

古いデータ ウェアハウスではなく、ファイル システムにデータをフィードし、そのファイル システムから Cloud Storage、さらにそこから BigQuery にフィードする ETL パイプライン。

図 8. ファイルを中間手段として使用してデータ パイプラインをリダイレクトします。

ETL パイプラインのオーケストレーションに関しては、2 つの別個の手順を行う必要があります。

  1. 既存のオンプレミス パイプライン オーケストレーションを再利用して、変換後のデータをファイル システムに書き込みます。このオーケストレーションを拡張して、オンプレミスのファイル システムから Cloud Storage にファイルがコピーされるようにするか、定期的にコピーのステップを実行するスクリプトを追加で作成します。
  2. データが Cloud Storage 内にある場合は、Cloud Storage 転送を使用して Cloud Storage から BigQuery への定期的な読み込みをスケジュールします。Cloud Storage 転送に代わる方法としては、Cloud Storage トリガーCloud Composer があります。

図 8 に示されているように、Google Cloud 上のオーケストレーションに、SFTP などのプロトコルを使用してファイルを取得する pull モデルを採用することも可能です。

既存の ELT パイプラインを BigQuery に移行する

ELT パイプラインは 2 つの部分からなります。最初の部分でデータ ウェアハウスにデータを読み込み、もう 1 つの部分で SQL によってデータを変換し、ダウンストリームのプロセスで使用できるようにします。ELT パイプラインを移行する際は、それぞれの部分に固有の移行アプローチがあります。

データをデータ ウェアハウスに読み込む部分(EL の部分)については、データ パイプラインをリダイレクトするのセクションで説明しているガイドラインに従うことができます。ただし、EL パイプラインには含まれない変換に関するアドバイスは無視してください。

データソースが BigQuery Data Transfer Service(DTS)で直接またはサードパーティとの統合によってサポートされている場合は、DTS を使用して EL パイプラインを置き換えることができます。

既存の OSS データ パイプラインを Dataproc に移行する

データ パイプラインを Google Cloud に移行する際に、Apache HadoopApache SparkApache Flink などのオープンソース ソフトウェアを使用して作成されたレガシージョブを移行しなければならない場合があります。

Dataproc を使用すると、高速で使いやすいフルマネージドの Hadoop クラスタと Spark クラスタを、簡単かつコスト効果の高い方法でデプロイできます。Dataproc は、BigQuery コネクタを統合します。この BigQuery コネクタは Java ライブラリで、これにより Hadoop と Spark は Apache Hadoop の InputFormat クラスと OutputFormat クラスの抽象化されたバージョンを使用してデータを直接 BigQuery に書き込むことが可能になります。

Dataproc では簡単にクラスタを作成、削除できるので、1 つのモノリシックなクラスタを使うのではなく、多数のエフェメラル クラスタを使用できます。このアプローチにはいくつかのメリットがあります。

  • ジョブごとに異なるクラスタ構成を使用できるため、ジョブ間でツールを管理する必要がなくなります。
  • 個々のジョブまたはジョブグループに合わせてクラスタをスケーリングできます。
  • ジョブで使用したリソース量に対してのみ課金されます。
  • クラスタは、使用するタブに新しく構成されます。クラスタを常時維持する必要はありません。
  • 開発、テスト、本番環境用に別々のインフラストラクチャを用意する必要はありません。同じ定義を使用して、必要に応じて必要な数のクラスタを作成できます。

ジョブを移行するときは、増分移行のアプローチを取ることをおすすめします。このように徐々に移行する場合、次のメリットがあります。

  • 既存の Hadoop インフラストラクチャの個々のジョブを、成熟した環境にはつきものである複雑性から解放できます。
  • ジョブを個別に調査し、最適な移行パスを判断できます。
  • 予期しない問題が発生しても、依存するタスクを遅らせることなく、対処できます。
  • 本番環境に影響を与えることなく、複雑なプロセスごとに概念実証を実施できます。
  • 推奨のエフェメラル モデルにジョブを慎重に移行できます。

既存の Hadoop ジョブと Spark ジョブを Dataproc に移行する場合、移行するジョブの依存関係が、サポートされている Dataproc バージョンの対象となるかを確認するにはこちらをご覧ください。カスタム ソフトウェアをインストールする必要がある場合は、独自の Dataproc イメージを作成する、使用可能な初期化アクションを使用する(たとえば、Apache Flink の場合)、独自の初期化アクションを作成する、またはカスタム Python パッケージの要件を指定するオプションを検討してください。

ご利用にあたっては、Dataproc クイックスタート ガイドBigQuery コネクタのコードサンプルをご覧ください。オンプレミスから Dataproc への Hadoop ジョブの移行Dataproc への Apache Spark ジョブの移行もご覧ください。

サードパーティ データ パイプラインを Google Cloud 上で実行するように再ホストする

オンプレミスでデータ パイプラインを構築する際の一般的なシナリオでは、サードパーティ ソフトウェアを使用してパイプラインの実行とコンピューティング リソースの割り当てを管理します。

このようなパイプラインをクラウドに移行するには、使用しているソフトウェアの機能だけでなく、ライセンス、サポート、メンテナンスの各チームに応じていくつかの選択肢があります。

以降のセクションで、選択肢のいくつかを紹介します。

大まかに言うと、Google Cloud 内でサードパーティ ソフトウェアを実行するには、次の選択肢があります(それほど複雑でない選択肢から複雑な選択肢の順に記載します)。

  • Google Cloud と提携しているソフトウェア ベンダーの場合、そのベンダーのソフトウェアを Google Cloud Marketplace から入手できます。
  • サードパーティ ソフトウェアを Kubernetes 上で実行できる場合があります。
  • サードウェア ソフトウェアを 1 つ以上の仮想マシン(VM)で実行します。

サードパーティ ソフトウェアが Cloud Marketplace ソリューションとして提供されている場合、必要となる作業は次のとおりです。

この選択肢では、ベンダーが提供する使い慣れたプラットフォームを使用してデータ パイプラインをクラウドに導入できるため、かなり簡単に移行できます。さらに、元の環境と Google Cloud 上の新しい環境との間の移行を容易にする、ベンダーの専有ツールを使用できる場合もあります。

ベンダーが Cloud Marketplace ソリューションを提供していない場合でも、ベンダーのプロダクトを Kubernetes 上で実行できれば、Google Kubernetes Engine(GKE)を使用してパイプラインをホストできます。この場合は、次の作業が必要になります。

  • サードパーティ ソフトウェアが Kubernetes で可能なタスクの並列処理を利用できるよう、ベンダーの推奨に従って GKE クラスタを作成します。
  • 作成した GKE クラスタに、ベンダーの推奨に従ってサードパーティ ソフトウェアをインストールします。
  • データ ウェアハウスの BigQuery への移行: 概要で説明した反復的なアプローチに従って、移行対象のユースケースを選択し、移行します。

複雑さという点では、この選択肢は中間に位置します。この選択肢ではパイプラインの実行をスケーリングして並列処理するために、ベンダー固有の Kubernetes サポートを利用できます。ただし、GKE クラスタを作成して管理する必要があります。

ベンダーが Kubernetes をサポートしていない場合は、タスクをスケールアウトして並列処理できるよう、サードパーティ ソフトウェアを VM のプールにインストールする必要があります。ベンダーのソフトウェアが複数の VM へのタスク分散をサポートしているとしたら、ソフトウェアで提供されている機能を使用して、複数の VM インスタンスをマネージド インスタンス グループ(MIG)にグループ化します。これにより、必要に応じてスケールアウト / スケールインできます。

タスクの並列処理を扱うのは簡単なことではありません。ベンダーが複数の VM へのタスク分散機能を提供していない場合は、タスク ファーミング パターンを使用して MIG 内の VM にタスクを分散することをおすすめします。次の図は、このアプローチを示しています。

複数の入力が Pub/Sub に送信され、トピックが作成されます。これらのトピックは、さまざまな MIG によって読み取られます。

図 9. 3 つの VM からなるマネージド インスタンス グループ(MIG)

この図では、MIG 内の VM のそれぞれがサードパーティ パイプライン ソフトウェアを実行します。パイプラインの実行をトリガーするには、次の方法があります。

  • Cloud SchedulerCloud Composer、または Cloud Storage trigger を使用して、新しいデータが Cloud Storage バケットに到着した時点で自動的にトリガーする。
  • Cloud Endpoint または Cloud Functions を呼び出すか、Pub/Sub API を使用してプログラムによってトリガーする。
  • Google Cloud CLI を使用して、手動で新しいメッセージを Pub/Sub トピックに配置してトリガーする。

基本的に、前述のいずれの方法でも、メッセージを事前定義された Pub/Sub トピックに送信できます。各 VM にインストールする単純なエージェントを作成し、このエージェントで 1 つ以上の Pub/Sub トピックをリッスンします。メッセージがそのトピックに到着すると、エージェントがトピックからメッセージを pull し、サードパーティ ソフトウェア内のパイプラインを起動して、パイプラインの完了をリッスンします。パイプラインが完了すると、エージェントはリッスンしているトピックから次のメッセージを取得するという仕組みです。

いずれのシナリオでも、パイプラインを Google Cloud 上で動作させるための適切なラインセンス条項に従うために、ベンダーと協力することをおすすめします。

Google Cloud のマネージド サービスを使用するようにデータ パイプラインを再作成する

場合によっては、Google Cloud 上の新しいフレームワークとフルマネージド サービスを使用するように既存のデータ パイプラインを再作成するという選択肢を選ぶこともできます。このオプションが適しているのは、既存のパイプラインの実装で使われているテクノロジーが非推奨になっているか、パイプラインを変更せずにクラウドに移植して管理するのは実際的ではない、または法外な費用がかかると見込まれる場合です。

以降のセクションで、高度なデータ変換を大規模に実行できるフルマネージド GCP サービスとして Cloud Data Fusion と Dataflow の 2 つを紹介します。

Cloud Data Fusion

オープンソースの CDAP プロジェクトをベースに作成されている Cloud Data Fusion は、グラフィカル インターフェースを使用してデータ パイプランを構築、管理できる、フルマネージド データ統合サービスです。

Cloud Data Fusion UI でデータ パイプラインを開発するには、ソースを変換、シンク、その他のノードに接続して DAG を形成します。開発したデータ パイプラインをデプロイすると、Cloud Data Fusion プランナーがこの DAG を、Dataproc 上の Apache Spark ジョブとして実行される一連の並列計算に変換します。

Cloud Data Fusion を使用する場合、Java Database Connectivity(JDBC)ドライバを使用できるため、コードを作成する必要はありません。JDBC ドライバを使用すれば、ソースシステムのデータベースに接続してデータを読み取り、変換してから任意の宛先(たとえば、BigQuery)に読み込むことができます。それには、Cloud Data Fusion インスタンスに JDBC ドライバをアップロードして、データ パイプラインとして使用できるように構成する必要があります。詳しくは、Cloud Data Fusion での JDBC ドライバの使用に関するガイドをご覧ください。

Cloud Data Fusion では、ソース、変換、集計、シンク、エラーコレクタ、アラート パブリッシャー、アクション、実行後アクションとしてのプラグインが、カスタマイズ可能なコンポーネントとして公開されています。事前に作成されたプラグインで幅広いデータソースにアクセスできます。必要なプラグインが存在しない場合は、Cloud Data Fusion プラグイン API を使用して独自のブラグインを作成することもできます。詳細については、プラグインの概要をご覧ください。

Cloud Data Fusion パイプラインを使用する場合、バッチデータ パイプラインとストリーミング データ パイプラインの両方を作成できます。データ パイプラインでログと指標にアクセスできるようすると、管理者がデータ処理ワークフローを運用化するための手段にもなるため、カスタムツールを使用する必要がなくなります。

ご利用にあたっては、Cloud Data Fusion のコンセプトの概要をご覧ください。実用的な例については、クイックスタート ガイドターゲティング キャンペーン パイプラインの作成方法に関するチュートリアルをご覧ください。

Dataflow

Dataflow は、Apache Beam ジョブを大規模に実行するためのフルマネージド サービスです。Apache Beam は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しているオープンソースです。これには BigQuery 用のコネクタも含まれます。Apache Beam を使用して、ストリーミング(リアルタイム)モードのデータとバッチ(履歴)モードのデータを同等の信頼性と明瞭度で変換、拡充できます。

Dataflow のサーバーレス アプローチは、パフォーマンス、スケーリング、可用性、セキュリティ、コンプライアンスに自動的に対処することによって、運用上のオーバーヘッドを取り除きます。つまり、ユーザーはサーバー クラスタの管理ではなく、プログラミングに専念できるということです。

Dataflow ジョブは、コマンドライン インターフェースJava SDK または Python SDK のいずれかを使用してさまざまな方法で送信できます。現在、すべての SDK とランナーを完全に相互運用可能にするために、ポータビリティ フレームワークの開発も進められています。

データクエリとパイプラインを他のフレームワークから Apache Beam と Dataflow に移植するには、Apache Beam プログラミング モデルの詳細を確認して、公式の Dataflow ドキュメントをご覧ください。

実用的な例については、Dataflow のクイックスタートチュートリアルをご覧ください。

オーケストレーションとスケジューリング

大まかに言うと、オーケストレーションとは複数のシステムを自動的に調整すること、スケジューリングとはオーケストレーション処理を自動的にトリガーすることを意味します。

  • ズームイン: データ パイプライン自体が、DAG(この場合はデータ処理 DAG)で記述されたデータ変換オーケストレーションです。
  • ズームアウト: データ パイプラインが他のデータ パイプラインの出力に依存する場合は、複数のパイプラインのオーケストレーションが必要になります。各パイプラインがサブ DAG となり、1 つの DAG(この場合はオーケストレーション DAG)を構成します。

これがデータ ウェアハウスでの典型的なセットアップです。ETL のセクションに記載されている図 1 に、セットアップの例が示されています。以降のセクションでは、複数のデータ パイプラインのオーケストレーションに目を向けます。

依存関係

依存関係は「ファンイン」または「ファンアウト」できます。ファンインでは、複数のデータ パイプラインが 1 つのオーケストレーション DAG の頂点に結合されます。ファンアウトでは、1 つのデータ パイプラインによって他の複数のデータ パイプラインがトリガーされます。多くの場合は、次の図に示すようにファンインとファンアウトが組み合わせられます。

A、B、C という複数のパイプラインがパイプライン D にファンインします。パイプライン D はパイプライン E、F、G にファンアウトします。これらはすべてオーケストレーション DAG によって編成されます。

図 10. ファンインとファンアウトを組み合わせた依存関係。

最適ではない環境では、一部の依存関係は利用可能なリソース量の制約によって生じたものです。たとえば、あるデータ パイプラインの実行に伴って、共通データが生成されます。他のデータ パイプラインは再計算を避けるためにこの共通データに依存しますが、共通データを生成したデータ パイプラインとは無関係です。最初のパイプラインで機能面のまたは機能面以外の問題が発生した場合、この最初のパイプラインに依存する他のデータ パイプラインにまでその障害が波及します。これにより、他のデータ パイプラインは待機状態になり、最悪の場合は、次の図に示すように完全に実行不能になります。

パイプライン A が失敗します。パイプライン A からの出力に依存するため、パイプライン B とパイプライン C も失敗します。

図 11. データ パイプラインの障害の波及により、そのデータ パイプラインに依存するパイプラインが実行できなくなります。

Google Cloud には、パイプラインとそのオーケストレーションの実行を最適化するために使用できる、豊富なコンピューティング リソースと特化されたツールが用意されています。残りのセクションでは、これらのリソースとツールについて説明します。

必要な移行作業

オーケストレーションのニーズを簡素化することがベスト プラクティスです。データ パイプライン間の依存関係の数が増えれば増えるほど、オーケストレーションは複雑化します。Google Cloud への移行は、オーケストレーション DAG を検査して依存関係を識別し、それらの依存関係を最適化する方法を判断する機会になります。

次のように段階的に依存関係を最適化することをおすすめします。

  1. 最初のイテレーションで、オーケストレーションをそのままの形で Google Cloud に移行します。
  2. 以降のイテレーションで、依存関係を分析し、実行可能であれば依存関係を並列化します。
  3. 最後に、共通タスクを独自の DAG に抽出してオーケストレーションを再編成します。

次のセクションで、具体的な例を用いてこの方法を説明します。

具体的な例

ある組織で、互いに関連する 2 つのパイプラインを使用しているとします。

  • 最初のパイプラインで組織全体の損益(P&L)を計算します。これは、多数の変換を使用する複雑なパイプラインです。パイプラインの一部は月間売上高の計算からなります。この売上高が、以降の変換ステップで使用されて最終的にテーブルに書き込まれます。
  • 2 番目のパイプラインでは、マーケティング部門が広告キャンペーンの取り組みを調整できるように、さまざまな商品の前年比と前月比の売上成長率を計算します。このパイプラインには、以前 P&L データ パイプラインで計算された月間売上高のデータが必要です。

この組織は、P&L データ パイプラインのほうがマーケティング パイプラインよりも優先度が高いとみなしています。あいにく、P&L は複雑なデータ パイプラインで大量のリソースを消費するため、他のパイプラインを同時に実行できません。しかも、P&L パイプラインで障害が発生した場合、マーケティング パイプラインと他の従属パイプラインがデータを実行できなくなり、P&L が再試行されるまで待機しなければなりません。次の図は、この状況を示しています。

P&L パイプラインは、マーケティング パイプラインに必要な「月間売上」アーティファクトを作成します。P&L パイプラインでは遅延などの問題が発生する可能性があります。

図 12. 複雑なデータ パイプラインが原因で、それよりも優先順位の低いパイプラインが実行できなくなる可能性があります。

組織は現在、BigQuery への移行を進めているところです。移行にあたって、2 つのユースケース(P&L とマーケティング売上成長率)を識別し、それらを移行バックログに含めています。次のイテレーションを計画する際に、組織は P&L ユースケースのほうを優先し、このユースケースをイテレーション バックログに追加しました。現在のオンプレミスではリソースが非常に限られていて、P&L データ パイプラインがたびたび遅延の原因となっているからです。P&L ユースケースの従属ユースケースもいくつか追加されています。そのうちの 1 つはマーケティング ユースケースです。

移行チームが最初のイテレーションを実施します。チームはリダイレクトのアプローチに従って、P&L ユースケースとマーケティング ユースケースの両方を Google Cloud に移行することにしました。パイラインのステップとオーケストレーションに変更は加えません。移行によって、P&L パイプラインで無制限のコンピューティング能力を使用できるようになりました。そのため、オンプレミスと比較すると、このパイプラインの実行時間が大幅に短縮されています。P&L パイプラインは月間売上高データを、マーケティング売上成長率パイプラインが使用する BigQuery テーブルに書き込みます。次の図は、これらの変更を示しています。

P&L パイプラインは以前と同じですが、遅延は発生しません。

図 13. リダイレクトのアプローチを使用した、複雑なデータ パイプラインの高速化。

Google Cloud は機能面以外の P&L の問題に役立ちましたが、機能面での問題はまだ残っています。月間売上高の計算には関係がないものの、この計算に先行する一部のタスクでよくエラーが発生します。先行タスクでエラーが発生すると、月間売上高の計算を実行できなくなります。さらに、従属パイプラインも起動できません。

2 回目のイテレーションで、チームはパフォーマンスの向上を目指して、両方のユースケースをイテレーション バックログに追加しました。チームは、P&L パイプラインで月間売上高を計算するパイプライン ステップを特定します。これらのステップは、次の図に示すように 1 つのサブ DAG を構成するものです。移行チームはこのサブ DAG をマーケティング パイプラインにコピーして、P&L とは独立してこのパイプラインを実行できるようにしました。Google Cloud には十分なコンピューティング能力があるため、両方のパイプラインを同時に実行できます。

P&L パイプラインとマーケティング パイプラインは別々のサブ DAG として実行されるため、P&L パイプラインに問題が発生しても、マーケティング パイプラインに影響はありません。

図 14. サブ DAG を使用して同時に実行される 2 つのパイプライン。

サブ DAG ロジックを重複させると、チームがサブ DAG ロジックの両方のコピーを同期した状態に維持しなければならないため、コードの管理オーバーヘッドが生じるという欠点があります。

3 回目のイテレーションで、チームはユースケースを見直し、月間売上高のサブ DAG を独立したパイプラインに抽出することにしました。新しい月間売上高パイプラインは、P&L パイプライン、マーケティング売上成長率パイプライン、他の従属パイプラインをトリガー(ファンアウト)します。この構成によって、パイプラインのそれぞれをオーケストレーション サブ DAG の 1 つとした、新しい完全なオーケストレーション DAG が作成されます。

「月間売上」が最初のパイプラインになり、ここから P&L パイプラインとマーケティング パイプラインにフィードします。

図 15. 個別のサブ DAG に各パイプラインを含めた、完全なオーケストレーション DAG。

移行チームは残りの機能面での問題については、後続のイテレーションで解決できます。問題を解決した後は、次をはじめとする Google Cloud マネージド サービスを使用するためにパイプラインを Google Cloud に移行できます。

  • Dataflow: Beam モデルを使用して、各データ パイプラインを自己完結型 DAG として定義できます。
  • Cloud Composer: 広範囲に及ぶオーケストレーションを 1 つ以上の Airflow DAG として定義できます。

Airflow はサブ DAG をネイティブにサポートしていますが、ネイティブ サポート機能ではパフォーマンスが制限される可能性があるため、Airflow DAG は推奨されません。代わりに、独立した DAG を TriggerDagRunOperator オペレーターとともに使用してください。

次のステップ

データ ウェアハウス移行の次のステップの詳細を確認する。

特定のデータ ウェアハウス テクノロジーから BigQuery に移行する方法についても説明します。