使用回呼和 Eventarc 觸發條件等待事件

工作流程可能需要等待外部程序。您可以使用 HTTP 回呼,等待其他服務向回呼端點發出要求,該要求會繼續執行工作流程。您也可以使用輪詢等待

本教學課程會示範如何使用 HTTP 回呼和 Eventarc 觸發條件,等待事件或 Pub/Sub 訊息,而不使用輪詢。雖然您可以透過事件或 Pub/Sub 訊息觸發工作流程,但您可能想暫停執行作業,等待其他事件發生後再繼續。舉例來說,某個事件會觸發工作流程來啟動程序,但工作流程必須等待另一個事件,表示程序已完成。您可以讓一個工作流程回呼另一個工作流程,藉此實作這項功能。

建立 Firestore 資料庫

Firestore 會將資料儲存在文件中,這些文件包含對應至值的欄位。這些文件會儲存在集合中,集合是文件的容器,可用於整理資料及建構查詢。進一步瞭解 Firestore

請注意,每個 Google Cloud 專案只能有一個 Firestore 資料庫。如需建立新資料庫,請完成下列步驟。

主控台

  1. 前往 Google Cloud 控制台的 Firestore「開始使用」頁面。

    前往「開始使用」

  2. 按一下「選取原生模式」

    如需選擇資料庫模式和逐項比較各項功能的說明,請參閱選擇原生模式和 Datastore 模式一文。

  3. 在「Select a location」(選取位置) 清單中,選取「nam5 (United States)」(nam5 (美國))

    位置會同時套用到您 Google Cloud 專案中的 Firestore 資料庫和 App Engine 應用程式。資料庫建立後,就無法變更位置。

  4. 按一下 [Create database] (建立資料庫)。

gcloud

如要建立 Firestore 資料庫,請先建立 App Engine 應用程式,然後執行 gcloud firestore databases create 指令:

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

您可以忽略 us-central is not a valid Firestore location 警告。 App Engine 和 Firestore 支援相同的位置,但 App Engine us-central (愛荷華州) 區域會對應至 Firestore nam5 (美國) 多區域。

建立 Pub/Sub 主題

本教學課程使用 Pub/Sub 做為事件來源。建立 Pub/Sub 主題,以便發布訊息。進一步瞭解如何建立及管理主題

主控台

  1. 前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。

    前往「主題」

  2. 按一下 「建立主題」

  3. 在「Topic ID」(主題 ID) 欄位中輸入 topic-callback

  4. 接受其他預設值。

  5. 按一下「建立主題」

gcloud

如要建立主題,請執行 gcloud pubsub topics create 指令:

gcloud pubsub topics create topic-callback

建立 Cloud Storage 值區

本教學課程會使用 Cloud Storage 做為事件來源。建立 Cloud Storage bucket,以便上傳檔案。進一步瞭解如何建立儲存空間值區

主控台

  1. 在 Google Cloud 控制台,前往 Cloud Storage「Buckets」(值區) 頁面。

    前往 Cloud Storage

  2. 按一下 「建立」

  3. 在 bucket 的「Name」(名稱) 中輸入 PROJECT_ID-bucket-callback

    專案 ID 用於 callback-event-sample 工作流程,識別值區。

  4. 按一下「繼續」

  5. 在「位置類型」下方選取「區域」,然後選取「us-central1 (Iowa)」(us-central1 (愛荷華州))

  6. 接受其他預設值。

  7. 點選「建立」

gcloud

如要建立 bucket,請執行 gcloud storage buckets create 指令:

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

專案 ID 用於 callback-event-sample 工作流程,識別值區。

建立事件來源後,即可部署事件接收器工作流程。

部署監聽事件的工作流程

當訊息發布至 Pub/Sub 主題,或檔案上傳至 Cloud Storage bucket 時,就會觸發 callback-event-listener 工作流程。工作流程會接收事件、從 Firestore 資料庫擷取適當的回呼詳細資料,然後將 HTTP 要求傳送至回呼端點。

