Menjalankan langkah alur kerja secara paralel

Langkah paralel dapat mengurangi total waktu eksekusi untuk alur kerja dengan melakukan beberapa panggilan pemblokir secara bersamaan.

Memblokir panggilan seperti sleep, panggilan HTTP, dan callbacks dapat memerlukan waktu dari milidetik hingga hari. Langkah-langkah paralel dimaksudkan untuk membantu operasi yang berjalan lama secara serentak. Jika alur kerja harus melakukan beberapa panggilan pemblokiran yang tidak bergantung satu sama lain, penggunaan cabang paralel dapat mengurangi total waktu eksekusi dengan memulai panggilan secara bersamaan, dan menunggu hingga semuanya selesai.

Misalnya, jika alur kerja Anda harus mengambil data pelanggan dari beberapa sistem independen sebelum melanjutkan, cabang paralel akan mengizinkan permintaan API serentak. Jika ada lima sistem dan masing-masing sistem memerlukan waktu dua detik untuk merespons, melakukan langkah-langkah secara berurutan dalam alur kerja dapat memerlukan waktu minimal 10 detik; melakukannya secara paralel hanya membutuhkan waktu dua detik.

Membuat langkah paralel

Buat langkah parallel untuk menentukan bagian alur kerja Anda tempat dua atau beberapa langkah dapat dieksekusi secara serentak.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ganti kode berikut:

  • PARALLEL_STEP_NAME: nama langkah paralel.
  • POLICY (opsional): menentukan tindakan yang akan diambil cabang lain saat terjadi pengecualian yang tidak tertangani. Kebijakan default, continueAll, tidak akan menghasilkan tindakan lebih lanjut, dan semua cabang lainnya akan mencoba dijalankan. Perlu diketahui bahwa continueAll adalah satu-satunya kebijakan yang saat ini didukung.
  • VARIABLE_A, VARIABLE_B, dan seterusnya: daftar variabel yang dapat ditulis dengan cakupan induk yang memungkinkan penetapan dalam langkah paralel. Untuk mengetahui informasi selengkapnya, lihat Variabel bersama.
  • CONCURRENCY_LIMIT (opsional): jumlah maksimum cabang dan iterasi yang dapat dieksekusi secara bersamaan dalam satu eksekusi alur kerja sebelum cabang dan iterasi lebih lanjut diantrekan untuk menunggu. Ini hanya berlaku untuk satu langkah parallel dan tidak menurun. Harus berupa bilangan bulat positif dan dapat berupa nilai literal atau ekspresi. Untuk mengetahui detailnya, lihat Batas konkurensi.
  • BRANCHES_OR_FOR: gunakan branches atau for untuk menunjukkan salah satu dari hal berikut:
    • Cabang yang dapat berjalan secara serentak.
    • Loop tempat iterasi dapat berjalan secara serentak.

Perhatikan hal-hal berikut:

  • Cabang dan iterasi paralel dapat berjalan dalam urutan apa pun, dan dapat berjalan dalam urutan yang berbeda di setiap eksekusi.
  • Langkah paralel dapat mencakup langkah paralel lain yang disusun bertingkat hingga batas kedalaman. Lihat Kuota dan batas.
  • Untuk mengetahui detail selengkapnya, lihat halaman referensi sintaksis untuk langkah paralel.

Mengganti fungsi eksperimental dengan langkah paralel

Jika menggunakan experimental.executions.map untuk mendukung pekerjaan paralel, Anda dapat memigrasikan alur kerja untuk menggunakan langkah paralel, dengan menjalankan loop for biasa secara paralel. Untuk contoh, lihat Mengganti fungsi eksperimental dengan langkah paralel.

Sampel

Contoh ini menunjukkan sintaksis.

Melakukan operasi secara paralel (menggunakan cabang)

Jika alur kerja Anda memiliki beberapa rangkaian langkah yang berbeda dan dapat dijalankan secara bersamaan, menempatkannya di cabang paralel dapat mengurangi total waktu yang diperlukan untuk menyelesaikan langkah-langkah tersebut.

Pada contoh berikut, ID pengguna diteruskan sebagai argumen ke alur kerja dan data diambil secara paralel dari dua layanan yang berbeda. Variabel bersama memungkinkan nilai ditulis di cabang, dan dibaca setelah cabang selesai:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Memproses item secara paralel (menggunakan loop paralel)

Jika perlu melakukan tindakan yang sama untuk setiap item dalam daftar, Anda dapat menyelesaikan eksekusi dengan lebih cepat menggunakan loop paralel. Loop paralel memungkinkan beberapa iterasi loop dilakukan secara paralel. Perlu diperhatikan bahwa, tidak seperti for loop biasa, iterasi dapat dilakukan dalam urutan apa pun.

Pada contoh berikut, serangkaian notifikasi pengguna diproses dalam loop for paralel:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Menggabungkan data (menggunakan loop paralel)

Anda dapat memproses serangkaian item sambil mengumpulkan data dari operasi yang dilakukan pada setiap item. Misalnya, Anda mungkin ingin melacak ID item yang dibuat, atau mempertahankan daftar item yang memiliki error.

Dalam contoh berikut, 10 kueri terpisah untuk set data BigQuery publik, masing-masing menampilkan jumlah kata dalam dokumen, atau kumpulan dokumen. Variabel bersama memungkinkan jumlah kata terakumulasi dan dibaca setelah semua iterasi selesai. Setelah menghitung jumlah kata di semua dokumen, alur kerja akan menampilkan totalnya.

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # numWords is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Langkah selanjutnya