Cloud Composer 1 | Cloud Composer 2
このページでは、Pub/Sub トピックの変更に応じて Cloud Composer DAG をトリガーして、イベントベースの push アーキテクチャを作成する方法について説明します。このチュートリアルの例では、DAG プロセスの一部として、サブスクリプションの管理を含む Pub/Sub 管理の全サイクルを処理する方法を示します。DAG をトリガーする必要があるが、追加のアクセス権限を設定したくない一般的なユースケースに適しています。
たとえば、Pub/Sub を介して送信されるメッセージは、セキュリティ上の理由から、Cloud Composer 環境に直接アクセスしたくない場合のソリューションとして使用できます。Pub/Sub メッセージを作成し、Pub/Sub トピックで公開する Cloud Functions を構成できます。その後、Pub/Sub メッセージを pull し、それらのメッセージを処理する DAG を作成できます。
この特定の例では、Cloud Functions の関数を作成し、2 つの DAG をデプロイします。最初の DAG は Pub/Sub メッセージを pull し、Pub/Sub メッセージ コンテンツに沿って 2 番目の DAG をトリガーします。
このチュートリアルは、Python と Google Cloud コンソールに精通していることを前提としています。
目標
費用
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
- Cloud Composer(追加費用もご覧ください)
- Pub/Sub
- Cloud Functions
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
準備
このチュートリアルでは、Google Cloud プロジェクトが必要です。プロジェクトは、次のように構成します:
Google Cloud コンソールで、プロジェクトを選択または作成します:
プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
Google Cloud プロジェクト ユーザーが、必要なリソースを作成するために次のロールを持っていることを確認します。
- サービス アカウント ユーザー (
roles/iam.serviceAccountUser
) - Pub/Sub 編集者(
roles/pubsub.editor
) - 環境とストレージ オブジェクトの管理者
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Functions 管理者 (
roles/cloudfunctions.admin
) - ログビューア (
roles/logging.viewer
)
- サービス アカウント ユーザー (
Cloud Functions を実行するサービス アカウントに、プロジェクトで Pub/Sub にアクセスするための十分な権限が付与されていることを確認します。デフォルトでは、Cloud Functions は App Engine のデフォルトのサービス アカウントを使用します。このサービス アカウントには、このチュートリアルで十分な権限を持つ編集者のロールがあります。
プロジェクトでAPI を有効にする
コンソール
Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Terraform スクリプトに次のリソース定義を追加して、プロジェクトで Cloud Composer API を有効にします。
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
。
Cloud Composer 環境を作成する
この手順の一環として、Cloud Composer v2 API サービス エージェント拡張機能(roles/composer.ServiceAgentV2Ext
)ロールを Composer サービス エージェント アカウントに付与します。Cloud Composer では、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。
Pub/Sub トピックの作成
この例では、Pub/Sub トピックに push されたメッセージに応答して DAG をトリガーします。この例で使用する Pub/Sub トピックを作成します。
コンソール
Google Cloud コンソールで、Pub/Sub トピック ページに移動します。
[トピックを作成] をクリックします。
[トピック ID] フィールドに、トピックの ID として
dag-topic-trigger
を入力します。他のオプションはデフォルトのままにします。
[トピックを作成] をクリックします。
gcloud
トピックを作成するには、Google Cloud CLI で gcloud pubsub topics create コマンドを実行します。
gcloud pubsub topics create dag-topic-trigger
Terraform
Terraform スクリプトに次のリソース定義を追加します。
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
。
DAG をアップロードする
DAG をお使いの環境にアップロードします。
- 次の DAG ファイルをローカル コンピュータに保存します。
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例:example-project
。- 編集済み DAG ファイルをご利用の環境にアップロードします。
サンプルコードには 2 つの DAG(trigger_dag
と target_dag
)が含まれています。
trigger_dag
DAG は Pub/Sub トピックに登録し、Pub/Sub メッセージを pull し、Pub/Sub メッセージ データの DAG ID で指定された別の DAG をトリガーします。この例では、trigger_dag
によって target_dag
DAG がトリガーされ、タスクログにメッセージが出力されます。
trigger_dag
DAG には次のタスクが含まれます。
subscribe_task
: Pub/Sub トピックへのサブスクライブpull_messages_operator
:PubSubPullOperator
を使用して Pub/Sub メッセージ データを読み取ります。trigger_target_dag
: Pub/Sub トピックから pull されたメッセージのデータに従って、別の DAG(この例ではtarget_dag
)をトリガーします。
target_dag
DAG にはタスク output_to_logs
が 1 つだけ含まれます。このタスクは、1 秒間遅延してタスクログにメッセージを出力します。
Pub/Sub トピックにメッセージを公開する Cloud Functions をデプロイする
このセクションでは、Pub/Sub トピックにメッセージを公開する Cloud Functions をデプロイします。
Cloud Functions を作成し、その構成を指定する
コンソール
Google Cloud コンソールで、[Cloud Functions] ページに移動します。
[関数の作成] をクリックします。
[環境] フィールドで [第 1 世代] を選択します。
[関数名] フィールドに、関数の名前(
pubsub-publisher
)を入力します。[トリガー] フィールドで、[HTTP] を選択します。
[認証] セクションで、[未認証の呼び出しを許可] を選択します。このオプションを使用すると、認証されていないユーザーに HTTP 関数の呼び出しが許可されます。
[保存] をクリックする。
[次へ] をクリックして、[コード] ステップに進みます。
Terraform
この手順では Google Cloud コンソールの使用を検討してください。Terraform から関数のソースコードを管理する簡単な方法がないためです。
この例では、Cloud Storage バケットを作成し、このバケットにファイルを保存して、バケットからのファイルを Cloud Functions のソースとして使用することで、Cloud Functions をローカルの zip アーカイブ ファイルからアップロードする方法を示します。この方法を使用した場合、新しいアーカイブ ファイルを作成しても、Terraform では関数のソースコードが自動的に更新されません。関数コードを再アップロードするには、アーカイブのファイル名を変更します。
pubsub_publisher.py
ファイルとrequirements.txt
ファイルをダウンロードします。pubsub_publisher.py
ファイルで、<PROJECT_ID>
をプロジェクトのプロジェクト ID に置き換えます。例:example-project
pbusub_publisner.py
ファイルとrequirements.txt
ファイルを含むpubsub_function.zip
という名前の zip アーカイブを作成します。- Terraform スクリプトが保存されているディレクトリに zip アーカイブを保存します。
- 次のリソース定義を Terraform スクリプトに追加し、
<PROJECT_ID>
をプロジェクトのプロジェクト ID に置き換えます。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Cloud Functions のコード パラメータを指定する
コンソール
[コード] ステップの [ランタイム] フィールドで、関数で使用する言語ランタイムを選択します。この例では、Python 3.10 を選択します。
[エントリ ポイント] フィールドに、「
pubsub_publisher
」と入力します。これは、Cloud Functions の実行時に実行されるコードです。このフラグの値は、ソースコード内に存在する関数名または完全修飾クラス名である必要があります。
Terraform
このステップをスキップします。Cloud Functions のパラメータは、すでに google_cloudfunctions_function
リソースで定義されています。
Cloud Functions のコードをアップロードする
コンソール
[ソースコード] フィールドで、関数のソースコードの提供方法に適したオプションを選択します。このチュートリアルでは、Cloud Functions インライン エディタを使用して関数コードを追加します。別の方法として、ZIP ファイルをアップロードすることや、Cloud Source Repositories を使用することもできます。
- main.py ファイルに次のサンプルコードを入力します。
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例:example-project
Terraform
このステップをスキップします。Cloud Functions のパラメータは、すでに google_cloudfunctions_function
リソースで定義されています。
Cloud Functions の依存関係を指定する
コンソール
requirements.txt メタデータ ファイルで関数の依存関係を指定します。
関数をデプロイすると、Cloud Functions は requirements.txt ファイル内で宣言されている依存関係を、パッケージごとに 1 行ずつダウンロードしてインストールします。このファイルは、関数コードを含む main.py ファイルと同じディレクトリに配置する必要があります。詳細については、pip
ドキュメントの要件ファイルをご覧ください。
Terraform
このステップをスキップします。Cloud Functions の依存関係は、pubsub_function.zip
アーカイブの requirements.txt
ファイルで定義されます。
Cloud Functions の関数のデプロイ
コンソール
デプロイ] をクリックします。デプロイが正常に完了すると、Google Cloud コンソールの [Cloud Functions] ページで、関数に緑色のチェックマークが表示されます。
Cloud Functions を実行するサービス アカウントには、プロジェクト内に Pub/Sub にアクセスするための十分な権限があることを確認してください。
Terraform
Terraform を初期化します。
terraform init
構成を確認して、Terraform が作成または更新するリソースが想定どおりであることを確認します。
terraform plan
次のコマンドを実行して、構成が有効かどうかを確認します。
terraform validate
次のコマンドを実行し、プロンプトで「yes」と入力して、Terraform 構成を適用します。
terraform apply
Terraform に「Apply complete!」のメッセージが表示されるまで待ちます。
Google Cloud コンソールで、UI のリソースに移動して、Terraform によって作成または更新されたことを確認します。
Cloud Functions の関数をテストする
関数が Pub/Sub トピックでメッセージをパブリッシュし、サンプルの DAG が意図したとおりに機能することを確認するには:
DAG がアクティブであることを確認します。
Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dag
とtarget_dag
という名前の DAG の [State] 列の値を確認します。両方の DAG がActive
状態になっている必要があります。
テスト用の Pub/Sub メッセージを push します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。
Google Cloud コンソールで、[Functions] ページに移動します。
関数の名前をクリックします。
pubsub-publisher
[テスト] タブに移動します。
[トリガー イベントの構成] セクションで、次の JSON の Key-Value を入力します。
{"message": "target_dag"}
このメッセージは後でテスト DAG をトリガーするため、Key-Value ペアを変更しないでください。[テストコマンド] セクションで、[Cloud Shell でテスト] をクリックします。
Cloud Shell ターミナルに、コマンドが自動的に表示されるまで待ちます。
Enter
を押して、このコマンドを実行します。[Cloud Shell の承認] メッセージが表示されたら、[承認] をクリックします。
メッセージのコンテンツが Pub/Sub メッセージに対応していることを確認します。この例では、出力メッセージは、関数からのレスポンスとして
Message b'target_dag' with message_length 10 published to
で始める必要があります。
target_dag
がトリガーされたことを確認します。trigger_dag
の新しい DAG 実行が完了するまで、少なくとも 1 分間待ちます。Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dag
をクリックすると DAG の詳細ページに移動します。[実行] タブに、trigger_dag
DAG の DAG 実行のリストが表示されます。この DAG は 1 分ごとに実行され、関数から送信されたすべての Pub/Sub メッセージを処理します。メッセージが送信されていない場合は、DAG 実行ログで
trigger_target
タスクがSkipped
とマークされます。DAG がトリガーされると、trigger_target
タスクはSuccess
とマークされます。最近の DAG 実行をいくつか調べて、3 つのタスク(
subscribe_task
、pull_messages_operator
、trigger_target
)すべてがSuccess
ステータスになっている DAG 実行を探します。[DAGs] タブに戻り、
target_dag
DAG の [Successful runs] 列に成功した実行が 1 つリストされていることを確認します。
Summary
このチュートリアルでは、Cloud Functions を使用して Pub/Sub トピックでメッセージをパブリッシュし、Pub/Sub トピックにサブスクライブして Pub/Sub メッセージを pull し、メッセージ データの DAG ID で指定された別の DAG をトリガーする DAG をデプロイする方法を学びました。
また、Pub/Sub サブスクリプションの作成と管理、および DAG のトリガーには、このチュートリアルの範囲外である別の方法も存在します。たとえば、指定されたイベントが発生したときに、Cloud Functions を使用して Airflow DAG をトリガーできます。チュートリアルを参照し、Google Cloud の他の機能を試す。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。
コンソール
- Cloud Composer 環境を削除します。この手順で環境のバケットも削除します。
- Pub/Sub トピックを削除します、
dag-topic-trigger
。 Cloud Functions 関数を削除する。
Google Cloud コンソールで、[Cloud Functions] に移動します。
削除する関数のチェックボックスをクリックします、
pubsub-publisher
。[削除] をクリックして、手順どおりに実行します。
Terraform
- Terraform スクリプトに、プロジェクトで必要なリソースのエントリが含まれていないことを確認します。たとえば、一部の API を有効にしたまま、IAM 権限をまだ割り当てておくことができます(Terraform スクリプトにこのような定義を追加した場合)。
terraform destroy
を実行します。- 環境のバケットを手動で削除します。Cloud Composer では自動的には削除されません。これは Google Cloud Console または Google Cloud CLI から行うことができます。
次のステップ
- DAG のテスト
- HTTP 関数のテスト
- Cloud Functions の関数をデプロイする
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。