Amazon S3 转移作业

借助适用于 Amazon S3 的 BigQuery Data Transfer Service,您可以自动安排和管理从 Amazon S3 到 BigQuery 的周期性加载作业。

准备工作

在创建 Amazon S3 转移作业之前,请执行以下操作:

限制

Amazon S3 转移作业存在以下限制:

  • 目前,无法参数化 Amazon S3 URI 的存储分区部分。
  • 以 Amazon S3 为来源的转移作业始终会通过 WRITE_APPEND 偏好设置触发,该偏好设置会将数据附加到目标表中。如需了解详情,请参阅加载作业配置中的 configuration.load.writeDisposition
  • 可能还存在其他限制,具体取决于 Amazon S3 源数据的格式。如需了解详情,请参阅:

所需权限

创建 Amazon S3 转移作业之前,请执行以下操作:

  • 确保创建转移作业的人员在 BigQuery 中拥有以下所需权限:

    • 创建转移作业所需的 bigquery.transfers.update 权限
    • 针对目标数据集的 bigquery.datasets.getbigquery.datasets.update 权限

    预定义的 IAM 角色 bigquery.admin 包含 bigquery.transfers.updatebigquery.datasets.updatebigquery.datasets.get 权限。如需详细了解 BigQuery Data Transfer Service 中的 IAM 角色,请参阅访问权限控制参考文档

  • 请参阅 Amazon S3 的相关文档,以确保您已配置启用转移作业所需的所有权限。Amazon S3 源数据必须至少包含对其应用的 AWS 托管政策 AmazonS3ReadOnlyAccess

设置 Amazon S3 数据转移作业

要创建 Amazon S3 数据转移作业,请执行以下操作:

控制台

  1. 转到 Cloud Console 中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击转移作业

  3. 点击创建转移作业

  4. 创建转移作业页面上:

    • 来源类型部分的来源中,选择 Amazon S3

      转移作业来源

    • 转移配置名称部分的显示名中,输入转移作业的名称,例如 My Transfer。转移作业名称可以是任何容易辨识的值,方便您以后在需要修改该作业时能轻松识别。

      转移作业名称

    • 时间表选项部分的时间表中,保留默认值(立即开始)或点击在设置的时间开始 (Start at a set time)。

      • 重复频率部分,从以下选项中选择转移作业的运行频率。 选项包括:

        • 每日一次(默认值)
        • 每周一次
        • 每月一次
        • 自定义
        • 按需

        如果您选择除“每日一次”以外的选项,则系统还会提供其他选项。例如,如果您选择“每周一次”,则系统会显示一个选项,供您选择星期几。

      • 开始日期和运行时间部分,输入转移作业的开始日期和时间。如果您选择的是立即开始,则此选项会处于停用状态。

        转移作业时间表

    • 目标设置部分的目标数据集中,选择您创建的用来存储数据的数据集。

      转移作业数据集

    • 数据源详细信息部分,执行以下操作:

      • Destination table 部分,输入您创建用来在 BigQuery 中存储数据的表的名称。目标表名称支持使用参数
      • Amazon S3 URI 部分,按照以下格式输入 URI:s3://mybucket/myfolder/...。URI 也支持使用参数
      • Access key ID 部分,输入访问密钥 ID。
      • 私有访问密钥 部分,输入私有访问密钥。
      • 文件格式部分,选择数据格式:JSON(以换行符分隔)、CSV、Avro、Parquet 或 ORC。

        S3 来源详细信息

    • 转移作业 - 所有格式部分中:

      • 允许的错误数部分,输入一个代表可以忽略的错误记录数上限的整数值。
      • (可选)在小数目标类型 (Decimal target types) 部分,输入一个源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。选择用于转换的 SQL 数据类型取决于以下条件:
        • 选择用于转换的数据类型将是以下列表中第一个支持源数据的精度和比例的数据类型(按顺序):NUMERIC、BIGNUMERIC 和 STRING。
        • 如果列出的数据类型均不支持精度和比例,则系统会选择支持指定列表中最宽泛的范围的数据类型。如果在读取源数据时值超出支持的范围,则会抛出错误。
        • 数据类型 STRING 支持所有精度和比例值。
        • 如果将此字段留空,则对于 ORC,数据类型默认为“NUMERIC,STRING”,对于其他文件格式,数据类型默认为“NUMERIC”。
        • 此字段不能包含重复的数据类型。
        • 系统会忽略您在此字段中列出的数据类型的顺序。

      转移选项所有格式

    • 如果您选择了 CSV 或 JSON 作为文件格式,请在 JSON,CSV 部分中,选中忽略未知值以接受包含与架构不匹配的值所在的行。未知值会被忽略。对于 CSV 文件,此选项会忽略行尾的额外值。

      忽略未知值

    • 如果您选择了 CSV 作为文件格式,请在 CSV 部分中输入用于加载数据的任何其他 CSV 选项

      CSV 选项

    • (可选)在通知选项部分,执行以下操作:

      • 点击切换开关以启用电子邮件通知。启用此选项后,转移作业管理员会在转移作业运行失败时收到电子邮件通知。
      • 选择 Pub/Sub 主题部分,选择您的主题名称,或点击创建主题来创建一个主题。此选项用于为您的转移作业配置 Pub/Sub 运行通知
  5. 点击保存

