Connector for BigQuery Data Transfer

A Workflows connector that defines the built-in functions used to access BigQuery Data Transfer within a workflow.
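
As a minimal sketch of what a single connector call looks like, the workflow below lists the transfer configurations in a project. It reuses the `transferConfigs.list` method and the `projectID` input argument from the sample on this page. The step names, the `configs` variable, and the fixed `us` location are illustrative choices, not part of the original sample.

```yaml
# Minimal sketch: one connector call that lists transfer configurations.
# Assumes the workflow is executed with an input argument such as
# {"projectID": "my-project"}; "my-project" is a hypothetical value.
main:
  params: [args]
  steps:
    - list_configs:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + args.projectID + "/locations/us"}
        result: configs
    - done:
        # Return the raw list response so the caller can inspect it.
        return: ${configs}
```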

Code sample

YAML

# This workflow creates a new dataset and a new table inside that dataset, which are required
# for the BigQuery Data Transfer job to run. It then creates a new transfer configuration and
# starts a manual run of the transfer, scheduled 30 seconds after the configuration is created.
# The manual run is a long-running operation (LRO), and the connector call blocks until it completes.
# All resources are deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments for execution:
#   - projectID (string)
#     - The user project ID.
#   - datasetID (string)
#     - The dataset name, expected to have a unique value so that the
#       dataset is not referenced by multiple tests.
#   - tableID (string)
#     - The table name, expected to have a unique value so that the
#       table is not referenced by multiple tests.
#   - runConfigDisplayName (string)
#     - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
    # Format a timestamp 30 seconds from now to use as the requested run time.
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"
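
Because `start_run` blocks until the transfer run finishes, a long transfer could exceed the connector's timeout. Workflows connectors accept a `connector_params` block inside `args` to adjust how long-running operations are awaited. The step below is a sketch of the same `start_run` step with an explicit timeout and polling policy. The numeric values are illustrative assumptions, not recommendations from the original sample.

```yaml
# Sketch: the start_run step from the sample, with connector_params added.
# All numeric values below are illustrative assumptions.
- start_run:
    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
    args:
      parent: ${config.name}
      body:
        requestedRunTime: ${now_plus_30s}
      connector_params:
        # Maximum time, in seconds, to wait for the transfer run to finish.
        timeout: 3600
        polling_policy:
          # Seconds to wait before the first status check.
          initial_delay: 5
          # Factor by which the delay grows after each check.
          multiplier: 1.5
          # Upper bound, in seconds, on the delay between checks.
          max_delay: 60
    result: runsResp
```

This fragment is meant to replace the existing `start_run` step inside `main.steps`; it is not a complete workflow on its own.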

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.