Cloud Composer 1 | Cloud Composer 2
このチュートリアルは、Cloud Composer 環境を Microsoft Azure に接続して、そこに格納されているデータを利用する方法を説明している、Google Cloud でデータ分析 DAG を実行するの修正版です。Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Azure Blob Storage に保存されている CSV ファイルのデータを結合してから、Dataproc サーバーレス バッチジョブを実行して結合されたデータを処理します。
このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の休日の日付と名前に関する情報が含まれています。
DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。
目標
- デフォルト構成で Cloud Composer 環境を作成する
- Azure で blob を作成する
- 空の BigQuery データセットを作成する
- Cloud Storage バケットを新規作成する
- 次のタスクを含む DAG を作成、実行します。
- Azure Blob Storage から Cloud Storage に外部データセットを読み込む
- Cloud Storage から BigQuery に外部データセットを読み込む
- BigQuery で 2 つのデータセットを結合する
- データ分析の PySpark ジョブを実行する
準備
API を有効にする
次の API を有効にします。
コンソール
Dataproc, Cloud Composer, BigQuery, Cloud Storage API を有効にします。
gcloud
Dataproc, Cloud Composer, BigQuery, Cloud Storage API を有効にします。
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
権限を付与する
ユーザー アカウントに次のロールと権限を付与します。
BigQuery データオーナー(
roles/bigquery.dataOwner
)のロールを付与して、BigQuery データセットを作成します。ストレージ管理者(
roles/storage.admin
)のロールを付与して、Cloud Storage バケットを作成します。
Cloud Composer 環境を作成して準備する
デフォルトのパラメータで Cloud Composer 環境を作成します。
- 米国に置かれたリージョンを選択します。
- 最新の Cloud Composer バージョンを選択します。
Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - サービス アカウント ユーザー(
roles/iam.serviceAccountUser
) - Dataproc 編集者(
roles/dataproc.editor
) - Dataproc ワーカー(
roles/dataproc.worker
)
- BigQuery ユーザー(
Google Cloud で関連リソースを作成して変更する
Cloud Composer 環境に
apache-airflow-providers-microsoft-azure
PyPI パッケージをインストールします。次のパラメータを使用して空の BigQuery データセットを作成します。
- 名前:
holiday_weather
- リージョン:
US
- 名前:
US
マルチリージョンで新しい Cloud Storage バケットを作成します。次のコマンドを実行して、ネットワーキングの要件を満たすために Dataproc サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Azure で関連リソースを作成する
デフォルト設定でストレージ アカウントを作成します。
ストレージ アカウントのアクセスキーと接続文字列を取得します。
新しく作成したストレージ アカウントで、デフォルトのオプションを使用してコンテナを作成します。
前の手順で作成したコンテナに対するストレージ Blob Delegator ロールを付与します。
holidays.csv をアップロードして、Azure ポータルのデフォルト オプションでブロック blob を作成します。
Azure ポータルで、前の手順で作成したブロック blob の SAS トークンを作成します。
- 署名方法: ユーザーの委任キー
- 権限: 読み取り
- 許可対象 IP アドレス: なし
- 許可対象プロトコル: HTTPS のみ
Cloud Composer から Azure に接続する
Airflow UI を使用して Microsoft Azure 接続を追加します。
[管理者] > [接続] に移動します。
次の構成で新しい接続を作成します。
- 接続 ID:
azure_blob_connection
- 接続タイプ:
Azure Blob Storage
- Blob Storage ログイン: ストレージ アカウント名
- Blob Storage キー: ストレージ アカウントのアクセスキー
- Blob Storage アカウント接続文字列: ストレージ アカウントの接続文字列
- SAS トークン: blob から生成された SAS トークン
- 接続 ID:
Dataproc サーバーレスを使用したデータ処理
PySpark ジョブの例を確認する
次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットから取得した温度データを別の形式に変換します。
PySpark ファイルを Cloud Storage にアップロードします。
PySpark ファイルを Cloud Storage にアップロードします。
data_analytics_process.py をローカルマシンに保存します。
Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。
前の手順で作成したバケットの名前をクリックします。
バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで
data_analytics_process.py
を選択し、[開く] をクリックします。
データ分析 DAG
DAG の例を確認する
DAG は複数の演算子を使用してデータを変換し、統合します。
AzureBlobStorageToGCSOperator
は、holidays.csv ファイルを Azure ブロック blob から Cloud Storage バケットに転送します。GCSToBigQueryOperator
は、Cloud Storage からの holiday.csv ファイルを、前の手順で作成した BigQueryholidays_weather
データセット内の新しいテーブルに取り込みます。DataprocCreateBatchOperator
は、Dataproc サーバーレスを使用して PySpark バッチジョブを作成し、実行します。BigQueryInsertJobOperator
は、[Date] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperator
タスクは for ループを使用して動的に生成されます。また、これらのタスクはTaskGroup
にあるため、Airflow UI のグラフビューで読みやすくなります。
Airflow UI を使用して変数を追加する
Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG では Airflow 変数を使用して共通の値を保存します。ご利用の環境に追加するには:
[管理] > [変数] に移動します。
次の変数を追加します。
gcp_project
: プロジェクト ID。gcs_bucket
: 前の手順で作成したバケットの名前(gs://
接頭辞は付けない)。gce_region
: Dataproc サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。 これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。dataproc_service_account
: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。azure_blob_name
: 前の手順で作成した blob の名前。azure_blob_path
- 前の手順で作成した blob の URL。情報を取得するには、blob を右クリックして [表示 / 編集] を選択します。URL は概要タブにあります。azure_container_name
: 前の手順で作成したコンテナの名前。
DAG を環境のバケットにアップロードする
Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags
フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:
ローカルマシンに azureblobstoretogcsoperator_tutorial.py を保存します。
Google Cloud Console で [環境] ページに移動します。
環境のリストにある [DAG フォルダ] 列で、[DAG] リンクをクリックします。環境の DAG フォルダが開きます。
[ファイルをアップロード] をクリックします。
ローカルマシン上の
azureblobstoretogcsoperator_tutorial.py
を選択して、[開く] をクリックします。
DAG をトリガーする
Cloud Composer 環境で [DAG] タブをクリックします。
DAG ID
azure_blob_to_gcs_dag
をクリックします。[DAG をトリガー] をクリックします。
タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。
DAG の成功を検証する
Google Cloud コンソールで [BigQuery] ページに移動します。
[エクスプローラ] パネルでプロジェクト名をクリックします。
[
holidays_weather_joined
] をクリックします。[プレビュー] をクリックすると、結果の表が表示されます。値列の数値は、10 分の 1 の摂氏度数です。
[
holidays_weather_normalized
] をクリックします。[プレビュー] をクリックすると、結果の表が表示されます。値列の数値は、摂氏度数です。
クリーンアップ
このチュートリアル用に作成した個々のリソースを削除します。
このチュートリアル用に作成した Cloud Storage バケットを削除します。
Cloud Composer 環境を削除します(環境のバケットを手動で削除します)。