Daten mit einem Workflow aus Cloud Storage in BigQuery laden

Führt eine Reihe von Schritten aus, um das Laden und anschließende Transformieren von Daten in BigQuery durch Aufrufen von Cloud Functions zu orchestrieren.

Codebeispiel

YAML

main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
              type: OIDC
          query:
              region: ${region}
              table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
            job_id: ${job_id}
            run_job_url: ${run_job_url}
            poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
              qs: "select count(*) from serverless_elt_dataset.word_count"
              region: "US"
          auth:
              type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end


runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
              url: ${run_job_url}
              query:
                job_id: ${job_id}
              auth:
                type: OIDC
              timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad

Nächste Schritte

Informationen zum Suchen und Filtern von Codebeispielen für andere Google Cloud-Produkte finden Sie im Google Cloud-Beispielbrowser.