Charger des données de Cloud Storage vers BigQuery à l'aide d'un workflow

Exécute une série d'étapes pour orchestrer le chargement, puis transformer les données dans BigQuery en appelant Cloud Functions.

En savoir plus

Pour obtenir une documentation détaillée incluant cet exemple de code, consultez les articles suivants :

Exemple de code

YAML

main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
              type: OIDC
          query:
              region: ${region}
              table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
            job_id: ${job_id}
            run_job_url: ${run_job_url}
            poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
              qs: "select count(*) from serverless_elt_dataset.word_count"
              region: "US"
          auth:
              type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end


runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
              url: ${run_job_url}
              query:
                job_id: ${job_id}
              auth:
                type: OIDC
              timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad

Étape suivante

Pour rechercher et filtrer des exemples de code pour d'autres produits Google Cloud , consultez l'explorateur d'exemplesGoogle Cloud .