Connector for BigQuery Data Transfer

Workflows connector that defines the built-in function used to access BigQuery Data Transfer within a workflow.
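
A connector call names the fully qualified API method in a step's call field and passes the request fields through args. A minimal sketch of a standalone connector step (my-project is a placeholder), listing the transfer configurations in a project:

YAML

- list_transfer_configs:
    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
    args:
      parent: projects/my-project/locations/us
    result: transfer_configs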

Code sample

YAML

# This workflow creates a new dataset and a new table inside that dataset, which are required
# for the BigQuery Data Transfer job to run. It creates a new transfer configuration
# (TransferConfig) and starts a manual run of the transfer (30 seconds after the
# configuration is created).
# The transferRun is a blocking LRO.
# All resources get deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects the following items to be provided as input arguments for execution:
#   - projectID (string)
#     - The user project ID.
#   - datasetID (string)
#     - The dataset name, expected to have a unique value to avoid the
#       instance being referenced by multiple tests.
#   - tableID (string)
#     - The table name, expected to have a unique value to avoid the
#       instance being referenced by multiple tests.
#   - runConfigDisplayName (string)
#     - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${args.projectID}
          - destination_dataset: ${args.datasetID}
          - destination_table: ${args.tableID}
          - run_config_display_name: ${args.runConfigDisplayName}
          - run_config_data_source_id: "google_cloud_storage"
          - location: "us"
          - data_path_template: "gs://xxxxxx-bucket/xxxxx/xxxx"
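    # Create the destination dataset and the table that the transfer run writes into.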
    - create_dataset:
        call: googleapis.bigquery.v2.datasets.insert
        args:
          projectId: ${project_id}
          body:
            datasetReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          body:
            tableReference:
              datasetId: ${destination_dataset}
              projectId: ${project_id}
              tableId: ${destination_table}
            schema:
              fields:
                - name: "column1"
                  type: "STRING"
                - name: "column2"
                  type: "STRING"
    - list_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
        args:
          parent: ${"projects/" + project_id + "/locations/us"}
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule: "every day 19:22"
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
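    # startManualRuns expects a requested run time; compute a timestamp
    # 30 seconds in the future.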
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
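    # Start a manual run. The transfer run is a blocking long-running
    # operation: the connector waits for it to complete.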
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
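    # Clean up: delete the transfer configuration, the table, and the dataset.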
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - delete_table:
        call: googleapis.bigquery.v2.tables.delete
        args:
          datasetId: ${destination_dataset}
          projectId: ${project_id}
          tableId: ${destination_table}
    - delete_dataset:
        call: googleapis.bigquery.v2.datasets.delete
        args:
          projectId: ${project_id}
          datasetId: ${destination_dataset}
    - the_end:
        return: "SUCCESS"
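
When executing the workflow, the input arguments are passed as a JSON object. A hypothetical invocation (the workflow name and argument values are placeholders), assuming the workflow has already been deployed:

gcloud workflows run my-transfer-workflow \
    --data='{"projectID": "my-project", "datasetID": "bqdt_dataset", "tableID": "bqdt_table", "runConfigDisplayName": "gcs-to-bq-test"}'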

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.