Cloud Storage 转移作业

通过适用于 Cloud Storage 的 BigQuery Data Transfer Service,您可以安排从 Cloud Storage 到 BigQuery 的周期性数据加载作业。

准备工作

创建 Cloud Storage 转移作业之前,请执行以下操作:

限制

从 Cloud Storage 到 BigQuery 的周期性转移作业受到以下限制:

  • 转移作业中所有与通配符或运行时参数定义的模式匹配的文件都必须共用您已为目标表定义的相同架构,否则该转移作业将失败。如果表架构在多次运行期间发生更改,也会导致转移作业失败。
  • 由于可以对 Cloud Storage 对象进行版本控制,因此请务必注意,BigQuery 转移作业不支持已归档的 Cloud Storage 对象。对象必须处于活跃状态才能被转移。
  • 对于正在运行的转移作业,您必须先创建目标表,然后再设置转移作业,这一点与从 Cloud Storage 到 BigQuery 的各个数据加载作业不同。对于 CSV 和 JSON 文件,您还必须提前定义表架构。BigQuery 无法在周期性数据转移过程中创建表。
  • 默认情况下,Cloud Storage 中的转移作业会将写入偏好设置参数设为 APPEND。在此模式下,未修改的文件只能加载到 BigQuery 中一次。如果文件的 last modification time 属性更新,文件将重新加载。
  • 如果 Cloud Storage 文件在转移过程中进行了修改,则 BigQuery Data Transfer Service 不保证转移所有文件或仅转移一次。 将数据从 Cloud Storage 存储桶加载到 BigQuery 时,需要遵循以下限制:

  • 如果您的数据集位置设置为 US 多区域以外的值,则 Cloud Storage 存储桶必须与数据集位于同一单区域中或包含在同一多区域内。

  • BigQuery 不保证外部数据源的数据一致性。在查询运行的过程中,底层数据的更改可能会导致意外行为。

  • BigQuery 不支持 Cloud Storage 对象版本控制。如果您在 Cloud Storage URI 中添加了世代编号,则加载作业将失败。

  • 另外,可能还存在其他限制,具体取决于 Cloud Storage 源数据的格式。有关详情,请参阅:

  • 您的 Cloud Storage 存储桶必须位于与 BigQuery 中目标数据集的单区域或多区域兼容的位置。这称为“共置”。如需了解详情,请参阅 Cloud Storage 转移作业数据位置

最短间隔时间

  • 系统会立即选择源文件进行转移,源文件没有最短的期限。
  • 周期性转移作业之间的最短间隔时间为 15 分钟。周期性转移作业的默认间隔时间为 24 小时。

所需权限

在将数据加载到 BigQuery 时,您需要拥有相关权限,才能将数据加载到新的或现有的 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有对您的数据所在的存储桶的访问权限。确保您拥有以下必要的权限:

  • BigQuery:确保转移作业创建的个人或服务账号在 BigQuery 中拥有以下权限:

    • 创建转移作业所需的 bigquery.transfers.update 权限
    • 针对目标数据集的 bigquery.datasets.getbigquery.datasets.update 权限

    预定义的 IAM 角色 bigquery.admin 包含 bigquery.transfers.updatebigquery.datasets.updatebigquery.datasets.get 权限。如需详细了解 BigQuery Data Transfer Service 中的 IAM 角色,请参阅访问权限控制

  • Cloud Storage:您必须拥有针对单个存储桶的 storage.objects.get 权限,或更高级别的权限。如果要使用 URI 通配符,您还必须拥有 storage.objects.list 权限。如果您要在每次成功转移后删除源文件,还需要拥有 storage.objects.delete 权限。预定义的 IAM 角色 storage.objectAdmin 具有所有的这些权限。

设置 Cloud Storage 转移作业

要在 BigQuery Data Transfer Service 中创建 Cloud Storage 转移作业,请按下列步骤操作:

