データ分析

Terraform による Cloud Data Fusion デプロイの自動化

Cloud Data Fusion.jpg

※この投稿は米国時間 2020 年 6 月 13 日に、Google Cloud blog に投稿されたものの抄訳です。

Cloud Data Fusion は、ETL データ パイプラインを効率的に構築、管理できる、フルマネージドかつクラウド ネイティブなデータ統合サービスです。Cloud Data Fusion は、オープンソース プロジェクト CDAP の機能を利用しています。

Cloud Data Fusion の仕組み

多くの企業には、複数のソースからデータを取得し、そのデータを分析に役立つ形式に変換するデータ統合パイプラインがあります。Cloud Data Fusion は、このようなパイプラインをコーディングをほとんどあるいは一切行わずに構築できる Google Cloud サービスです。パイプラインを構成する方法の一つとして、UI を使用する方法があります。UI を使用すると視覚的かつ直感的にパイプラインを設計できますが、多くの組織では、本番環境でのデプロイを自動化する必要があります。管理可能な方法で自動化するには、Infrastructure as Code を介して行います。Terraform は、Hashicorp によって管理される Infrastructure as Code ツールであり、インフラストラクチャをスピンアップする業界標準の方法です。 

CDAP によりパイプラインのオーサリングが容易になりますが、CDAP REST API を使用してデプロイを自動化するには、いくつかの追加作業が必要です。このブログでは、Terraform を使用して Infrastructure as Code 内のさまざまな CDAP リソースのデプロイを自動化する方法について説明します。その際、コミュニティで管理される Terraform CDAP プロバイダに組み込まれている CDAP REST API の有用な抽象化を活用します。この投稿では、Cloud Foundations ツールキット モジュールのオープンソース化されているさらなる抽象化に焦点を当てます。

Cloud Data Fusion インスタンスの作成

Cloud Data Fusion インスタンスは、datafusion モジュールを使用して作成できます。以下は、モジュールの基本的な使用法です。

  ```
module "data_fusion" {
  source  = "terraform-google-modules/data-fusion/google"
  version = "~> 0.1"

  name    = "example-instance"
  project = "example-project"
  region  = "us-central1"
}
```

このモジュールでは、作成または再利用するインスタンスの名前、プロジェクト ID、リージョン、サブネットワークの入力が必須です。指定された場合を除き、インスタンス タイプのデフォルトはエンタープライズです。オプションで dataproc_subnet、labels、options も入力できます。 

プライベート IP CDF インスタンスの前提条件のデプロイ

ネットワーク上のトラフィックはパブリック インターネットを経由しないため、多くのユースケースでは、プライベート VPC ネットワークを介して Cloud Data Fusion への接続を確立する必要があります。プライベート IP Cloud Data Fusion インスタンスを作成するには、特定のインフラストラクチャをデプロイする必要があります。具体的には、VPC ネットワーク、Dataproc クラスタをデプロイするためのカスタム サブネット、Data Fusion テナント プロジェクトとのピアリングのための IP の割り当てが必要です。VPC はプライベート ネットワーク モジュールを使用してデプロイできます。また、Cloud Data Fusion バージョン 6.1.2 以前を使用している場合は、モジュールによって SSH 上り(内向き)ルールが作成され、Data Fusion インスタンスがポート 22 の Dataproc クラスタに到達できるようになります。以下は、モジュールの基本的な使用法を示すスニペットです。

  ```
module "private_data_fusion_network" {
  source  = "terraform-google-modules/data-fusion/google//modules/private_data_fusion_network"
  version = "~> 0.1"

  project_id  = "your-gcp-project-id"
  vpc_network = "data-fusion-net"
}
```

このモジュールで必須の入力: Data Fusion サービス アカウント、プライベート Data Fusion インスタンス ID、プライベート Data Fusion インスタンスのファイアウォール ルールを使用して作成される VPC ネットワーク、プライベート Data Fusion セットアップ用の gcp プロジェクト ID、プライベート Data Fusion インスタンス ID。このモジュールからの出力: プライベート Data Fusion インスタンス用に予約された IP CIDR 範囲、プライベート Data Fusion インスタンス用に作成された VPC、プライベート Data Fusion インスタンスによって制御される Dataproc クラスタ用に作成されたサブネットワーク、プライベート Data Fusion インスタンス用に作成された VPC。