控制台

  1. 前往 Google Cloud 控制台的「Workflows」頁面:

    前往「Workflows」頁面

  2. 按一下 「建立」

  3. 輸入新工作流程的名稱:callback-event-listener

  4. 在「Region」(區域) 清單中選取「us-central1」

  5. 選取您先前建立的「服務帳戶」

  6. 點選「下一步」

  7. 在工作流程編輯器中,輸入下列工作流程定義:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. 按一下 [Deploy] (部署)

gcloud

  1. 為工作流程建立原始碼檔案:

    touch callback-event-listener.yaml
  2. 在文字編輯器中,將下列工作流程複製到原始碼檔案:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. 輸入下列指令來部署工作流程:

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.

    SERVICE_ACCOUNT_NAME 改成您先前建立的服務帳戶名稱。

部署等待事件的工作流程

callback-event-sample 工作流程會將回呼詳細資料儲存在 Firestore 資料庫中,暫停執行,然後等待特定事件發生。

控制台

  1. 前往 Google Cloud 控制台的「Workflows」頁面:

    前往「Workflows」頁面

  2. 按一下 「建立」

  3. 輸入新工作流程的名稱:callback-event-sample

  4. 在「Region」(區域) 清單中選取「us-central1」

  5. 選取您先前建立的「服務帳戶」

  6. 點選「下一步」

  7. 在工作流程編輯器中,輸入下列工作流程定義:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. 按一下 [Deploy] (部署)

gcloud

  1. 為工作流程建立原始碼檔案:

    touch callback-event-sample.yaml
  2. 在文字編輯器中,將下列工作流程複製到原始碼檔案:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. 輸入下列指令來部署工作流程:

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.

    SERVICE_ACCOUNT_NAME 改成您先前建立的服務帳戶名稱。

建立 Eventarc 觸發條件以轉送 Pub/Sub 事件

您可以為 Eventarc 觸發條件指定篩選器 (包括事件來源和目標工作流程),藉此轉送事件。建立 Eventarc 觸發條件,在訊息發布至 Pub/Sub 主題時執行 callback-event-listener 工作流程。 進一步瞭解如何觸發工作流程

主控台

  1. 在 Google Cloud 控制台中,前往「Eventarc」頁面。

    前往 Eventarc

  2. 按一下「建立觸發條件」

  3. 輸入觸發條件名稱

    例如 trigger-pubsub-events-listener

  4. 在「Event provider」(事件提供者) 清單中,選取「Cloud Pub/Sub」

  5. 在「Event」(事件) 清單中,選取「Custom」(自訂) 下方的「google.cloud.pubsub.topic.v1.messagePublished」

  6. 在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 清單中,選取您先前建立的主題。

  7. 在「Region」(區域) 清單中,選取「us-central1 (Iowa)」(us-central1 (愛荷華州))

  8. 如果系統提示,請將 iam.serviceAccountTokenCreator 角色授予 Pub/Sub 服務帳戶。

  9. 選取您先前建立的「服務帳戶」

  10. 在「Event destination」(事件目的地) 清單中,選取「Workflows」(工作流程)

  11. 在「Select a workflow」(選取工作流程) 清單中,選取「callback-event-listener」工作流程。

  12. 點選「建立」

gcloud

如要建立觸發條件,請執行 gcloud eventarc triggers create 指令:

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.

系統會轉換事件,並以執行階段引數的形式傳遞至工作流程執行作業。 請注意,新觸發條件最多可能需要 2 分鐘才會啟用。

建立 Eventarc 觸發條件以轉送 Cloud Storage 事件

您可以為 Eventarc 觸發條件指定篩選器 (包括事件來源和目標工作流程),藉此轉送事件。建立 Eventarc 觸發條件,在檔案上傳至 Cloud Storage bucket 時執行 callback-event-listener 工作流程。進一步瞭解如何觸發工作流程

