# Load data from Cloud Storage to BigQuery using a workflow
Runs a series of steps to orchestrate loading and then transforming data in BigQuery by calling Cloud Functions.
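The workflow in the code sample below branches on a small numeric status contract returned by the Cloud Functions it calls: judging from the `validateSubmit` and `stateCheck` steps, `1` means the BigQuery job is still running, `2` means it succeeded, and `3` means it failed. A minimal sketch of that mapping (the function name, signature, and inputs here are hypothetical illustrations, not the tutorial's actual Cloud Functions code):

```python
# Numeric status contract inferred from the workflow's switch conditions.
# These names and this helper are illustrative assumptions only.
STATUS_RUNNING = 1   # job submitted or still executing
STATUS_SUCCESS = 2   # job finished without errors
STATUS_FAILED = 3    # job finished with an error result

def poll_status(bq_job_state: str, has_error: bool) -> int:
    """Map a BigQuery job state string ('PENDING', 'RUNNING', 'DONE')
    to the numeric status the workflow's stateCheck step switches on."""
    if bq_job_state != "DONE":
        return STATUS_RUNNING
    return STATUS_FAILED if has_error else STATUS_SUCCESS
```

A polling endpoint built on this contract would keep the workflow looping through `sleepAndPollLoad` until it returns `2` or `3`.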
Explore further
---------------
For detailed documentation that includes this code sample, see the following:

- [Load data from Cloud Storage to BigQuery using Workflows](/workflows/docs/tutorials/load-data-from-cloud-storage-to-bigquery-using-workflows)
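In the code sample, the `pollJob` step retries failed polls with `max_retries: 10` and exponential backoff (`initial_delay: 1`, `multiplier: 2`, `max_delay: 60`). The wait schedule those parameters imply can be sketched as follows, assuming standard capped exponential backoff; the exact runtime behavior is defined by Workflows, not by this snippet:

```python
def backoff_schedule(initial_delay: float = 1,
                     multiplier: float = 2,
                     max_delay: float = 60,
                     max_retries: int = 10) -> list[float]:
    """Compute the per-attempt wait times for a capped exponential
    backoff policy like the one in the sample's pollJob step."""
    delays, delay = [], initial_delay
    for _ in range(max_retries):
        delays.append(min(delay, max_delay))  # cap each wait at max_delay
        delay *= multiplier                   # grow geometrically
    return delays

# With the sample's values the waits grow 1, 2, 4, ... and plateau at 60.
```

Summing the schedule gives a rough upper bound on how long the workflow will keep retrying a failing poll endpoint before giving up.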
Code sample
-----------
### YAML

```yaml
main:
  steps:
    - constants:
        assign:
          - create_job_url: CREATE_JOB_URL
          - poll_job_url: POLL_BIGQUERY_JOB_URL
          - run_job_url: RUN_BIGQUERY_JOB_URL
          - create_query_url: CREATE_QUERY_URL
          - region: BQ_REGION
          - table_name: BQ_DATASET_TABLE_NAME
        next: createJob

    - createJob:
        call: http.get
        args:
          url: ${create_job_url}
          auth:
            type: OIDC
          query:
            region: ${region}
            table_name: ${table_name}
        result: job
        next: setJobId

    - setJobId:
        assign:
          - job_id: ${job.body.job_id}
        next: jobCreateCheck

    - jobCreateCheck:
        switch:
          - condition: ${job_id == Null}
            next: noOpJob
        next: runLoadJob

    - runLoadJob:
        call: runBigQueryJob
        args:
          job_id: ${job_id}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: jobStatus
        next: loadRunCheck

    - loadRunCheck:
        switch:
          - condition: ${jobStatus == 2}
            next: createQueryJob
        next: failedLoadJob

    - createQueryJob:
        call: http.get
        args:
          url: ${create_query_url}
          query:
            qs: "select count(*) from serverless_elt_dataset.word_count"
            region: "US"
          auth:
            type: OIDC
        result: queryjob
        next: setQueryJobId

    - setQueryJobId:
        assign:
          - qid: ${queryjob.body.job_id}
        next: queryCreateCheck

    - queryCreateCheck:
        switch:
          - condition: ${qid == Null}
            next: failedQueryJob
        next: runQueryJob

    - runQueryJob:
        call: runBigQueryJob
        args:
          job_id: ${qid}
          run_job_url: ${run_job_url}
          poll_job_url: ${poll_job_url}
        result: queryJobState
        next: runQueryCheck

    - runQueryCheck:
        switch:
          - condition: ${queryJobState == 2}
            next: allDone
        next: failedQueryJob

    - noOpJob:
        return: "No files to import"
        next: end

    - allDone:
        return: "All done!"
        next: end

    - failedQueryJob:
        return: "Query job failed"
        next: end

    - failedLoadJob:
        return: "Load job failed"
        next: end

runBigQueryJob:
  params: [job_id, run_job_url, poll_job_url]
  steps:
    - startBigQueryJob:
        try:
          call: http.get
          args:
            url: ${run_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: submitJobState
        retry: ${http.default_retry}
        next: validateSubmit

    - validateSubmit:
        switch:
          - condition: ${submitJobState.body.status == 1}
            next: sleepAndPollLoad
        next: returnState

    - returnState:
        return: ${submitJobState.body.status}

    - sleepAndPollLoad:
        call: sys.sleep
        args:
          seconds: 5
        next: pollJob

    - pollJob:
        try:
          call: http.get
          args:
            url: ${poll_job_url}
            query:
              job_id: ${job_id}
            auth:
              type: OIDC
            timeout: 600
          result: pollJobState
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 10
          backoff:
            initial_delay: 1
            max_delay: 60
            multiplier: 2
        next: stateCheck

    - stateCheck:
        switch:
          - condition: ${pollJobState.body.status == 2}
            return: ${pollJobState.body.status}
          - condition: ${pollJobState.body.status == 3}
            return: ${pollJobState.body.status}
        next: sleepAndPollLoad
```

What's next
-----------

To search and filter code samples for other Google Cloud products, see the
[Google Cloud sample browser](/docs/samples?product=workflows).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.