Cloud Composer 1 | Cloud Composer 2
このチュートリアルは、Cloud Composer 環境を Microsoft Azure に接続して、そこに格納されているデータを利用する方法を説明している、Google Cloud でデータ分析 DAG を実行するの修正版です。Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Azure Blob Storage に保存されている CSV ファイルのデータを結合してから、Dataproc サーバーレス バッチジョブを実行して結合されたデータを処理します。
このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の休日の日付と名前に関する情報が含まれています。
DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。
目標
- デフォルト構成で Cloud Composer 環境を作成する
- Azure で blob を作成する
- 空の BigQuery データセットを作成する
- Cloud Storage バケットを新規作成する
- 次のタスクを含む DAG を作成、実行します。
- Azure Blob Storage から Cloud Storage に外部データセットを読み込む
- Cloud Storage から BigQuery に外部データセットを読み込む
- BigQuery で 2 つのデータセットを結合する
- データ分析の PySpark ジョブを実行する
準備
API を有効にする
次の API を有効にします。
コンソール
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
権限を付与する
ユーザー アカウントに次のロールと権限を付与します。
BigQuery データオーナー(
roles/bigquery.dataOwner
)のロールを付与して、BigQuery データセットを作成します。ストレージ管理者(
roles/storage.admin
)のロールを付与して、Cloud Storage バケットを作成します。
Cloud Composer 環境を作成して準備する
デフォルトのパラメータで Cloud Composer 環境を作成します。
- 米国に置かれたリージョンを選択します。
- 最新の Cloud Composer バージョンを選択します。
Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - サービス アカウント ユーザー(
roles/iam.serviceAccountUser
) - Dataproc 編集者(
roles/dataproc.editor
) - Dataproc ワーカー(
roles/dataproc.worker
)
- BigQuery ユーザー(
Google Cloud で関連リソースを作成して変更する
Cloud Composer 環境に
apache-airflow-providers-microsoft-azure
PyPI パッケージをインストールします。次のパラメータを使用して空の BigQuery データセットを作成します。
- 名前:
holiday_weather
- リージョン:
US
- 名前:
US
マルチリージョンで新しい Cloud Storage バケットを作成します。次のコマンドを実行して、ネットワーキングの要件を満たすために Dataproc サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Azure で関連リソースを作成する
デフォルト設定でストレージ アカウントを作成します。
ストレージ アカウントのアクセスキーと接続文字列を取得します。
新しく作成したストレージ アカウントで、デフォルトのオプションを使用してコンテナを作成します。
前の手順で作成したコンテナに対するストレージ Blob Delegator ロールを付与します。
holidays.csv をアップロードして、Azure ポータルのデフォルト オプションでブロック blob を作成します。
Azure ポータルで、前の手順で作成したブロック blob の SAS トークンを作成します。
- 署名方法: ユーザーの委任キー
- 権限: 読み取り
- 許可対象 IP アドレス: なし
- 許可対象プロトコル: HTTPS のみ
Cloud Composer から Azure に接続する
Airflow UI を使用して Microsoft Azure 接続を追加します。
[管理者] > [接続] に移動します。
次の構成で新しい接続を作成します。
- 接続 ID:
azure_blob_connection
- 接続タイプ:
Azure Blob Storage
- Blob Storage ログイン: ストレージ アカウント名
- Blob Storage キー: ストレージ アカウントのアクセスキー
- Blob Storage アカウント接続文字列: ストレージ アカウントの接続文字列
- SAS Token: blob から生成された SAS トークン
- 接続 ID:
Dataproc サーバーレスを使用したデータ処理
PySpark ジョブの例を確認する
次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットの温度データを別の形式に変換します。
PySpark ファイルを Cloud Storage にアップロードします。
PySpark ファイルを Cloud Storage にアップロードします。
data_analytics_process.py をローカルマシンに保存します。
Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。
前の手順で作成したバケットの名前をクリックします。
バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで
data_analytics_process.py
を選択し、[開く] をクリックします。
データ分析 DAG
DAG の例を確認する
DAG は複数の演算子を使用してデータを変換し、統合します。
AzureBlobStorageToGCSOperator
は、holidays.csv ファイルを Azure ブロック blob から Cloud Storage バケットに転送します。GCSToBigQueryOperator
は、Cloud Storage からの holiday.csv ファイルを、前の手順で作成した BigQueryholidays_weather
データセット内の新しいテーブルに取り込みます。DataprocCreateBatchOperator
は、Dataproc サーバーレスを使用して PySpark バッチジョブを作成し、実行します。BigQueryInsertJobOperator
は、[Date] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperator
タスクは for ループを使用して動的に生成されます。また、これらのタスクはTaskGroup
にあるため、Airflow UI のグラフビューで読みやすくなります。
Airflow UI を使用して変数を追加する
Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG では Airflow 変数を使用して共通の値を保存します。ご利用の環境に追加するには:
[管理] > [変数] に移動します。
次の変数を追加します。
gcp_project
: プロジェクト ID。gcs_bucket
: 前の手順で作成したバケットの名前(gs://
接頭辞は付けない)。gce_region
: Dataproc サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。 これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。dataproc_service_account
: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。azure_blob_name
: 前の手順で作成した blob の名前。azure_container_name
: 前の手順で作成したコンテナの名前。
DAG を環境のバケットにアップロードする
Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags
フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:
ローカルマシンに azureblobstoretogcsoperator_tutorial.py を保存します。
Google Cloud Console で [環境] ページに移動します。
環境のリストにある [DAG フォルダ] 列で、[DAG] リンクをクリックします。環境の DAG フォルダが開きます。
[ファイルをアップロード] をクリックします。
ローカルマシン上の
azureblobstoretogcsoperator_tutorial.py
を選択して、[開く] をクリックします。
DAG をトリガーする
Cloud Composer 環境で [DAG] タブをクリックします。
DAG ID
azure_blob_to_gcs_dag
をクリックします。[DAG をトリガー] をクリックします。
タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。
DAG の成功を検証する
Google Cloud コンソールで [BigQuery] ページに移動します。
[エクスプローラ] パネルでプロジェクト名をクリックします。
[
holidays_weather_joined
] をクリックします。[プレビュー] をクリックすると、結果の表が表示されます。値列の数値は、10 分の 1 の摂氏度数です。
[
holidays_weather_normalized
] をクリックします。[プレビュー] をクリックすると、結果の表が表示されます。値列の数値は、摂氏度数です。
クリーンアップ
このチュートリアル用に作成した個々のリソースを削除します。
このチュートリアル用に作成した Cloud Storage バケットを削除します。
Cloud Composer 環境を削除します(環境のバケットを手動で削除します)。