Cloud Storage 转移作业

通过适用于 Cloud Storage 的 BigQuery Data Transfer Service,您可以安排从 Cloud Storage 到 BigQuery 的周期性数据加载作业。

准备工作

在创建 Cloud Storage 转移作业之前,请先完成以下操作:

限制

从 Cloud Storage 到 BigQuery 的周期性转移作业受到以下限制:

  • 转移作业中所有与通配符或运行时参数定义的模式匹配的文件都必须共用您已为目标表定义的相同架构,否则该转移作业将失败。如果表架构在多次运行期间发生更改,也会导致转移作业失败。
  • 由于可以对 Cloud Storage 对象进行版本控制,因此请务必注意,BigQuery 转移作业不支持已归档的 Cloud Storage 对象。对象必须处于活跃状态才能被转移。
  • 对于正在运行的转移作业,您需要先创建目标表及其架构,然后再设置转移作业,这一点与从 Cloud Storage 到 BigQuery 的各个数据加载作业不同。BigQuery 无法在周期性数据转移过程中创建表。
  • 以 Cloud Storage 为来源的转移作业始终会通过 WRITE_APPEND 偏好设置触发,该偏好设置会将数据附加到目标表中。如需了解详情,请参阅 JobConfigurationLoad 对象的 writeDisposition 字段的说明。
  • 如果 Cloud Storage 文件在转移过程中进行了修改,则 BigQuery Data Transfer Service 不保证转移所有文件或仅转移一次。
  • 如果数据集的位置设置为 US 以外的值,则单区域或多区域 Cloud Storage 存储分区必须与数据集位于相同的区域。
  • BigQuery Data Transfer Service 不保证外部数据源的数据一致性。在查询运行的过程中,底层数据的更改可能会导致意外行为。

  • Cloud Storage 源数据可能还存在其他限制,具体取决于源数据的格式。如需了解详情,请参阅以下内容:

  • 您的 Cloud Storage 存储分区必须位于与 BigQuery 中目标数据集的单地区或多地区兼容的单地区或多地区中。这称为“共置”。如需了解详情,请参阅 Cloud Storage 转移作业数据位置

最短间隔时间

  • 系统会立即选择源文件进行转移,源文件没有最短的期限。
  • 周期性转移作业之间的最短间隔时间为 15 分钟。周期性转移作业的默认间隔时间为 24 小时。

所需权限

在将数据加载到 BigQuery 时,您需要拥有相关权限,才能将数据加载到新的或现有的 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有对您的数据所在的存储分区的访问权限。确保您拥有以下必要的权限:

  • BigQuery:若要创建预定的转移作业,您必须拥有 bigquery.transfers.update 权限。预定义的 IAM 角色 bigquery.admin 包含 bigquery.transfers.update 权限。如需详细了解 BigQuery Data Transfer Service 中的 IAM 角色,请参阅访问权限控制参考文档
  • Cloud Storage:您必须拥有针对单个存储分区的 storage.objects.get 权限,或更高级别的权限。如果要使用 URI 通配符,您还必须拥有 storage.objects.list 权限。如果您要在每次成功转移后删除源文件,还需要拥有 storage.objects.delete 权限。预定义的 IAM 角色 storage.objectAdmin 具有所有的这些权限。

设置 Cloud Storage 转移作业

要在 BigQuery Data Transfer Service 中创建 Cloud Storage 转移作业,请按下列步骤操作:

