BigQuery Data Transfer connector

A Workflows connector that defines the built-in functions used to access BigQuery Data Transfer from within a workflow.
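A connector call has the same shape as any Workflows call step: the `call` field names the connector method and `args` carries the request fields. A minimal sketch using the `transferConfigs.startManualRuns` method that also appears in the full sample below (the resource name and timestamp are placeholders):

```yaml
# Minimal connector call sketch; the parent resource name is a placeholder.
- start_manual_run:
    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
    args:
      parent: "projects/my-project/locations/us/transferConfigs/my-config"
      body:
        requestedRunTime: "2024-01-01T00:00:00Z"
    result: runsResp
```

Because this connector method wraps a long-running operation, the call blocks until the transfer run completes before the workflow proceeds to the next step.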

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

YAML

# This workflow creates a new dataset and a new table inside that dataset, both of which are
# required for the BigQuery Data Transfer job to run. It creates a new transfer configuration
# and starts a manual run of the transfer (30 seconds after the configuration is created).
# The transfer run is a blocking long-running operation (LRO).
# All resources are deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments for execution:
#   - projectID (string)
#     - The user project ID.
#   - datasetID (string)
#     - The dataset name; expected to have a unique value so that the
#       instance is not referred to by multiple tests.
#   - tableID (string)
#     - The table name; expected to have a unique value so that the
#       instance is not referred to by multiple tests.
#   - runConfigDisplayName (string)
#     - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"
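At execution time, the four expected inputs are passed to the workflow as a single runtime argument. A hypothetical example of that argument, shown here in YAML form (every value is a placeholder, not a real resource):

```yaml
# Hypothetical runtime arguments; replace each value with your own resources.
projectID: "my-project"
datasetID: "transfer_test_dataset"
tableID: "transfer_test_table"
runConfigDisplayName: "gcs-to-bq-test-run"
```

Note that `datasetID` and `tableID` should be unique per execution, since the workflow both creates and deletes these resources.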

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.