Blob Storage の転送

Azure Blob Storage 用 BigQuery Data Transfer Service を使用すると、Blob Storage から BigQuery への定期的な読み込みジョブを自動的にスケジュールし、管理できます。

始める前に

Blob Storage の転送を作成する前に、次のことを行います。

必要な権限

Blob Storage の転送を作成するには、bigquery.transfers.update Identity and Access Management(IAM)権限が必要です。また、ターゲット データセットに対する bigquery.datasets.get 権限と bigquery.datasets.update 権限が必要です。

IAM 事前定義ロール bigquery.admin には、Blob Storage の転送を作成するために必要な権限が含まれています。

BigQuery IAM の詳細については、IAM でのアクセス制御をご覧ください。

転送を有効にするために Blob Storage に適切な権限があることを確認するには、Shared Access Signature(SAS)をご覧ください。

Pub/Sub の転送実行通知を設定する場合は、pubsub.topics.setIamPolicy 権限が必要です。メール通知のみの場合は、Pub/Sub の権限は必要ありません。詳細については、BigQuery Data Transfer Service の実行通知をご覧ください。

制限事項

Blob Storage の転送には、次の制限があります。

Blob Storage データ転送を設定する

次のオプションのいずれかを選択します。

コンソール

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. BigQuery のナビゲーション メニューで、[データ転送] をクリックします。

  3. [転送を作成] をクリックします。

  4. [転送を作成] ページで、次の操作を行います。

    • [ソースタイプ] セクションの [ソース] で、[Azure Blob Storage] を選択します。

      転送元のタイプ

    • [転送構成名] セクションの [表示名] に、転送名を入力します。

    • [スケジュール オプション] セクションで:

      • [繰り返しの頻度] を選択します。[時間]、[]、[]、[] を選択する場合は、頻度も指定する必要があります。[カスタム] を選択して、カスタムの繰り返しの頻度を指定することもできます。[オンデマンド] を選択した場合、手動で転送をトリガーすると、この転送が実行されます。

      • 必要に応じて、[すぐに開始可能] を選択するか、[設定した時刻に開始] を選択して開始日と実行時間を指定します。

    • [転送先の設定] セクションの [データセット] で、データを保存するために作成したデータセットを選択します。

    • [データソースの詳細] セクションで、次のようにします。

      • [宛先テーブル] に、BigQuery でデータを保存するために作成したテーブルの名前を入力します。宛先テーブルの名前では、パラメータがサポートされています。
      • [Azure ストレージ アカウント名] に、blob ストレージ アカウント名を入力します。
      • [コンテナ名] に、Blob Storage のコンテナ名を入力します。
      • [データパス] に、転送するファイルをフィルタするためのパスを入力します。をご覧ください。
      • [SAS トークン] に、Azure SAS トークンを入力します。
      • [ファイル形式] でソースデータ形式を選択します。
      • [書き込み処理] で、[WRITE_APPEND] を選択して宛先テーブルに新しいデータを段階的に追加するか、[WRITE_TRUNCATE] を選択して各転送の実行中に宛先テーブル内のデータを上書きします。WRITE_APPEND は、[書き込み処理] のデフォルト値です。

      BigQuery Data Transfer Service が WRITE_APPEND または WRITE_TRUNCATE を使用してデータを取り込む方法の詳細については、Azure Blob 転送用のデータの取り込みをご覧ください。writeDisposition フィールドの詳細については、JobConfigurationLoad をご覧ください。

      データソースの詳細

    • [転送オプション] セクションで、次の操作を行います。

      • [許容されるエラー数] に、無視できる不良レコードの最大数にあたる整数値を入力します。デフォルト値は 0 です。
      • (省略可)[Decimal target type] に、ソースデータの 10 進数値を変換できる SQL データ型のカンマ区切りのリストを入力します。変換にどの SQL データ型を選択するかは、次の条件によって決まります。
        • 型が指定されたリストに含まれていて、精度とスケールをサポートしている場合に、NUMERICBIGNUMERICSTRING の順序で型が選択されます。
        • 表示されたデータ型のいずれも精度とスケールをサポートしていない場合は、指定したリストで最も広い範囲をサポートしているデータ型が選択されます。ソースデータの読み取り時に、値がサポートされている範囲を超えると、エラーがスローされます。
        • データ型 STRING では、すべての精度とスケールの値がサポートされています。
        • このフィールドを空のままにすると、ORC ではデータ型はデフォルトで NUMERIC,STRING になり、他のファイル形式では NUMERIC になります。
        • このフィールドに重複するデータ型を含めることはできません。
        • リストするデータ型の順序は無視されます。
    • ファイル形式として CSV または JSON を選択した場合、スキーマに適合しない値を含んだ行を許容するには、[JSON, CSV] セクションで [不明な値を無視] をオンにします。

    • ファイル形式として CSV を選択した場合は、[CSV] セクションでデータを読み込むための追加の CSV オプションを入力します。

      CSV のオプション

    • [通知オプション] セクションで、メール通知と Pub/Sub 通知を有効にするかどうか選択できます。

      • メール通知を有効にすると、転送の実行が失敗した場合、転送管理者にメール通知が送信されます。
      • Pub/Sub 通知を有効にするときに、公開するトピック名を選択するか、または [トピックを作成する] クリックします。
    • CMEK を使用する場合は、[詳細オプション] セクションで [顧客管理の暗号鍵] を選択します。使用可能な CMEK のリストが表示され、ここから選択できます。CMEK が BigQuery Data Transfer Service と連携する仕組みについては、転送で暗号鍵を指定するをご覧ください。

  5. [保存] をクリックします。

