コンテンツに移動
デベロッパー

Google Cloud でデータ ワークロードをオーケストレート

2021年7月1日
Google Cloud Japan Team

※この投稿は米国時間 2021 年 6 月 18 日に、Google Cloud blog に投稿されたものの抄訳です。

データと分析の核心は、データから分析情報を引き出すことにより、影響力のある意思決定を行えるようにすることです。データの有用性を追求するために、往々にしてデータ サイエンティストやエンジニアには、データの取り込み、処理、分析を行うエンドツーエンドのワークフローを構築するという業務が課せられます。

これらのワークフローには通常、特定の順序で実行する必要のある複数のタスクまたはサービスが含まれます。それでは、タスクの実行が正常に完了したかどうか、いつ完了したかを知るにはどうすればいいでしょうか。また、ワークフローのサービスが失敗するとどうなるのでしょう。ワークロードの規模が大きく複雑になるにつれて、こうした疑問はより差し迫ったものになり、解決が難しくなることが想像できます。

そこで、オーケストレーションの出番です。オーケストレーションとは、ワークフローの自動化、管理、調整ですこのブログでは、Google Cloud でデータ ワークフローをオーケストレートする方法について考察します。

まず使用例を確認する

Google Cloud Storage の複数のバケットにまたがるデータがあり、BigQuery や、Google Cloud のデータ ウェアハウスなどで読み込みを行う前に抽出して変換する必要があるとします。これを行うには、Dataflow、Data Fusion、または Dataproc(順番に Beam、CDAP、Spark のマネージド サービス)でデータ パイプラインを構築します。

https://storage.googleapis.com/gweb-cloudblog-publish/images/sEYivV94rfNO.max-1000x1000.max-1000x1000.png

どの段階でオーケストレーションが必要になるのかご説明します。

  1. 毎晩午前 0 時にデータを変換するデータ パイプラインを実行します。

  2. パイプラインを実行する前に、データが Cloud Storage に存在することを確認しておきます。

  3. BigQuery ジョブを実行して、新しく処理されたデータのビューを作成します。

  4. パイプラインが完了したら Cloud Storage バケットを削除します。

  5. これらのいずれかが失敗した場合に Slack で通知を受け取る設定が必要です。

このプロセスは、より広範囲のワークフローの一部として調整する必要があります。つまり、オーケストレーションが必要です。

オーケストレーションとは、データを直接変換または処理することではなく、それらを行うサービスのサポートと調整であるという点を理解することが重要です。

どのツールを使用するべきか

Google Cloud Platform には、Cloud でワークフローをオーケストレートするのに役立つツールが数多く用意されています。これらのプロダクトのより詳細な比較については、このシリーズの最初のブログ投稿、適切なオーケストレーターの選択をご参照ください。データドリブン(特に ETL / ELT)のワークロードのオーケストレーションには Cloud Composer が最適です。

Cloud Composer は、ワークフローの作成、スケジュール設定、モニタリングに使用されるフルマネージドのオーケストレーション ツールです。Cloud Composer は、成長を続ける人気のオープンソース Apache Airflow プロジェクトをベースに構築されています。

最終的な目標は、タスクで構成されるワークフローを構築することです。ワークフローは、Python で有向非巡回グラフ(DAG)を作成することで構成できます。DAG は巡回のない一方向のタスクのフローで、それぞれのタスクは個別の作業単位を担当します。

https://storage.googleapis.com/gweb-cloudblog-publish/images/lyki-.Eldq1S.max-1100x1100.max-1100x1100.png

ワークフローが少数の cron ジョブまたは依存関係が緩やかなアドホック スクリプトによってまとめられていて、管理方法を知っているのが数人に限定されている場合は、DAG の特長を実感できるでしょう。全員が使用できる集中管理ツールでタスク全体を実行することによってワークフローの簡素化、一元化、共同作業化が実現でき、Airflow UI で追跡することも可能です。

なぜデータ ワークフローのオーケストレーションに Composer を使用するのか

Composer はさまざまなユースケースをサポートできますが、ユーザーの大多数を占めているのは、データ パイプラインをオーケストレートするワークフローを構築中のデータ エンジニアです。Composer はデータ ワークフローの管理で直面するいくつかの一般的な課題を解消できるので、多くのデータ エンジニアから支持されています。