控制台

  1. 转到 Google Cloud 控制台中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击 数据转移

  3. 点击 创建转移作业

  4. 来源类型部分的来源中,选择 Google Cloud Storage

    转移作业来源

  5. 转移配置名称部分的显示名中,输入转移作业的名称,例如 My Transfer。转移作业名称可以是任何可让您在需要修改转移作业时识别该转移作业的名称。

    转移作业名称

  6. 时间表选项部分中,执行以下操作:

    • 选择重复频率。如果您选择小时,则还必须指定频率。您还可以选择自定义来指定自定义重复频率。如果您选择按需,则当您手动触发转移作业时,此转移作业会运行。

    • 如果适用,请选择立即开始在设置的时间开始,并提供开始日期和运行时间。

  7. Destination settings 部分的 Destination dataset 中,选择您创建的用来存储数据的数据集。

    转移作业数据集

  8. 数据源详细信息部分,执行以下操作:

    1. 目标表部分,输入目标表的名称。目标表必须遵循表命名规则。目标表名称也支持使用参数
    2. Cloud Storage URI 部分,输入 Cloud Storage URI。您可以使用通配符参数
    3. 写入偏好设置部分,选择:
      • APPEND,以采用增量方式将新数据附加到现有目标表。APPEND写入偏好设置的默认值。
      • MIRROR,以在每次转移作业运行期间覆盖目标表中的数据。

    如需详细了解 BigQuery Data Transfer Service 如何使用 APPENDMIRROR 注入数据,请参阅Cloud Storage 转移作业的数据注入。如需详细了解 writeDisposition 字段,请参阅 JobConfigurationLoad

    1. 如果您希望在每次成功转移后删除源文件,请选中在转移作业完成后删除源文件 (Delete source files after transfer) 复选框。系统会尽力运行删除作业。如果首次删除源文件失败,则系统不会重试删除作业。
    2. 转移选项部分,执行以下操作:

      1. 所有格式下方:
        1. 允许的错误数部分,输入 BigQuery 在运行作业时可以忽略的错误记录数上限。如果错误记录数超过此限值,作业结果中将返回一个“无效”错误,并且作业将运行失败。默认值为 0
        2. (可选)在小数目标类型 (Decimal target types) 部分,输入一个源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。选择用于转换的 SQL 数据类型取决于以下条件:
          • 选择用于转换的数据类型将是以下列表中第一个支持源数据的精度和比例的数据类型(按顺序):NUMERIC、BIGNUMERIC 和 STRING。
          • 如果列出的数据类型均不支持精度和比例,则系统会选择支持指定列表中最宽泛的范围的数据类型。如果在读取源数据时值超出支持的范围,则会抛出错误。
          • 数据类型 STRING 支持所有精度和比例值。
          • 如果将此字段留空,则对于 ORC,数据类型默认为“NUMERIC,STRING”,对于其他文件格式,数据类型默认为“NUMERIC”。
          • 此字段不能包含重复的数据类型。
          • 系统会忽略您在此字段中列出的数据类型的顺序。
      2. “JSON, CSV”下方:
        • 如果您想让转移作业删除不符合目标表架构的数据,请选中忽略未知值复选框。
      3. AVRO 下:
        • 如果您想让转移作业将 Avro 逻辑类型转换为相应的 BigQuery 数据类型,请选中使用 Avro 逻辑类型复选框。系统默认会忽略大多数类型的 logicalType 特性,并改为使用基础 Avro 类型。
      4. CSV 下方:

        1. 字段分隔符部分,输入用于分隔字段的字符。默认值为英文逗号。
        2. 引号字符部分,输入用于括起 CSV 文件中数据部分的字符。默认值为英文双引号 (")。
        3. 对于要跳过的标题行数部分,如果您不想导入源文件中的标题行,请输入相应的标题行数。默认值为 0
        4. 对于允许引用的数据中包含换行符部分,如果您想要允许在引用字段中使用换行符,请选中此复选框。
        5. 对于允许使用可选列留空的行部分,如果您想要允许转移缺少 NULLABLE 列内容的行,请选中此复选框。
  9. 服务账号菜单中,从与您的 Google Cloud 项目关联的服务账号中选择一个服务账号。您可以将服务账号与转移作业相关联,而不是使用用户凭据。如需详细了解如何将服务账号用于数据转移,请参阅使用服务账号

    • 如果您使用联合身份登录,则需要有服务账号才能创建转移作业。如果您使用 Google 账号登录,则转移作业的服务账号是可选的。
    • 该服务账号必须具有 BigQuery 和 Cloud Storage 的必需权限
  10. 可选:在通知选项部分中,执行以下操作:

    1. 点击切换开关以启用电子邮件通知。启用此选项后,转移作业配置的所有者会在转移作业运行失败时收到电子邮件通知。
    2. 选择 Pub/Sub 主题部分,选择您的主题名称,或点击创建主题。此选项用于为您的转移作业配置 Pub/Sub 运行通知
  11. 可选:在高级选项部分:

    • 如果您使用 CMEK,请选择客户管理的密钥。系统会显示一系列可用的 CMEK 供您选择。

    如需了解 CMEK 如何与 BigQuery Data Transfer Service 搭配使用,请参阅使用转移作业指定加密密钥

  12. 点击保存

bq

输入 bq mk 命令并提供转移作业创建标志 --transfer_config。此外,还必须提供以下标志:

  • --data_source
  • --display_name
  • --target_dataset
  • --params

可选标志:

  • --destination_kms_key:如果您将客户管理的加密密钥 (CMEK) 用于此转移作业,请指定 Cloud KMS 密钥的密钥资源 ID。如需了解 CMEK 如何与 BigQuery Data Transfer Service 搭配使用,请参阅使用转移作业指定加密密钥
  • --service_account_name:指定用于 Cloud Storage 转移作业身份验证的服务账号,而不是您的用户账号。
bq mk \
--transfer_config \
--project_id=PROJECT_ID \
--data_source=DATA_SOURCE \
--display_name=NAME \
--target_dataset=DATASET \
--destination_kms_key="DESTINATION_KEY" \
--params='PARAMETERS' \
--service_account_name=SERVICE_ACCOUNT_NAME

其中:

  • PROJECT_ID 是您的项目 ID。如果未提供 --project_id 来指定具体项目,则系统会使用默认项目。
  • DATA_SOURCE 是数据源,例如 google_cloud_storage
  • NAME 是转移作业配置的显示名。转移作业名称可以是任何可让您在需要修改转移作业时识别该转移作业的名称。
  • DATASET 是转移作业配置的目标数据集。
  • DESTINATION_KEYCloud KMS 密钥资源 ID,例如 projects/project_name/locations/us/keyRings/key_ring_name/cryptoKeys/key_name
  • PARAMETERS 包含所创建转移作业配置的参数(采用 JSON 格式),例如:--params='{"param":"param_value"}'
    • destination_table_name_template:目标 BigQuery 表的名称。
    • data_path_template:包含待转移文件的 Cloud Storage URI,其中可包含一个通配符。
    • write_disposition:确定匹配的文件是附加到目标表,还是完全镜像。支持的值为 APPENDMIRROR。如需了解 BigQuery Data Transfer Service 如何在 Cloud Storage 转移作业中附加或镜像数据,请参阅 Cloud Storage 转移作业的数据注入
    • file_format:要转移的文件的格式。格式可以是 CSVJSONAVROPARQUETORC。默认值为 CSV
    • max_bad_records:对于任何 file_format 值,可以忽略的错误记录数上限。默认值为 0
    • decimal_target_types:对于任何 file_format 值,源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。如果未提供此字段,对于 ORC,数据类型默认为 "NUMERIC,STRING",对于其他文件格式,数据类型默认为 "NUMERIC"
    • ignore_unknown_values:对于任何 file_format 值,设置为 TRUE,以接受包含与架构不匹配的值的行。如需了解详情,请参阅 JobConfigurationLoad 参考表中的 ignoreUnknownvalues 字段详细信息。
    • use_avro_logical_types:对于 AVRO file_format 值,设置为 TRUE,以将逻辑类型解释为其对应的类型(例如 TIMESTAMP),而不仅仅是使用其原始类型(例如 INTEGER)。
    • parquet_enum_as_string:对于 PARQUET file_format 值,设置为 TRUE 可将 PARQUET ENUM 逻辑类型推断为 STRING 而不是默认值 BYTES
    • parquet_enable_list_inference:对于 PARQUET file_format 值,请设置为 TRUE 以专门将架构推断用于 PARQUET LIST 逻辑类型。
    • reference_file_schema_uri:具有读取器架构的参考文件的 URI 路径。
    • field_delimiter:对于 CSV file_format 值,用于分隔字段的字符。默认值为英文逗号。
    • quote:对于 CSV file_format 值,此标志表示用于括起 CSV 文件中数据部分的字符。默认值为英文双引号 (")。
    • skip_leading_rows:对于 CSV file_format 值,请指明您不想导入的前导标题行数。默认值为 0。
    • allow_quoted_newlines:对于 CSV file_format 值,设置为 TRUE 以允许引用字段中使用换行符。
    • allow_jagged_rows:对于 CSV file_format 值,请设置为 TRUE 以接受末尾处缺少可选列的行。缺少的值会填入 NULL
    • preserve_ascii_control_characters:对于 CSV file_format 值,设置为 TRUE 以保留任何嵌入的 ASCII 控制字符。
    • encoding:指定 CSV 编码类型。支持的值包括 UTF8ISO_8859_1UTF16BEUTF16LEUTF32BEUTF32LE
    • delete_source_files:设置为 TRUE,以在每次成功转移后删除源文件。如果首次尝试删除源文件失败,则删除作业不会重新运行。默认值为 FALSE
  • SERVICE_ACCOUNT_NAME 是用于对转移作业进行身份验证的服务账号名称。该服务账号应属于用于创建转移作业的同一 project_id,并且应具有所有所需的权限

例如,以下命令使用 data_path_templategs://mybucket/myfile/*.csv、目标数据集 mydatasetfile_format CSV 创建一个名为 My Transfer 的 Cloud Storage 转移作业。此示例包含与 CSV file_format 关联的可选参数的非默认值。

该转移作业将在默认项目中创建:

bq mk --transfer_config \
--target_dataset=mydataset \
--display_name='My Transfer' \
--params='{"data_path_template":"gs://mybucket/myfile/*.csv",
"destination_table_name_template":"MyTable",
"file_format":"CSV",
"max_bad_records":"1",
"ignore_unknown_values":"true",
"field_delimiter":"|",
"quote":";",
"skip_leading_rows":"1",
"allow_quoted_newlines":"true",
"allow_jagged_rows":"false",
"delete_source_files":"true"}' \
--data_source=google_cloud_storage

运行命令后,您会收到类似如下的消息:

[URL omitted] Please copy and paste the above URL into your web browser and follow the instructions to retrieve an authentication code.

请按照说明操作,并将身份验证代码粘贴到命令行中。

API

使用 projects.locations.transferConfigs.create 方法并提供一个 TransferConfig 资源实例。

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create google cloud storage transfer config
public class CreateCloudStorageTransfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    // GCS Uri
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
    String fileFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("data_path_template", Value.newBuilder().setStringValue(sourceUri).build());
    params.put("write_disposition", Value.newBuilder().setStringValue("APPEND").build());
    params.put("file_format", Value.newBuilder().setStringValue(fileFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Google Cloud Storage Config Name")
            .setDataSourceId("google_cloud_storage")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    createCloudStorageTransfer(projectId, transferConfig);
  }

  public static void createCloudStorageTransfer(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Cloud storage transfer created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("Cloud storage transfer was not created." + ex.toString());
    }
  }
}

