Carga datos de Cloud Storage a BigQuery con un flujo de trabajo

Ejecuta una serie de pasos para organizar la carga y, luego, transformar los datos en BigQuery mediante una llamada a Cloud Functions.

Muestra de código

YAML

main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
              type: OIDC
          query:
              region: ${region}
              table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
            job_id: ${job_id}
            run_job_url: ${run_job_url}
            poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
              qs: "select count(*) from serverless_elt_dataset.word_count"
              region: "US"
          auth:
              type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end


runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
              url: ${run_job_url}
              query:
                job_id: ${job_id}
              auth:
                type: OIDC
              timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.