控制台

  1. 转到 Cloud Console 中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击转移作业

  3. 点击创建

  4. 创建转移作业页面中执行以下操作:

    • 来源类型部分的来源中,选择 Cloud Storage

      转移作业来源

    • 转移配置名称部分的显示名中,输入转移作业的名称,例如 My Transfer。转移作业名称可以是任何容易辨识的值,方便您以后在需要修改该作业时能轻松识别。

      转移作业名称

    • 时间表选项部分的时间表中,保留默认值(立即开始)或点击在设置的时间开始 (Start at a set time)。

      • 重复频率部分,从以下选项中选择转移作业的运行频率。最短间隔时间为 15 分钟。
        • 每日一次(默认值)
        • 每周一次
        • 按月
        • 自定义。对于自定义时间表,请输入自定义频率;例如 every day 00:00。请参阅设置时间表的格式
        • 按需
      • 开始日期和运行时间部分,输入开始转移作业的日期和时间。如果选择立即开始,则此选项会处于停用状态。

        转移作业时间安排

    • 目标设置部分的目标数据集中,选择您创建的用来存储数据的数据集。

      转移作业数据集

    • 数据源详细信息部分,执行以下操作:

      • 目标表部分,输入目标表的名称。目标表必须遵循表命名规则。目标表名称也支持使用参数
      • Cloud Storage URI 部分,输入 Cloud Storage URI。您可以使用通配符参数
      • 写入偏好设置部分,选择:

        • APPEND,以将新数据附加到现有目标表,或
        • MIRROR,以刷新目标表中的数据,从而反映来源中已修改的数据。MIRROR 用于覆盖目标表中数据的新副本。
      • 如果您希望在每次成功转移后删除源文件,请选中在转移作业完成后删除源文件 (Delete source files after transfer) 复选框。系统会尽力运行删除作业。如果首次删除源文件失败,则系统不会重试删除作业。

      • 转移选项部分,执行以下操作:

        • 所有格式下方:
          • 允许的错误数部分,输入 BigQuery 在运行作业时可以忽略的错误记录数上限。如果错误记录数超过此限值,作业结果中将返回一个“无效”错误,并且作业将运行失败。默认值为 0
          • (可选)在小数目标类型 (Decimal target types) 部分,输入一个源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。选择用于转换的 SQL 数据类型取决于以下条件:
            • 选择用于转换的数据类型将是以下列表中第一个支持源数据的精度和比例的数据类型(按顺序):NUMERIC、BIGNUMERIC(预览版)和 STRING。
            • 如果列出的数据类型均不支持精度和比例,则系统会选择支持指定列表中最宽泛的范围的数据类型。如果在读取源数据时值超出支持的范围,则会抛出错误。
            • 数据类型 STRING 支持所有精度和比例值。
            • 如果将此字段留空,则对于 ORC,数据类型默认为“NUMERIC,STRING”,对于其他文件格式,数据类型默认为“NUMERIC”。
            • 此字段不能包含重复的数据类型。
            • 系统会忽略您在此字段中列出的数据类型的顺序。
        • “JSON, CSV”下方:
          • 如果您想让转移作业删除不符合目标表架构的数据,请选中忽略未知值复选框。
        • CSV 下方:

          • 字段分隔符部分,输入用于分隔字段的字符。默认值为英文逗号。
          • 对于要跳过的标题行数部分,如果您不想导入源文件中的标题行,请输入相应的标题行数。默认值为 0
          • 对于允许引用的数据中包含换行符部分,如果您想要允许在引用字段中使用换行符,请选中此复选框。
          • 对于允许使用可选列留空的行部分,如果您想要允许转移缺少 NULLABLE 列内容的行,请选中此复选框。

      Cloud Storage 来源详细信息

    • (可选)在通知选项部分,执行以下操作:

      • 点击切换开关以启用电子邮件通知。启用此选项后,转移作业管理员会在转移作业运行失败时收到电子邮件通知。
      • 选择 Pub/Sub 主题部分,选择您的主题名称,或点击创建主题。此选项用于为您的转移作业配置 Pub/Sub 运行通知
  5. 点击保存

bq

输入 bq mk 命令并提供转移作业创建标志 --transfer_config。此外,还必须提供以下标志:

  • --data_source
  • --display_name
  • --target_dataset
  • --params
bq mk \
--transfer_config \
--project_id=project_id \
--data_source=data_source \
--display_name=name \
--target_dataset=dataset \
--params='parameters'