bq

bq mk --transfer_config コマンドを使用して、Blob Storage の転送を作成します。

bq mk \
  --transfer_config \
  --project_id=PROJECT_ID \
  --data_source=DATA_SOURCE \
  --display_name=DISPLAY_NAME \
  --target_dataset=DATASET \
  --destination_kms_key=DESTINATION_KEY \
  --params=PARAMETERS

次のように置き換えます。

  • PROJECT_ID: 省略可。ターゲット データセットを含むプロジェクト ID。指定しない場合は、デフォルトのプロジェクトが使用されます。
  • DATA_SOURCE: azure_blob_storage
  • DISPLAY_NAME: 転送構成の表示名。転送名には、後で修正が必要になった場合に簡単に識別できる任意の名前を使用できます。
  • DATASET: 転送構成のターゲット データセット。
  • DESTINATION_KEY:(省略可)Cloud KMS 鍵のリソース ID(例: projects/project_name/locations/us/keyRings/key_ring_name/cryptoKeys/key_name)。
  • PARAMETERS: JSON 形式で一覧表示される転送構成のパラメータ。例: --params={"param1":"value1", "param2":"value2"}Blob Storage 転送のパラメータは次のとおりです。
    • destination_table_name_template: 必須。宛先テーブルの名前を指定します。
    • storage_account: 必須。Blob Storage アカウント名。
    • container: 必須。Blob Storage のコンテナ名。
    • data_path: 省略可。転送するファイルをフィルタするパス。をご覧ください。
    • sas_token: 必須。Azure SAS トークン。
    • file_format: 省略可。転送するファイルの種類(CSVJSONAVROPARQUETORC)。デフォルト値は CSV です。
    • write_disposition: 省略可。宛先テーブルにデータを追加する場合、WRITE_APPEND を選択します。宛先テーブルのデータを上書きする場合は、WRITE_TRUNCATE を選択します。デフォルト値は WRITE_APPEND です。
    • max_bad_records: 省略可。許可する不良レコードの数を指定します。デフォルト値は 0 です。
    • decimal_target_types: 省略可。ソースデータの 10 進数値を変換できる、有効な SQL データ型のカンマ区切りのリスト。このフィールドを指定しない場合は、ORC ではデータ型はデフォルトで NUMERIC,STRING になり、他のファイル形式では NUMERIC になります。
    • ignore_unknown_values: 省略可。file_formatJSON または CSV でない場合は無視されます。スキーマに適合しない値を含んだ行を許容するには、true に設定します。
    • field_delimiter: 省略可。file_formatCSV の場合にのみ適用されます。フィールドを区切る文字を指定します。デフォルト値は , です。
    • skip_leading_rows: 省略可。file_formatCSV の場合にのみ適用されます。インポートの対象外にするヘッダー行の数を指定します。デフォルト値は 0 です。
    • allow_quoted_newlines: 省略可。file_formatCSV の場合にのみ適用されます。引用符で囲まれたフィールド内で改行を許可するかどうかを指定します。
    • allow_jagged_rows: 省略可。file_formatCSV の場合にのみ適用されます。末尾のオプションの列が欠落している行を許可するかどうかを指定します。欠損値には NULL が入力されます。

