Menjalankan langkah alur kerja secara paralel

Langkah paralel dapat mengurangi total waktu eksekusi untuk alur kerja dengan melakukan beberapa panggilan pemblokiran secara bersamaan.

Memblokir panggilan seperti sleep, panggilan HTTP, dan callback dapat memerlukan waktu, dari milidetik hingga hari. Langkah paralel dimaksudkan untuk membantu operasi yang berjalan lama secara serentak. Jika alur kerja harus melakukan beberapa panggilan pemblokiran yang tidak bergantung satu sama lain, menggunakan cabang paralel dapat mengurangi total waktu eksekusi dengan memulai panggilan secara bersamaan, dan menunggu semua panggilan selesai.

Misalnya, jika alur kerja Anda harus mengambil data pelanggan dari beberapa sistem independen sebelum melanjutkan, cabang paralel memungkinkan permintaan API serentak. Jika ada lima sistem dan masing-masing memerlukan waktu dua detik untuk merespons, melakukan langkah-langkah secara berurutan dalam alur kerja dapat memerlukan waktu minimal 10 detik; melakukannya secara paralel dapat memerlukan waktu hanya dua detik.

Membuat langkah paralel

Buat langkah parallel untuk menentukan bagian alur kerja Anda tempat dua atau beberapa langkah dapat dijalankan secara serentak.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ganti kode berikut:

  • PARALLEL_STEP_NAME: nama langkah paralel.
  • POLICY (opsional): menentukan tindakan yang akan dilakukan cabang lain saat terjadi pengecualian yang tidak ditangani. Kebijakan default, continueAll, tidak menghasilkan tindakan lebih lanjut, dan semua cabang lainnya akan mencoba dijalankan. Perhatikan bahwa continueAll adalah satu-satunya kebijakan yang saat ini didukung.
  • VARIABLE_A, VARIABLE_B, dan sebagainya: daftar variabel yang dapat ditulis dengan cakupan induk yang memungkinkan penetapan dalam langkah paralel. Untuk mengetahui informasi selengkapnya, lihat Variabel bersama.
  • CONCURRENCY_LIMIT (opsional): jumlah maksimum cabang dan iterasi yang dapat dieksekusi secara serentak dalam satu eksekusi alur kerja sebelum cabang dan iterasi lebih lanjut dimasukkan ke dalam antrean untuk menunggu. Hal ini hanya berlaku untuk satu langkah parallel dan tidak bersifat kaskade. Harus berupa bilangan bulat positif dan dapat berupa nilai literal atau ekspresi. Untuk mengetahui detailnya, lihat Batas konkurensi.
  • BRANCHES_OR_FOR: gunakan branches atau for untuk menunjukkan salah satu dari hal berikut:
    • Cabang yang dapat berjalan secara serentak.
    • Loop tempat iterasi dapat berjalan secara serentak.

Perhatikan hal berikut:

  • Cabang dan iterasi paralel dapat berjalan dalam urutan apa pun, dan mungkin berjalan dalam urutan yang berbeda dengan setiap eksekusi.
  • Langkah paralel dapat menyertakan langkah paralel bertingkat lainnya hingga batas kedalaman. Lihat Kuota dan batas.
  • Untuk detail selengkapnya, lihat halaman referensi sintaksis untuk langkah paralel.

Mengganti fungsi eksperimental dengan langkah paralel

Jika menggunakan experimental.executions.map untuk mendukung pekerjaan paralel, Anda dapat memigrasikan alur kerja untuk menggunakan langkah paralel, yang menjalankan loop for biasa secara paralel. Untuk mengetahui contohnya, lihat Mengganti fungsi eksperimental dengan langkah paralel.

Sampel

Contoh ini menunjukkan sintaksis.

Melakukan operasi secara paralel (menggunakan cabang)

Jika alur kerja Anda memiliki beberapa kumpulan langkah yang berbeda dan dapat dijalankan secara bersamaan, menempatkannya di cabang paralel dapat mengurangi total waktu yang diperlukan untuk menyelesaikan langkah-langkah tersebut.

Dalam contoh berikut, ID pengguna diteruskan sebagai argumen ke alur kerja dan data diambil secara paralel dari dua layanan yang berbeda. Variabel bersama memungkinkan nilai ditulis di cabang, dan dibaca setelah cabang selesai:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Memproses item secara paralel (menggunakan loop paralel)

Jika perlu melakukan tindakan yang sama untuk setiap item dalam daftar, Anda dapat menyelesaikan eksekusi dengan lebih cepat menggunakan loop paralel. Loop paralel memungkinkan beberapa iterasi loop dilakukan secara paralel. Perhatikan bahwa, tidak seperti loop for reguler, iterasi dapat dilakukan dalam urutan apa pun.

Pada contoh berikut, serangkaian notifikasi pengguna diproses dalam loop for paralel:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Menggabungkan data (menggunakan loop paralel)

Anda dapat memproses sekumpulan item saat mengumpulkan data dari operasi yang dilakukan pada setiap item. Misalnya, Anda mungkin ingin melacak ID item yang dibuat, atau mempertahankan daftar item dengan error.

Dalam contoh berikut, 10 kueri terpisah ke set data BigQuery publik masing-masing menampilkan jumlah kata dalam dokumen, atau kumpulan dokumen. Variabel bersama memungkinkan jumlah kata diakumulasikan dan dibaca setelah semua iterasi selesai. Setelah menghitung jumlah kata di semua dokumen, alur kerja akan menampilkan totalnya.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Langkah selanjutnya