bq

输入 bq mk 命令并提供转移作业创建标志 --transfer_config

bq mk \
--transfer_config \
--project_id=project_id \
--data_source=data_source \
--display_name=name \
--target_dataset=dataset \
--params='parameters'

其中:

  • project_id:可选。您的 Google Cloud 项目 ID。如果未提供 --project_id 来指定具体项目,则系统会使用默认项目。
  • data_source:必需。数据源 - amazon_s3
  • display_name:必需。此标志表示转移作业配置的显示名。转移作业名称可以是任何容易辨识的值,让您以后在需要修改时能够轻松识别。
  • dataset:必需。转移作业配置的目标数据集。
  • parameters:必需。所创建转移作业配置的参数(采用 JSON 格式)。例如 --params='{"param":"param_value"}'。以下是 Amazon S3 转移作业的参数:

    • destination_table_name_template:必需。目标表的名称。
    • data_path:必需。Amazon S3 URI,格式如下:

      s3://mybucket/myfolder/...

      URI 也支持使用参数

    • access_key_id:必需。您的访问密钥 ID。

    • secret_access_key:必需。您的私有访问密钥。

    • file_format:可选。表示要转移的文件类型:CSVJSONAVROPARQUETORC。默认值为 CSV

    • max_bad_records:可选。允许的错误记录数。默认值为 0

    • decimal_target_types:可选。一个源小数值可转换为的可能 SQL 数据类型的英文逗号分隔列表。如果未提供此字段,则对于 ORC,数据类型默认为“NUMERIC,STRING”,对于其他文件格式,数据类型默认为“NUMERIC”。

    • ignore_unknown_values:可选;如果 file_format 不是 JSONCSV,则此参数会被忽略。是否忽略数据中的未知值。

    • field_delimiter:可选;仅在 file_formatCSV 时适用。用于分隔字段的字符。默认值为英文逗号。

    • skip_leading_rows:可选;仅在 file_formatCSV 时适用。表示您不想导入的标题行数。默认值为 0

    • allow_quoted_newlines:可选;仅在 file_formatCSV 时适用。表示是否允许在引用字段中使用换行符。

    • allow_jagged_rows:可选;仅在 file_formatCSV 时适用。表示是否接受末尾处缺少可选列的行。系统将填入 NULL 来代替缺少的值。

例如,以下命令使用 data_path_templates3://mybucket/myfile/*.csv、目标数据集 mydatasetfile_format CSV 创建名为 My Transfer 的 Amazon S3 转移作业。此示例包含与 CSV file_format 关联的可选参数的非默认值。

该转移作业将在默认项目中创建:

bq mk --transfer_config \
--target_dataset=mydataset \
--display_name='My Transfer' \
--params='{"data_path_template":"s3://mybucket/myfile/*.csv",
"destination_table_name_template":"MyTable",
"file_format":"CSV",
"max_bad_records":"1",
"ignore_unknown_values":"true",
"field_delimiter":"|",
"skip_leading_rows":"1",
"allow_quoted_newlines":"true",
"allow_jagged_rows":"false",
"delete_source_files":"true"}' \
--data_source=amazon_s3

运行命令后,您会收到类似如下的消息:

[URL omitted] Please copy and paste the above URL into your web browser and follow the instructions to retrieve an authentication code.

请按照说明操作,并将身份验证代码粘贴到命令行中。

API