たとえば、以下では mytransfer という Blob Storage 転送を作成します。

bq mk \
  --transfer_config \
  --data_source=azure_blob_storage \
  --display_name=mytransfer \
  --target_dataset=mydataset \
  --destination_kms_key=projects/myproject/locations/us/keyRings/mykeyring/cryptoKeys/key1
  --params={"destination_table_name_template":"mytable",
      "storage_account":"myaccount",
      "container":"mycontainer",
      "data_path":"myfolder/*.csv",
      "sas_token":"my_sas_token_value",
      "file_format":"CSV",
      "max_bad_records":"1",
      "ignore_unknown_values":"true",
      "field_delimiter":"|",
      "skip_leading_rows":"1",
      "allow_quoted_newlines":"true",
      "allow_jagged_rows":"false"}

Java

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートJava の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create azure blob storage transfer config.
public class CreateAzureBlobStorageTransfer {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String displayName = "MY_TRANSFER_DISPLAY_NAME";
    final String datasetId = "MY_DATASET_ID";
    String tableId = "MY_TABLE_ID";
    String storageAccount = "MY_AZURE_STORAGE_ACCOUNT_NAME";
    String containerName = "MY_AZURE_CONTAINER_NAME";
    String dataPath = "MY_AZURE_FILE_NAME_OR_PREFIX";
    String sasToken = "MY_AZURE_SAS_TOKEN";
    String fileFormat = "CSV";
    String fieldDelimiter = ",";
    String skipLeadingRows = "1";
    Map<String, Value> params = new HashMap<>();
    params.put(
        "destination_table_name_template", Value.newBuilder().setStringValue(tableId).build());
    params.put("storage_account", Value.newBuilder().setStringValue(storageAccount).build());
    params.put("container", Value.newBuilder().setStringValue(containerName).build());
    params.put("data_path", Value.newBuilder().setStringValue(dataPath).build());
    params.put("sas_token", Value.newBuilder().setStringValue(sasToken).build());
    params.put("file_format", Value.newBuilder().setStringValue(fileFormat).build());
    params.put("field_delimiter", Value.newBuilder().setStringValue(fieldDelimiter).build());
    params.put("skip_leading_rows", Value.newBuilder().setStringValue(skipLeadingRows).build());
    createAzureBlobStorageTransfer(projectId, displayName, datasetId, params);
  }

  public static void createAzureBlobStorageTransfer(
      String projectId, String displayName, String datasetId, Map<String, Value> params)
      throws IOException {
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName(displayName)
            .setDataSourceId("azure_blob_storage")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (DataTransferServiceClient client = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = client.createTransferConfig(request);
      System.out.println("Azure Blob Storage transfer created successfully: " + config.getName());
    } catch (ApiException ex) {
      System.out.print("Azure Blob Storage transfer was not created." + ex.toString());
    }
  }
}

転送に使用する暗号鍵を指定する

転送実行のデータを暗号化する顧客管理の暗号鍵(CMEK)を指定できます。CMEK を使用して、Azure Blob Storage からの転送をサポートできます。

転送で CMEK を指定すると、BigQuery Data Transfer Service は取り込まれたデータの中間ディスク上キャッシュに CMEK を適用して、データ転送ワークフロー全体が CMEK 準拠になるようにします。

最初に CMEK で作成されていなかった既存の転送は、更新して CMEK を追加することはできません。たとえば、最初にデフォルトの方法で暗号化されていた宛先テーブルを CMEK で暗号化されるように変更することはできません。逆に、CMEK で暗号化された宛先テーブルを別のタイプの暗号化に変更することはできません。

転送構成が CMEK 暗号化を使用して最初に作成された場合は、転送の CMEK を更新できます。転送構成の CMEK を更新すると、BigQuery Data Transfer Service は転送の次回実行時に CMEK を宛先テーブルに伝播します。BigQuery Data Transfer Service は、古い CMEK を転送中に新しい CMEK に置き換えます。詳細については、転送の更新をご覧ください。

プロジェクトのデフォルト鍵を使用することもできます。転送でプロジェクトのデフォルト鍵を指定すると、BigQuery Data Transfer Service は、新しい転送構成のデフォルト鍵としてプロジェクトのデフォルト鍵を使用します。

転送の設定に関するトラブルシューティング

転送の設定に問題がある場合は、Blob Storage の転送に関する問題をご覧ください。

次のステップ