Memuat data dari Cloud Storage ke BigQuery menggunakan alur kerja

Menjalankan serangkaian langkah untuk mengatur pemuatan, lalu mengubah data di BigQuery dengan memanggil Cloud Functions.

Mempelajari lebih lanjut

Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:

Contoh kode

YAML

main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
              type: OIDC
          query:
              region: ${region}
              table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
            job_id: ${job_id}
            run_job_url: ${run_job_url}
            poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
              qs: "select count(*) from serverless_elt_dataset.word_count"
              region: "US"
          auth:
              type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end


runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
              url: ${run_job_url}
              query:
                job_id: ${job_id}
              auth:
                type: OIDC
              timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad

Langkah berikutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat Google Cloud browser contoh.