Connector for BigQuery Data Transfer

Workflows connector that defines the built-in function used to access the BigQuery Data Transfer Service from within a workflow.
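As a minimal sketch (the step and variable names here are illustrative), a connector call in a workflow names the googleapis function to invoke under call, passes request parameters under args, and can capture the response with result:

- list_transfer_configs:
    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
    args:
      parent: ${"projects/" + project_id + "/locations/us"}
    result: configs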

More information

For detailed documentation that includes this code sample, see the following:

Code sample

YAML

# This workflow creates a new dataset and a new table inside that dataset, which are required
# for the BigQuery Data Transfer job to run. It creates a new transfer configuration and starts
# a manual run of the transfer (30 seconds after the config is created).
# The transfer run is a blocking long-running operation (LRO).
# All resources are deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments for execution:
#   - projectID (string)
#     - The user project ID.
#   - datasetID (string)
#     - The dataset name, expected to have a unique value to avoid the
#       instance being referenced by multiple tests.
#   - tableID (string)
#     - The table name, expected to have a unique value to avoid the
#       instance being referenced by multiple tests.
#   - runConfigDisplayName (string)
#     - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
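    # Create the destination dataset that the transfer run writes into.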
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
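    # Create the destination table with a simple two-column STRING schema.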
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/us"}
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
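    # Compute a timestamp 30 seconds in the future; time.format converts the
    # UNIX epoch seconds returned by sys.now() into an RFC 3339 string.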
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
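    # Request a single manual run at the computed time. The connector blocks
    # until the long-running transfer run completes.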
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
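    # Clean up: delete the transfer configuration, the table, and the dataset.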
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.