A Workflows connector that defines the built-in functions used to access BigQuery Data Transfer from within a workflow.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
YAML
# This workflow creates a new dataset and a new table inside that dataset, both of which are
# required for the BigQuery Data Transfer run. It then creates a new transfer configuration
# and starts a manual run of the transfer (scheduled 30 seconds after the config is created).
# The transfer run is a blocking long-running operation (LRO).
# All resources are deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments
# for execution:
# - projectID (string)
#     - The user project ID.
# - datasetID (string)
#     - The dataset name; expected to have a unique value so that the
#       resource is not referenced by multiple tests.
# - tableID (string)
#     - The table name; expected to have a unique value so that the
#       resource is not referenced by multiple tests.
# - runConfigDisplayName (string)
#     - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
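    # Create the destination dataset that the transfer run writes into.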
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
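    # Create the destination table inside the dataset, with a minimal
    # two-column STRING schema for the incoming CSV data.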
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/us"}
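    # Create the transfer config. Auto-scheduling is disabled, so the
    # schedule is inert and the transfer only runs when triggered
    # manually in the start_run step below.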
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
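    # startManualRuns requires a future run time; format the current
    # time plus 30 seconds as an RFC 3339 timestamp.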
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
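    # Trigger a manual run of the transfer config. This is a blocking
    # long-running operation: the connector waits for the transfer run
    # to complete before the workflow continues.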
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
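    # Cleanup: delete the transfer config, the table, and the dataset
    # created by this workflow.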
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"
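As a usage sketch, a deployed copy of this workflow can be executed with the required arguments through the gcloud CLI. The workflow name, region, and argument values below are placeholder assumptions, not part of the sample:

gcloud workflows run bigquery-data-transfer-sample \
  --location=us-central1 \
  --data='{"projectID": "my-project", "datasetID": "transfer_test_dataset", "tableID": "transfer_test_table", "runConfigDisplayName": "transfer-test-config"}'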
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.