複数のサービスを調整したりインターフェースで連結したりできることは、あらゆる ETL または ELT ワークフローにとって何よりも重要なことであり、エンジニアはこれを確実かつ安全に行う必要があります。ありがたいことに、複数のクラウド環境間でサービスとの通信を可能にする演算子とセンサーが何百も存在します。API を直接呼び出すための大量のコードを記述、維持する必要はありません。

ワークフローがより複雑になるにつれて、高度なタスク管理がますます重要になってきます。以前のタスクの状態に基づいて、タスクの並列化や分岐ができます。また、スケジュール機能や、予期しない動作に対処する機能(エラー発生時のメール送信や通知など)が組み込まれています。

Composer は、DAG の作成と実行に必要なインフラストラクチャを管理し、Airflow 環境の作成と維持を自動で行うことが可能なため、エンジニアの Airflow エクスペリエンスをレベルアップさせることができます。反復的なタスクを自動化して Composer に任せられることに加え、インフラストラクチャ管理に集中する必要がないということは、データ エンジニアがデータ パイプラインとワークフローの実際の構築に専念する時間を取り戻せるということです。

Composer をデータの処理と変換に使用できるか

Composer はデータ処理ツールではないため、ビッグデータを直接変換して処理するために使用しないでください。あるタスクから次のタスクに大量のデータを渡すようには設計されておらず、高度なデータ処理の並列化や、ビッグデータの処理に不可欠な集計などの機能は備えていません。また、Composer は、あるタスクを終了してから別のタスクを開始するまでに数秒かかる可能性があるため、超低レイテンシを必要とするワークロードより、バッチ ワークロードに適しています。

データ転送演算子について

注目される点として、たとえば Google Cloud Storage から BigQuery への演算子のように、あるソースから別のソースにデータを転送する転送演算子があります。

それでは、Composer はテータ転送には使用されないということでしょうか。それは少し違います。内部的には、Composer はデータを転送する BigQuery API の呼び出しだけを行います。データのダウンロードや、Composer ワーカー間の転送は行いません。これらはすべて BigQuery リソースが行います。

Composer の Dataflow や Data Fusion との比較

Data Fusion、Dataflow、Dataproc などのサービスは、データの取り込み、処理、変換に最適です。これらのサービスは、ビッグデータ上で直接操作するように設計されており、パフォーマンスの集約(シャッフル、グループ化)とデータのスケーリングをサポートするバッチ パイプラインとリアルタイム パイプラインの両方を構築できます。そして、このようにしてデータ パイプラインを構築するにあたり、Composer を使用してこれらのサービスの実行をより広範なワークフローの一部として管理できます。

Composer の使用例を復習する

最初に、ワークフローの各ステージのタスクで DAG を作成します。Dataflow にデータ パイプラインがあると仮定します。

https://storage.googleapis.com/gweb-cloudblog-publish/images/Blog_2_-_example_GCP_Composer_DAG.max-1500.max-1400x1400.png

これらのタスクが順次実行され、いずれかのタスクが失敗した場合、エラー通知が Slack に投稿されます。Slack との接続は Airflow UI で簡単に設定できます。DAG の一部でスケジュール間隔を定義できます。この場合は、シンプルに毎晩午前 0 時に実行するようにしました。

読み込んでいます...

これらのタスクは、Cloud Storage、BigQuery、Dataflow、Slack の演算子を使って簡単に作成できます。次の例は、Google Cloud Storage バケット内のファイルの存在を単純に調べる Cloud Storage センサーのスニペットです。

読み込んでいます...

この DAG のコードサンプルはこちらで確認できます。

DAG が完成したら、それを Google Cloud Storage にアップロードするだけで Airflow UI から実行できます。ここから、チームは Composer ワークフローのトリガーや追跡、進行状況のモニタリングを簡単に行えます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/Airflow_UI_Screenshot.max-700x700.png

クラウド上でデータをホストできるようになったことで、データドリブン ワークロードはこれまでになく急速に進化、拡大してきました。エンジニアはタスクとサービスの管理を簡素化および一元化することを目指しており、データ オーケストレーションは重要性を増しています。Composer にワークフローのオーケストレーションと基盤となるリソースの管理を任せることで、データ エンジニアはデータ エンジニアリングのよりクリエイティブな面に集中できます。

もっと詳しくお知りになりたい場合

Airflow に焦点を当てた近日開催予定のオープンソース ライブ イベントにぜひご登録ください。また、Composer の使用を開始するためのこちらのチュートリアルコードサンプルもご覧ください。

-Google Cloud デベロッパー アドボケイト Rachael Deacon-Smith

投稿先