主控台

  1. 在 Google Cloud 控制台中,前往「Eventarc」頁面。

    前往 Eventarc

  2. 按一下「建立觸發條件」

  3. 輸入觸發條件名稱

    例如 trigger-storage-events-listener

  4. 在「Event provider」(事件提供者) 清單中,選取「Cloud Storage」

  5. 在「Event」(事件) 清單中,選取「Direct」(直接) 下的「google.cloud.storage.object.v1.finalized」

  6. 在「Bucket」(值區) 清單中,瀏覽並選取您先前建立的值區。

  7. 在「Region」(區域) 清單中,根據 Cloud Storage 值區,接受「us-central1 (Iowa)」(us-central1 (愛荷華州)) 的預設值。

  8. 如果系統提示,請將 iam.serviceAccountTokenCreator 角色授予 Pub/Sub 服務帳戶。

  9. 選取您先前建立的「服務帳戶」

  10. 在「Event destination」(事件目的地) 清單中,選取「Workflows」(工作流程)

  11. 在「Select a workflow」(選取工作流程) 清單中,選取「callback-event-listener」工作流程。

  12. 點選「建立」

gcloud

如要建立觸發條件,請執行 gcloud eventarc triggers create 指令:

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.

系統會轉換事件,並以執行階段引數的形式傳遞至工作流程執行作業。 請注意,新觸發條件最多可能需要 2 分鐘才會啟用。

執行主要工作流程

執行工作流程時,系統會執行與該工作流程相關聯的目前工作流程定義。執行 callback-event-sample 工作流程。這是主要工作流程,會等待特定事件發生,只有在次要工作流程發出適當的回呼要求時,才會繼續執行。

主控台

  1. 前往 Google Cloud 控制台的「Workflows」頁面。

    前往「Workflows」頁面

  2. 在「Workflows」頁面中,按一下「callback-event-sample」工作流程,前往詳細資料頁面。

  3. 在「Workflow Details」(工作流程詳細資料) 頁面中,按一下 「Execute」(執行)

  4. 再次按一下「執行」

    工作流程執行作業會開始。執行作業時,您應該會看到 Execution stateRunning,以及類似以下的記錄項目:Started waiting 1hr for an event from source topic-callback

gcloud

如要執行工作流程,請執行 gcloud workflows run 指令:

gcloud workflows run callback-event-sample \
    --location=us-central1

工作流程執行作業會開始。執行作業時,您應該會看到類似下方的執行狀態:

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

產生事件並檢查執行狀態

您可以產生事件、查看記錄項目,以及檢查工作流程執行狀態,確認結果是否符合預期。

發布訊息

將訊息發布至先前建立的 Pub/Sub 主題。

主控台

  1. 前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。

    前往「主題」

  2. 按一下「topic-callback」

  3. 按一下「Messages」(訊息) 分頁標籤。

  4. 按一下「發布訊息」

  5. 在「Message body」(訊息內文) 欄位中輸入 Hello World

  6. 按一下 [發布]

gcloud

如要發布訊息,請使用 gcloud pubsub topics publish 指令:

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

上傳物件

將檔案上傳至先前建立的 Cloud Storage bucket。

主控台

  1. 在 Google Cloud 控制台,前往「Cloud Storage bucket」頁面。

    前往「Buckets」(值區) 頁面

  2. 按一下先前建立的值區名稱。

  3. 在「物件」分頁中,執行下列任一操作:

    • 將需要的檔案從桌面或檔案管理員拖曳到 Google Cloud 控制台的主要窗格。

    • 按一下「上傳檔案」,選取要上傳的檔案,然後按一下「開啟」

gcloud

如要上傳檔案,請執行 gcloud storage cp 指令:

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

OBJECT_LOCATION 替換為物件的本機路徑。例如:random.txt

查看記錄項目和執行狀態

確認 callback-event-sample 工作流程已順利完成。

主控台

  1. 前往 Google Cloud 控制台的「Workflows」頁面。

    前往「Workflows」頁面

  2. 在「Workflows」頁面中,按一下「callback-event-sample」工作流程,前往詳細資料頁面。

  3. 在「Workflow Details」(工作流程詳細資料) 頁面中,如要擷取特定執行的詳細資料,請按一下適當的執行 ID。

    「執行狀態」應為「成功」,且在「輸出」窗格中,您應該會看到收到的 Pub/Sub 和 Cloud Storage 事件。

gcloud

  1. 篩選記錄檔項目,並以 JSON 格式傳回輸出內容:

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. 尋找類似下列內容的記錄項目:

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. 檢查上次執行嘗試的狀態:

    gcloud workflows executions wait-last

    結果應類似如下:

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED