Daten mit einem Workflow aus Cloud Storage in BigQuery laden

Führt eine Reihe von Schritten aus, um das Laden zu orchestrieren und dann die Daten durch Aufrufen von Cloud Functions in BigQuery zu transformieren.

Weitere Informationen

Eine ausführliche Dokumentation, die dieses Codebeispiel enthält, finden Sie hier:

Codebeispiel

YAML

main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
              type: OIDC
          query:
              region: ${region}
              table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
            job_id: ${job_id}
            run_job_url: ${run_job_url}
            poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
              qs: "select count(*) from serverless_elt_dataset.word_count"
              region: "US"
          auth:
              type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end


runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
              url: ${run_job_url}
              query:
                job_id: ${job_id}
              auth:
                type: OIDC
              timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad

Nächste Schritte

Wenn Sie nach Codebeispielen für andere Google Cloud -Produkte suchen und filtern möchten, können Sie den Google Cloud -Beispielbrowser verwenden.