名前空間の構成

名前空間は、Cloud Data Fusion インスタンスを論理的に分割する目的で使用します。名前空間は、パイプライン、プラグイン、メタデータ、構成の分離をランタイムに実現するためと、マルチテナンシーを提供するために必要です。これは、さまざまなサブ組織がすべて同じインスタンス内にある場合に役立ちます。インスタンス内の各名前空間は、インスタンス内のパイプライン、設定、プラグイン、アーティファクトに関して、インスタンス内の他のすべての名前空間から分離されています。名前空間でさまざまな処理を実行するために必要な API 呼び出しは、CDAP REST API ドキュメントに記載されています。Terraform の名前空間の例をご覧ください。

  ```
module "staging" {
  source  = "terraform-google-modules/data-fusion/google//modules/namespace"
  version = "~> 0.1"

  name = var.name
  preferences = {
    FOO = "BAR"
  }
```

このモジュールでは、作成する名前空間の名前と、この名前空間の設定(ランタイムに構成されるランタイム引数)を指定する必要があります。

コンピューティング プロファイルのデプロイ

Cloud Data Fusion では、コンピューティング プロファイルはパイプラインの実行環境を表します。コンピューティング プロファイルを使用すると、ユーザーはパイプラインの実行に必要なリソースを指定できます。現在、Cloud Data Fusion パイプラインは、主に Dataproc クラスタで Apache Spark プログラムまたは MapReduce プログラムとして実行されます。コンピューティング プロファイルは 2 つのタイプの Dataproc クラスタを管理できます。1 つはエフェメラル クラスタ、もう 1 つは永続クラスタです。エフェメラル クラスタは、パイプラインの実行中に作成され、パイプラインが終了すると破棄されます。一方、永続的クラスタは既存のものであり、データ処理ジョブのリクエストを待機します。永続クラスタは複数のパイプライン実行を受け入れることができますが、エフェメラル クラスタはジョブスコープです。エフェメラル クラスタでは毎回新しいクラスタをプロビジョニングする必要があるため、パイプラインの実行ごとに起動オーバーヘッドが組み込まれます。エフェメラル クラスタは、エクスペリエンスの管理性が高いことが利点であり、クラスタと通信する際に SSH 認証鍵を指定する必要はありません。SSH 認証鍵は、CDAP サービスによって自動的に生成されます。

コンピューティング プロファイルでさまざまな処理を実行するために必要な API 呼び出しは、CDAP REST API ドキュメントに記載されています。Google 作成の Terraform モジュールを使用すると、カスタム コンピューティング プロファイルをデプロイして、ネットワークやコンピューティング サービス アカウントなどの設定を構成することができます。これらの設定は、デフォルトのコンピューティング プロファイルでは構成できません。

次の例では、プロファイルの名前とプロファイルのラベルの入力が必須です。

  ```
module "custom_dataproc" {
  source  = "terraform-google-modules/data-fusion/google//modules/dataproc_profile"
  version = "~> 0.1"

  name   = "custom-dataproc"
  label  = "Custom Dataproc Profile"

  network         = "example-network"
  subnet          = "example-subnetwork"
  service_account = "example-sa"
  gcs_bucket      = "example-bucket"
}
```

このモジュールでは、Dataproc クラスタを実行するネットワークの名前、サブネットワークの名前、サービスア カウントの名前など、さらに多くのオプションの入力が可能です。さらに、プロファイルをデプロイする名前空間と、認証に使用するアカウントキーも指定できます。

パイプラインのデプロイと更新

 パイプラインは、パイプライン モジュールを使用してデプロイできます。名前、名前空間、エクスポートされたパイプラインのパス(json_spec_path)の入力が必要です。これを取得するには、パイプラインが Data Fusion UI にデプロイされた後、[アクション] > [エクスポート] をクリックします。

