Blob Storage 转移

借助适用于 Azure Blob Storage 连接器的 BigQuery Data Transfer Service,您可以自动安排和管理从 Blob Storage 到 BigQuery 的周期性加载作业。

准备工作

创建 Blob Storage 数据转移作业之前,请执行以下操作:

所需权限

如需创建 Blob Storage 数据转移作业,您需要 bigquery.transfers.update Identity and Access Management (IAM) 权限。您还需要目标数据集的 bigquery.datasets.getbigquery.datasets.update 权限。

预定义的 bigquery.admin IAM 角色包含创建 Blob Storage 数据转移作业所需的权限。

如需详细了解 BigQuery IAM,请参阅使用 IAM 进行访问权限控制

如需确认您在 Blob Storage 中拥有启用数据转移作业的正确权限,请参阅共享访问签名 (SAS)

如果要设置 Pub/Sub 的转移作业运行通知,必须拥有 pubsub.topics.setIamPolicy 权限。只有电子邮件通知不需要 Pub/Sub 权限。如需了解详情,请参阅 BigQuery Data Transfer Service 运行通知

限制

Blob Storage 数据转移存在以下限制:

设置 Blob Storage 数据转移作业

从下列选项中选择一项:

控制台

  1. 前往 Google Cloud 控制台中的“数据转移”页面。

    转到“数据传输”

  2. 点击 创建转移作业

  3. 创建转移作业页面上,执行以下操作:

    • 来源类型部分的来源中,选择 Azure Blob Storage

      转移作业来源类型

    • 转移配置名称部分的显示名称中,输入数据转移作业的名称。

    • 时间表选项部分中,执行以下操作:

      • 选择重复频率。如果您选择小时,则还必须指定频率。您还可以选择自定义来指定自定义重复频率。如果您选择按需,则当您手动触发转移作业时,此数据转移作业会运行。

      • 如果适用,请选择立即开始在设置的时间开始,并提供开始日期和运行时间。

    • 目标设置部分的数据集中,选择您创建的用来存储数据的数据集。

    • 数据源详细信息部分中,执行以下操作:

      • 目标表中,输入您创建用来在 BigQuery 中存储数据的表的名称。目标表名称支持使用参数
      • Azure Storage 账号名称中,输入 Blob Storage 账号名称。
      • 容器名称中,输入 Blob Storage 容器名称。
      • 数据路径中,输入用于过滤要转移的文件的路径。请参阅示例
      • SAS 令牌中,输入 Azure SAS 令牌。
      • 文件格式中,选择源数据格式。
      • 对于写入处置方式,选择 WRITE_APPEND 以将新数据递增附加到目标表,或选择 WRITE_TRUNCATE 以在每次转移作业运行期间覆盖目标表中的数据。WRITE_APPEND写入处置方式的默认值。

      如需详细了解 BigQuery Data Transfer Service 如何使用 WRITE_APPENDWRITE_TRUNCATE 注入数据,请参阅 Azure Blob 转移作业的数据注入。如需详细了解 writeDisposition 字段,请参阅 JobConfigurationLoad

      数据源详细信息

    • 转移选项部分中,执行以下操作:

      • 允许的错误数部分中,输入一个代表可以忽略的错误记录数上限的整数值。默认值为 0。
      • (可选)在小数目标类型中,输入源数据中十进制值转换为的可能 SQL 数据类型的英文逗号分隔列表。选择用于转换的 SQL 数据类型取决于以下条件:
        • 按照 NUMERICBIGNUMERICSTRING 的顺序,如果类型在指定的列表中并且支持精度和比例,则系统会选择该类型。
        • 如果列出的数据类型均不支持精度和比例,则系统会选择支持指定列表中最宽泛的范围的数据类型。如果在读取源数据时值超出支持的范围,则将抛出错误。
        • 数据类型 STRING 支持所有精度和比例值。
        • 如果将此字段留空,则对于 ORC,数据类型默认为 NUMERIC,STRING,对于其他文件格式,数据类型默认为 NUMERIC
        • 此字段不能包含重复的数据类型。
        • 系统会忽略您列出的数据类型的顺序。
    • 如果您选择了 CSV 或 JSON 作为文件格式,请在 JSON、CSV 部分中,选中忽略未知值以接受与架构不匹配的值所在的行。

    • 如果您选择了 CSV 作为文件格式,请在 CSV 部分中,输入用于加载数据的任何其他 CSV 选项

      CSV 选项

    • 通知选项部分中,您可以选择启用电子邮件通知和 Pub/Sub 通知。

      • 启用电子邮件通知后,转移作业管理员会在转移作业运行失败时收到电子邮件通知。
      • 启用 Pub/Sub 通知后,请选择要发布到的主题名称,或点击创建主题以创建一个。
    • 如果您使用 CMEK,请在高级选项部分中选择客户管理的密钥。系统会显示一系列可用的 CMEK 供您选择。如需了解 CMEK 如何与 BigQuery Data Transfer Service 搭配使用,请参阅使用转移作业指定加密密钥

  4. 点击保存

bq

使用 bq mk --transfer_config 命令创建 Blob Storage 转移作业:

bq mk \
  --transfer_config \
  --project_id=PROJECT_ID \
  --data_source=DATA_SOURCE \
  --display_name=DISPLAY_NAME \
  --target_dataset=DATASET \
  --destination_kms_key=DESTINATION_KEY \
  --params=PARAMETERS

