Workflows connector that defines the built-in function used to access BigQuery Data Transfer from within a workflow.
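A connector call is an ordinary workflow step: call: names the connector method and args: carries the request fields. As a minimal sketch, assuming a placeholder project path, listing the transfer configurations in a location looks like this:

- list_transfer_configs:
    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
    args:
      parent: "projects/my-project/locations/us"  # placeholder project path
    result: configs_list

The full sample below uses the same pattern to create a transfer configuration, trigger a manual run, and wait for it to complete.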
Learn more
For detailed documentation that includes this code sample, see the following:
Code sample
YAML
# This workflow creates a new dataset and a new table inside that dataset, both of which
# are required for the BigQuery Data Transfer job to run. It then creates a new transfer
# configuration and starts a manual run of the transfer (30 seconds after the config is
# created).
# The transfer run is a blocking long-running operation (LRO).
# All resources are deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for a long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments at execution:
# - projectID (string)
#   - The user project ID.
# - datasetID (string)
#   - The dataset name, expected to be unique so that the instance is not
#     referenced by multiple tests.
# - tableID (string)
#   - The table name, expected to be unique so that the instance is not
#     referenced by multiple tests.
# - runConfigDisplayName (string)
#   - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
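    # Create the destination dataset that the transfer will write into.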
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
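    # Create the destination table with two STRING columns, matching the
    # layout of the CSV files the transfer will load.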
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
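    # List the existing transfer configurations in the location
    # (the result is not captured).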
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/us"}
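    # Create the transfer configuration. Auto-scheduling is disabled, so the
    # transfer runs only when triggered manually below.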
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
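    # Compute an RFC 3339 timestamp 30 seconds from now to use as the
    # requested run time of the manual run.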
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
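    # Trigger a manual run of the transfer. The connector call blocks until
    # the long-running transfer run completes.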
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
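    # Clean up: delete the transfer configuration now that the run has completed.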
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
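    # Delete the destination table.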
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
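    # Delete the dataset, now that it is empty.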
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"
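Because the workflow reads its parameters from the args map, each execution needs an input that supplies the four documented keys. A hypothetical payload (all values are placeholders):

# Placeholder values; substitute your own project and resource names.
projectID: "my-project"
datasetID: "transfer_test_dataset"
tableID: "transfer_test_table"
runConfigDisplayName: "gcs-to-bq-transfer-test"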
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.