使用转移作业指定加密密钥

您可以指定客户管理的加密密钥 (CMEK) 来加密转移作业运行的数据。您可以使用 CMEK 支持来自 Cloud Storage 的转移作业。

当您使用转移作业指定 CMEK 时,BigQuery Data Transfer Service 会将 CMEK 应用于所注入数据的任何中间磁盘缓存,从而使整个数据转移工作流符合 CMEK 的规定。

如果最初不是使用 CMEK 创建转移作业,则无法更新现有转移作业来添加 CMEK。例如,您无法将最初默认加密的目标表更改为现在使用 CMEK 加密。反之,您也无法更改 CMEK 加密的目标表,使其具有不同类型的加密。

如果转移配置最初是使用 CMEK 加密创建的,则可以为转移作业更新 CMEK。当您为转移配置更新 CMEK 时,BigQuery Data Transfer Service 会在下次运行转移作业时将 CMEK 传播到目标表,其中 BigQuery Data Transfer Service 会在转移作业运行期间将所有过时的 CMEK 替换为新的 CMEK。如需了解详情,请参阅更新转移作业

您还可以使用项目默认密钥。当您使用转移作业指定项目默认密钥时,BigQuery Data Transfer Service 会将项目默认密钥用作任何新转移配置的默认密钥。