使用 projects.locations.transferConfigs.create 方法并提供一个 TransferConfig 资源实例。

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create amazon s3 transfer config.
public class CreateAmazonS3Transfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    // Amazon S3 Bucket Uri with read role permission
    String sourceUri = "s3://your-bucket-name/*";
    String awsAccessKeyId = "MY_AWS_ACCESS_KEY_ID";
    String awsSecretAccessId = "AWS_SECRET_ACCESS_ID";
    String sourceFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("data_path", Value.newBuilder().setStringValue(sourceUri).build());
    params.put("access_key_id", Value.newBuilder().setStringValue(awsAccessKeyId).build());
    params.put("secret_access_key", Value.newBuilder().setStringValue(awsSecretAccessId).build());
    params.put("source_format", Value.newBuilder().setStringValue(sourceFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Aws S3 Config Name")
            .setDataSourceId("amazon_s3")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    createAmazonS3Transfer(projectId, transferConfig);
  }

  public static void createAmazonS3Transfer(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Amazon s3 transfer created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("Amazon s3 transfer was not created." + ex.toString());
    }
  }
}

查询数据

当数据转移到 BigQuery 时,数据会写入按提取时间分区的表。如需了解详情,请参阅分区表简介

如果您要直接查询表,而不是使用自动生成的视图,那么必须在查询中使用 _PARTITIONTIME 伪列。如需了解详情,请参阅查询分区表

前缀匹配与通配符匹配的影响

Amazon S3 API 支持前缀匹配,但不支持通配符匹配。与某个前缀匹配的所有 Amazon S3 文件都将转移到 Google Cloud 中。但是,只有与转移作业配置中的 Amazon S3 URI 匹配的文件才会实际加载到 BigQuery 中。这可能会导致一些未加载到 BigQuery 中的转移文件产生额外的 Amazon S3 出站流量费用。

例如,假设有以下数据路径:

s3://bucket/folder/*/subfolder/*.csv

并且源位置有以下文件:

s3://bucket/folder/any/subfolder/file1.csv
s3://bucket/folder/file2.csv

这将导致前缀为 s3://bucket/folder/ 的所有 Amazon S3 文件都会转移到 Google Cloud。在本示例中,file1.csvfile2.csv 都将被转移。

但是,只有与 s3://bucket/folder/*/subfolder/*.csv 匹配的文件才会实际加载到 BigQuery 中。在本示例中,只有 file1.csv 才会加载到 BigQuery 中。

问题排查

下面介绍了一些常见的错误和推荐的解决方法。

Amazon S3 PERMISSION_DENIED 错误

错误 推荐执行的操作
您提供的 AWS 访问密钥 ID 不存在于我们的记录中。 请确保存在访问密钥并且其 ID 正确无误。
我们计算出的请求签名与您提供的签名不匹配。请检查您的密钥和签名方法。 请确保转移作业配置具有正确的对应私有访问密钥
无法获取源 S3 存储分区的位置。其他详细信息:访问被拒绝

无法获取源 S3 存储分区的位置。其他详细信息:HTTP/1.1 403 禁止

S3 错误消息:访问被拒绝
请确保 AWS IAM 用户有权执行以下操作:
  • 列出 Amazon S3 存储分区。
  • 获取存储分区的位置。
  • 读取存储分区中的对象。
服务器无法初始化对象上传操作。InvalidObjectState:该操作对于对象的存储类别无效

无法获取源 S3 存储分区的位置。其他详细信息:已停用对此对象的所有访问权限
恢复归档到 Amazon Glacier 的任意对象。在恢复归档到 Amazon Glacier 的 Amazon S3 对象之前,您无法访问这些对象
已停用对此对象的所有访问权限 请确认转移作业配置中的 Amazon S3 URI 正确无误

Amazon S3 转移作业限制错误

错误 推荐执行的操作
转移作业中的文件数量超过了上限 (10000)。 请考虑是否可以将 Amazon S3 URI 中的通配符数量减少到一个。如果可以,请使用新的转移作业配置重试,因为每次转移作业运行的最大文件数量将增加

请考虑是否可以将转移作业配置拆分为多个转移作业配置,每个转移作业配置转移一部分源数据。
转移作业中的文件大小超过了上限(16492674416640 字节)。 请考虑是否可以将转移作业配置拆分为多个转移作业配置,每个转移作业配置转移一部分源数据。

常规问题

错误 推荐执行的操作
系统从 Amazon S3 转移了文件,但未将其加载到 BigQuery 中。转移作业日志可能类似如下所示:

Moving data from Amazon S3 to Google Cloud complete: Moved <NNN> object(s).
No new files found matching <Amazon S3 URI>.
请确认转移作业配置中的 Amazon S3 URI 正确无误。

如果转移作业配置旨在加载具有某个公共前缀的所有文件,请确保 Amazon S3 URI 以通配符结尾。
例如,如需加载 s3://my-bucket/my-folder/ 中的所有文件,转移作业配置中的 Amazon S3 URI 必须为 s3://my-bucket/my-folder/*,而不只是 s3://my-bucket/my-folder/
其他问题 请参阅排查转移作业配置问题

后续步骤