请替换以下内容:

  • PROJECT_ID:(可选)包含目标数据集的项目 ID。如果未指定,则使用默认项目。
  • DATA_SOURCEazure_blob_storage
  • DISPLAY_NAME:数据转移作业配置的显示名称。转移作业名称可以是任何可让您在需要修改转移作业时识别该转移作业的名称。
  • DATASET:数据转移作业配置的目标数据集。
  • DESTINATION_KEY:(可选)Cloud KMS 密钥资源 ID,例如 projects/project_name/locations/us/keyRings/key_ring_name/cryptoKeys/key_name
  • PARAMETERS:数据转移作业配置的参数,以 JSON 格式列出。例如 --params={"param1":"value1", "param2":"value2"}。以下是 Blob Storage 数据转移作业的参数:
    • destination_table_name_template:必填。目标表的名称。
    • storage_account:必填。Blob Storage 账号名称。
    • container:必填。Blob Storage 容器名称。
    • data_path:可选。用于过滤要转移的文件的路径。请参阅示例
    • sas_token:必填。Azure SAS 令牌。
    • file_format:可选。要转移的文件类型:CSVJSONAVROPARQUETORC。默认值为 CSV
    • write_disposition:可选。选择 WRITE_APPEND 可将数据附加到目标表,或选择 WRITE_TRUNCATE 覆盖目标表中的数据。默认值为 WRITE_APPEND
    • max_bad_records:可选。允许的错误记录数。默认值为 0。
    • decimal_target_types:可选。源数据中十进制值转换为的可能 SQL 数据类型的英文逗号分隔列表。如果未提供此字段,对于 ORC,数据类型默认为 NUMERIC,STRING,对于其他文件格式,数据类型默认为 NUMERIC
    • ignore_unknown_values:可选;如果 file_format 不是 JSONCSV,则此参数会予以忽略。设置为 true 以接受与架构不匹配的值所在的行。
    • field_delimiter:可选;仅在 file_formatCSV 时适用。用于分隔字段的字符。默认值为 ,
    • skip_leading_rows:可选;仅在 file_formatCSV 时适用。表示您不想导入的标题行数。默认值为 0。
    • allow_quoted_newlines:可选;仅在 file_formatCSV 时适用。表示是否允许在引用字段中使用换行符。
    • allow_jagged_rows:可选;仅在 file_formatCSV 时适用。表示是否接受末尾处缺少可选列的行。缺少的值会填入 NULL

例如,以下命令会创建一个名为 mytransfer 的 Blob Storage 数据转移作业:

bq mk \
  --transfer_config \
  --data_source=azure_blob_storage \
  --display_name=mytransfer \
  --target_dataset=mydataset \
  --destination_kms_key=projects/myproject/locations/us/keyRings/mykeyring/cryptoKeys/key1
  --params={"destination_table_name_template":"mytable",
      "storage_account":"myaccount",
      "container":"mycontainer",
      "data_path":"myfolder/*.csv",
      "sas_token":"my_sas_token_value",
      "file_format":"CSV",
      "max_bad_records":"1",
      "ignore_unknown_values":"true",
      "field_delimiter":"|",
      "skip_leading_rows":"1",
      "allow_quoted_newlines":"true",
      "allow_jagged_rows":"false"}

API

使用 projects.locations.transferConfigs.create 方法并提供一个 TransferConfig 资源实例。

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create azure blob storage transfer config.
public class CreateAzureBlobStorageTransfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String displayName = "MY_TRANSFER_DISPLAY_NAME";
    final String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    String storageAccount = "MY_AZURE_STORAGE_ACCOUNT_NAME";
    String containerName = "MY_AZURE_CONTAINER_NAME";
    String dataPath = "MY_AZURE_FILE_NAME_OR_PREFIX";
    String sasToken = "MY_AZURE_SAS_TOKEN";
    String fileFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("storage_account", Value.newBuilder().setStringValue(storageAccount).build());
    params.put("container", Value.newBuilder().setStringValue(containerName).build());
    params.put("data_path", Value.newBuilder().setStringValue(dataPath).build());
    params.put("sas_token", Value.newBuilder().setStringValue(sasToken).build());
    params.put("file_format", Value.newBuilder().setStringValue(fileFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    createAzureBlobStorageTransfer(projectId, displayName, datasetId, params);
  }

  public static void createAzureBlobStorageTransfer(
      String projectId, String displayName, String datasetId, Map<String, Value> params)
      throws IOException {
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName(displayName)
            .setDataSourceId("azure_blob_storage")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Azure Blob Storage transfer created successfully: " + config.getName());
    } catch (ApiException ex) {
      System.out.print("Azure Blob Storage transfer was not created." + ex.toString());
    }
  }
}

使用转移作业指定加密密钥

您可以指定客户管理的加密密钥 (CMEK) 来加密转移作业运行的数据。您可以使用 CMEK 支持来自 Azure Blob Storage 的转移作业。

当您使用转移作业指定 CMEK 时,BigQuery Data Transfer Service 会将 CMEK 应用于所注入数据的任何中间磁盘缓存,从而使整个数据转移工作流符合 CMEK 的规定。

如果最初不是使用 CMEK 创建转移作业,则无法更新现有转移作业来添加 CMEK。例如,您无法将最初默认加密的目标表更改为现在使用 CMEK 加密。反之,您也无法将 CMEK 加密的目标表更改为采用其他类型的加密。

如果转移配置最初是使用 CMEK 加密创建的,则可以为转移作业更新 CMEK。当您为转移配置更新 CMEK 时,BigQuery Data Transfer Service 会在下次运行转移作业时将 CMEK 传播到目标表,其中 BigQuery Data Transfer Service 会在转移作业运行期间将所有过时的 CMEK 替换为新的 CMEK。如需了解详情,请参阅更新转移作业

您也可以使用项目默认密钥。当您使用转移作业指定项目默认密钥时,BigQuery Data Transfer Service 会将项目默认密钥用作任何新转移配置的默认密钥。

排查转移作业设置问题

如果您在设置数据转移作业时遇到问题,请参阅 Blob Storage 转移问题

后续步骤