手动触发转移作业

除了从 Cloud Storage 自动安排的转移作业外,您还可以手动触发转移作业来加载其他数据文件。

如果转移作业配置会进行运行时参数化,则您需要指定要开始执行其他转移作业的日期范围。

如需触发转移作业,请执行以下操作:

控制台

  1. 转到 Google Cloud 控制台中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击 数据转移

  3. 从列表中选择您的转移作业。

  4. 点击立即运行转移作业安排回填(对于运行时参数化转移作业配置)。

    • 如果您点击了立即运行转移作业,请选择运行一次性转移作业在特定日期运行(如适用)。如果您选择了在特定日期运行,请选择特定日期和时间:

      立即运行转移作业

    • 如果您点击了安排回填,请选择运行一次性转移作业在某个日期范围运行(如适用)。如果您选择了在某个日期范围运行,请选择开始和结束日期及时间:

      安排回填

  5. 点击确定

bq

输入 bq mk 命令并提供 --transfer_run 标志。您可以使用 --run_time 标志或是 --start_time--end_time 标志。

bq mk \
--transfer_run \
--start_time='START_TIME' \
--end_time='END_TIME' \
RESOURCE_NAME
bq mk \
--transfer_run \
--run_time='RUN_TIME' \
RESOURCE_NAME

其中:

  • START_TIMEEND_TIME 是以 Z 结尾或包含有效时区偏移量的时间戳。例如:

    • 2017-08-19T12:11:35.00Z
    • 2017-05-25T00:00:00+00:00
  • RUN_TIME 是时间戳,用于指定安排数据转移运行的时间。如果要为当前时间运行一次性转移作业,您可以使用 --run_time 标志。

  • RESOURCE_NAME 是转移作业的资源名称(也称为转移作业配置),例如 projects/myproject/locations/us/transferConfigs/1234a123-1234-1a23-1be9-12ab3c456de7。如果您不知道转移作业的资源名称,请运行 bq ls --transfer_config --transfer_location=LOCATION 命令以查找资源名称。

API

使用 projects.locations.transferConfigs.startManualRuns 方法并通过 parent 参数提供转移作业配置资源。

后续步骤