其中:

  • project_id 是项目 ID。如果未提供 --project_id 来指定具体项目,则系统会使用默认项目。
  • data_source 是数据源,即 google_cloud_storage
  • name 是转移作业配置的显示名。转移作业名称可以是任何容易辨识的值,让您以后在需要修改时能够轻松识别。
  • dataset 是转移作业配置的目标数据集。
  • parameters 包含所创建转移作业配置的参数(采用 JSON 格式),例如:--params='{"param":"param_value"}'
    • 对于 Cloud Storage,您必须提供 data_path_templatedestination_table_name_templatefile_format 参数。data_path_template 是包含待转移文件的 Cloud Storage URI,其中可包含一个通配符。destination_table_name_template 是目标表的名称。对于 file_format,请指明要转移的文件类型:CSVJSONAVROPARQUETORC。默认值为 CSV。
    • 对于所有 file_format 值,您可以添加可选参数 max_bad_records。默认值为 0
    • 对于所有 file_format 值,您可以添加可选参数 decimal_target_typesdecimal_target_types 是一个源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。如果未提供此字段,则对于 ORC,数据类型默认为“NUMERIC,STRING”,对于其他文件格式,数据类型默认为“NUMERIC”。
    • 对于 file_format 中的 JSON 或 CSV 值,您可以添加可选参数 ignore_unknown_values。如果您没有为 file_format 选择 CSVJSON,此参数会被忽略。
    • 对于 CSV file_format,您可以为用于分隔字段的字符添加可选参数 field_delimiter。默认值为英文逗号。如果您没有为 file_format 选择 CSV,此参数会被忽略。
    • 对于 CSV file_format,您可以添加可选参数 skip_leading_rows,以指明您不想导入的标题行。默认值为 0。如果您没有为 file_format 选择 CSV,此参数会被忽略。
    • 对于 CSV file_format,如果您想要允许在引用字段中使用换行符,则可以添加可选参数 allow_quoted_newlines。如果您没有为 file_format 选择 CSV,此参数会被忽略。
    • 对于 CSV file_format,如果您想要接受末尾处缺少可选列的行,则可以添加可选参数 allow_jagged_rows。系统将填入 NULL 来代替缺少的值。 如果您没有为 file_format 选择 CSV,此参数会被忽略。
    • 每次成功转移后,可选参数 delete_source_files 将删除源文件。(如果首次删除源文件失败,则系统不会重试删除作业。)delete_source_files 的默认值为 false。

例如,以下命令使用 data_path_templategs://mybucket/myfile/*.csv、目标数据集 mydatasetfile_format CSV 创建一个名为 My Transfer 的 Cloud Storage 转移作业。此示例包含与 CSV file_format 关联的可选参数的非默认值。

该转移作业将在默认项目中创建:

bq mk --transfer_config \
--target_dataset=mydataset \
--display_name='My Transfer' \
--params='{"data_path_template":"gs://mybucket/myfile/*.csv",
"destination_table_name_template":"MyTable",
"file_format":"CSV",
"max_bad_records":"1",
"ignore_unknown_values":"true",
"field_delimiter":"|",
"skip_leading_rows":"1",
"allow_quoted_newlines":"true",
"allow_jagged_rows":"false",
"delete_source_files":"true"}' \
--data_source=google_cloud_storage

运行命令后,您会收到类似如下的消息:

[URL omitted] Please copy and paste the above URL into your web browser and follow the instructions to retrieve an authentication code.

请按照说明操作,并将身份验证代码粘贴到命令行中。

API

使用 projects.locations.transferConfigs.create 方法并提供一个 TransferConfig 资源实例。

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create google cloud storage transfer config
public class CreateCloudStorageTransfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    // GCS Uri
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
    String fileFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("data_path_template", Value.newBuilder().setStringValue(sourceUri).build());
    params.put("write_disposition", Value.newBuilder().setStringValue("APPEND").build());
    params.put("file_format", Value.newBuilder().setStringValue(fileFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Google Cloud Storage Config Name")
            .setDataSourceId("google_cloud_storage")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    createCloudStorageTransfer(projectId, transferConfig);
  }

  public static void createCloudStorageTransfer(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Cloud storage transfer created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("Cloud storage transfer was not created." + ex.toString());
    }
  }
}

手动触发转移作业

除了从 Cloud Storage 自动安排的转移作业外,您还可以手动触发转移作业来加载其他数据文件。

如果转移作业配置会进行运行时参数化,则您需要指定要开始执行其他转移作业的日期范围。

如需手动触发转移作业,请执行以下操作:

控制台

  1. 转到 Cloud Console 中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击数据传输

  3. 点击您的转移作业。

  4. 点击立即运行转移作业安排回填(对于运行时参数化转移作业配置)。

  5. 如果适用,请选择开始日期结束日期,然后点击确定进行确认。

    立即运行转移作业

    对于运行时参数化转移作业配置,当您点击安排回填时,您会看到日期选项。

    安排回填

后续步骤