パイプラインのニュアンスについては、CDAP ドキュメントで説明しています。

前述のとおり、アプリケーションとデータの分離を実現するには名前空間が必要です。エクスポートされた JSON パイプラインのパスには、それが作成されたインスタンスのチェックポイント ディレクトリへのハードコードされた参照が含まれているため、json_spec_path にはチェックポイント キーが含まれていてはなりません。checkpointDir キーは、エクスポートされたパイプライン JSON の構成ブロックから削除する必要があります。新しいインスタンスでは、このキーが存在しない場合、正しいチェックポイント バケットであると推測されます。チェックポイントは、CDAP リアルタイム アプリで使用する必要があります。これは、CDAP リアルタイム アプリではパイプライン構成の disableCheckpoints キーが考慮されないため、Spark コンテキストが開始されないからです。詳しくは、こちらをご覧ください。このチェックポイント キーを削除する一般的な方法は、jq コマンドの使用です。

CheckpointDir キーは、Apache Spark が実行されるたびに JSON ファイルで生成されます。ここで直面する課題は、checkpointDir キーを手動で JSON から削除する必要があることです。キーは、エクスポート元の Cloud Data Fusion インスタンスから checkpointDir にハードコードされるため、削除する必要があります。これは、インスタンスが異なる環境(本番環境と開発環境)の場合に問題を引き起こす可能性があります。新しいインスタンスで正しいチェックポイント バケットを推測するには、このキーが存在していてはなりません。

以下は、cdap_application リソースのスニペットです。

  ```
resource "cdap_application" "pipeline" {
    name = "example_pipeline"
    spec = file("${path.module}/relative/path/to/pipeline_spec.json"
}
```

すでにデプロイされているパイプラインを更新するには、単にリポジトリで pull リクエストを実行します。これにより、Terraform 適用時にこの実行が停止します。さらに、Terraform によって、このパイプラインに必要なプラグインの新バージョンのプラグイン リソースが追加されます。アプリケーションは不変であるため、パイプラインが更新された場合、それらは新しいパイプラインとして(バージョン付きの名前で)処理される必要があります。

ストリーミング プログラムの実行

プログラムが実行されるタイミングは、パイプラインがランタイム引数で渡されて実行されるときであり、デプロイの完了後です。ストリーミング パイプラインは、手動でスケジュール設定またはトリガーされるバッチジョブとは異なり、長時間実行されるインフラストラクチャであるため、Infrastructure as Code として管理できます。プログラムの開始と停止、複数のプログラムのステータスの開始と確認などの処理を実行するための、関連する API 呼び出しについては、CDAP ドキュメントで説明しています。 

以下は、cdap_streaming_program_run リソースの例です。

  ```
resource "cdap_streaming_program_run" "test" {
  namespace = "adp_staging"
  app       = "HL7v2_to_fhir"

  runtime_arguments = {
    "system.profile.name" = "my-custom-profile-name"
  }
}
```

アプリの名前、プログラムの名前、ランタイム引数、タイプ(mapreduce、spark、workers など)は必須です。名前空間はオプションであり(指定しなかった場合は、デフォルトが使用されます)、cdap run_id が計算されます。 

自動化の課題は、リアルタイム ソースが実行時に変数構成(マクロとも呼ばれます)をサポートしないことです。つまり、マクロの機能を実現するハードコードされたアプリケーションを、実行ごとに追加で記述する必要があります。これを実現するには、JSON ファイルをテンプレート ファイルとして(Terraform 適用時に)レンダリングし、そこでランタイム引数を置き換えます。

提供されているモジュールを介して Cloud Data Fusion を実行してみてください。その際、最初のステップで、datafusion モジュールを使用して Cloud Data Fusion インスタンスを作成することを忘れないでください。フィードバック、提案、コメントをぜひお寄せください。ご連絡いただく場合は、リポジトリ自体に問題を作成してください。

- By Cloud テクニカル レジデント Svetak Sundhar