Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、Cloud Composer 2 を使用して Google Cloud で Dataproc サーバーレス ワークロードを実行する方法について説明します。
次のセクションの例では、Dataproc サーバーレスのバッチ ワークロードを管理する演算子を使用する方法を示しています。これらの演算子は、Dataproc サーバーレス Spark バッチ ワークロードの作成、削除、一覧表示、取得を行う DAG で使用します。
Dataproc サーバーレス Batch ワークロードと連携する演算子用の DAG を作成します。
カスタム コンテナと Dataproc Metastore を使用する DAG を作成します。
これらの DAG の永続履歴サーバーを構成します。
始める前に
Dataproc API を有効にします。
コンソール
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Batch ワークロード ファイルの場所を選択します。次のいずれかのオプションを使用できます。
- このファイルを格納する Cloud Storage バケットを作成する。
- 環境のバケットを使用する。このファイルを Airflow と同期する必要がないため、
/dags
フォルダまたは/data
フォルダの外に別のサブフォルダを作成できます。例:/batches
- 既存のバケットを使用します。
ファイルと Airflow 変数を設定する
このセクションでは、このチュートリアルで使用するファイルを設定し、Airflow 変数を構成する方法について説明します。
Dataproc Serverless Spark ML ワークロード ファイルをバケットにアップロードする
このチュートリアルのワークロードは、pyspark スクリプトを実行します。
pyspark スクリプトを
spark-job.py
という名前のローカル ファイルに保存します。たとえば、サンプルの pyspark スクリプトを使用できます。始める前にで選択した場所にファイルをアップロードします。
Airflow 変数を設定する
次のセクションの例では、Airflow 変数を使用しています。これらの変数の値を Airflow で設定すると、DAG コードがこれらの値にアクセスできるようになります。
このチュートリアルの例では、次の Airflow 変数を使用します。使用する例によっては、必要に応じて設定できます。
DAG コードで使用する次の Airflow 変数を設定します。
project_id
: プロジェクト ID.bucket_name
: ワークロードのメインの Python ファイル(spark-job.py
)が配置されているバケットの URI。このロケーションは、始める前にで選択しました。phs_cluster
: 永続的履歴サーバーのクラスタ名。この変数は、永続履歴サーバーを作成するときに設定します。image_name
: カスタム コンテナ イメージの名前とタグ(image:tag
)。DataprocCreateBatchOperator でカスタム コンテナ イメージを使用する場合は、この変数を設定します。metastore_cluster
: Dataproc Metastore サービス名。この変数は、DataprocCreateBatchOperator で Dataproc Metastore サービスを使用するときに設定します。region_name
: Dataproc Metastore サービスが配置されているリージョン。この変数は、DataprocCreateBatchOperator で Dataproc Metastore サービスを使用するときに設定します。
Google Cloud コンソールと Airflow UI を使用して各 Airflow 変数を設定する
Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、使用中の環境の [Airflow] リンクをクリックします。Airflow UI が開きます。
Airflow UI で、[管理] > [変数] を選択します。
[新しいレコードの追加] をクリックします。
[キー] フィールドに変数の名前を指定し、[値] フィールドにその変数の値を設定します。
[保存] をクリックします。
永続履歴サーバーを作成する
永続的履歴サーバー(PHS)を使用して、バッチ ワークロードの Spark 履歴ファイルを表示します。
- 永続履歴サーバーを作成します。
phs_cluster
Airflow 変数で PHS クラスタの名前を指定していることを確認します。
DataprocCreateBatchOperator
次の DAG は、Dataproc Serverless Batch ワークロードを開始します。
DataprocCreateBatchOperator
引数の詳細については、演算子のソースコードをご覧ください。
DataprocCreateBatchOperator
の batch
パラメータで渡すことができる属性の詳細については、Batch クラスの説明をご覧ください。
DataprocCreateBatchOperator でカスタム コンテナ イメージを使用する
次の例は、カスタム コンテナ イメージを使用してワークロードを実行する方法を示しています。カスタム コンテナを使用すると、デフォルトのコンテナ イメージで提供されていない Python の依存関係を追加できます。
カスタム コンテナ イメージを使用するには:
image_name
Airflow 変数でイメージを指定します。カスタム イメージで DataprocCreateBatchOperator を使用します。
DataprocCreateBatchOperator で Dataproc Metastore サービスを使用する
DAG から Dataproc Metastore サービスを使用するには:
メタストア サービスがすでに起動していることを確認します。
メタストア サービスの起動については、Dataproc Metastore を有効または無効にするをご覧ください。
構成を作成するためのバッチ演算子の詳細については、PeripheralsConfig をご覧ください。
メタストア サービスが稼働したら、
metastore_cluster
変数でその名前を指定し、region_name
Airflow 変数でリージョンを指定します。DataprocCreateBatchOperator で metastore サービスを使用するには:
DataprocDeleteBatchOperator
DataprocDeleteBatchOperator を使用すると、ワークロードのバッチ ID に基づいてバッチを削除できます。
DataprocListBatchesOperator
DataprocDeleteBatchOperator は、指定された project_id とリージョン内に存在するバッチを一覧表示します。
DataprocGetBatchOperator
DataprocGetBatchOperator は、特定のバッチ ワークロードをフェッチします。