Cloud Storage バッチソース

このページでは、Cloud Data Fusion での Cloud Storage バッチソース プラグインの構成に関するガイダンスを示します。

Cloud Storage バッチソース プラグインを使用すると、Cloud Storage バケットからデータを読み取り、Cloud Data Fusion に取り込んでさらに処理と変換を行うことができます。これにより、次のような複数のファイル形式からデータを読み込むことが可能です。

  • 構造化: CSV、Avro、Parquet、ORC
  • 半構造化: JSON、XML
  • その他: テキスト、バイナリ

始める前に

Cloud Data Fusion には通常、次の 2 つのサービス アカウントがあります。

Cloud Storage バッチソース プラグインを使用する前に、各サービス アカウントに次のロールを付与します。

Cloud Data Fusion API サービス エージェント

このサービス アカウントには、すでに必要な権限がすべて付与されているため、権限を追加する必要はありません。

Compute Engine サービス アカウント

Google Cloud プロジェクトで、次の IAM のロールまたは権限を Compute Engine サービス アカウントに付与します。

プラグインを構成する

  1. Cloud Data Fusion ウェブ インターフェースに移動し、[Studio] をクリックします。
  2. [Realtime] ではなく、[Data Pipeline - Batch] が選択されていることを確認します。
  3. [Source] メニューで [GCS] をクリックします。Cloud Storage ノードがパイプラインに表示されます。
  4. ソースを構成するには、Cloud Storage ノードに移動して、[Properties] をクリックします。
  5. 次のプロパティを入力します。完全版リストについては、プロパティをご覧ください。

    1. Cloud Storage ノードのラベル(例: Cloud Storage tables)を入力します。
    2. 接続の詳細を入力します。新しい 1 回限りの接続または再利用可能な既存接続を設定できます。

      新しい接続

      Cloud Storage に 1 回限りの接続を追加する手順は次のとおりです。

      1. [接続を使用] をオフのままにします。
      2. [Project ID] フィールドの値は、自動検出された値のままにします。
      3. [Service account type] フィールドで、値を [File path] のままにし、[Service account file path] を自動検出のままにします。

      再利用可能な接続

      既存の接続を再利用する手順は次のとおりです。

      1. [接続を使用] をオンにします。
      2. [接続を参照] をクリックします。
      3. 接続名([Cloud Storage Default] など)をクリックします。

      4. 省略可: 接続が存在せず、再利用可能な新しい接続を作成する場合は [接続を追加] をクリックし、本ページにある [新しい接続] タブの手順をご参照ください。

    3. [Reference name] フィールドに、リネージに使用する名前を入力します(例: data-fusion-gcs-campaign)。

    4. [Path] フィールドに、読み取り元のパスを入力します(例: gs://BUCKET_PATH)。

    5. [Format] フィールドで、読み取るデータに次のいずれかのファイル形式を選択します。

      • avro
      • blob(blob 形式には、bytes 型の body というフィールドを含むスキーマが必要です)
      • csv
      • delimited
      • json
      • parquet
      • text(テキスト形式には、string 型の body というフィールドを含むスキーマが必要です)
      • tsv
      • 環境にデプロイしたフォーマット プラグインの名前
    6. (省略可)接続をテストするには、[Get schema] をクリックします。

    7. 省略可: [Sample size] フィールドに、選択したデータ型で確認する最大行数(たとえば、1000)を入力します。

    8. 省略可: [Override] フィールドに、列名とスキップする各データ型を入力します。

    9. 省略可: 最小分割サイズや正規表現のパスフィルタなどの [Advanced properties] を入力します(プロパティをご覧ください)。

    10. 省略可: [Temporary bucket name] フィールドに、Cloud Storage バケットの名前を入力します。

  6. 省略可: [Validate] をクリックして、見つかったエラーに対処します。

  7. [閉じる] をクリックします。プロパティは保存され、Cloud Data Fusion Studio で引き続きデータ パイプラインを構築できます。

プロパティ

プロパティ マクロ対応 必要なプロパティ 説明
ラベル × データ パイプラインのノードの名前。
接続を使用 × × ソースへの再利用可能な接続を参照します。接続を参照するときに表示される接続の追加、インポート、編集の詳細については、接続を管理するをご覧ください。
接続 [接続を使用] がオンになっている場合は、選択した再利用可能な接続の名前がこのフィールドに表示されます。
プロジェクト ID × [接続を使用] がオフになっている場合にのみ使用されます。プロジェクトのグローバルに一意の識別子。
デフォルトは auto-detect です。
サービス アカウントの種類 × 次のいずれかのオプションを選択します。
  • ファイルパス: サービス アカウントが配置されているパス。
  • JSON: サービス アカウントの JSON コンテンツ。
サービス アカウント ファイルのパス × サービス アカウント タイプの値が [ファイルパス] の場合にのみ使用されます。認証に使用するサービス アカウント キーのローカル ファイル システム上のパス。ジョブが Dataproc クラスタで実行される場合は、値を自動検出に設定します。他のタイプのクラスタでジョブが実行される場合、クラスタ内の各ノードにファイルが存在する必要があります。
デフォルトは auto-detect です。
サービス アカウント JSON × サービス アカウント タイプの値が JSON の場合にのみ使用されます。サービス アカウントの JSON ファイル コンテンツ。
参照名 × リネージやメタデータのアノテーションなど、他のサービスに対してこのソースを一意に識別する名前。
[Path] 読み取るファイルへのパス。ディレクトリが指定されている場合、パスはバックスラッシュ(/)で終了します(例: gs://bucket/path/to/directory/)。ファイル名パターンと照合するために、アスタリスク(*)をワイルドカードとして使用できます。ファイルが見つからなかった場合や一致しなかった場合は、パイプラインは失敗します。
形式 × 読み取るデータの形式。形式は次のいずれかにする必要があります。
  • avro
  • blob(blob 形式には、bytes 型の body というフィールドを含むスキーマが必要です)
  • csv
  • delimited
  • json
  • parquet
  • text(テキスト形式には、string 型の body というフィールドを含むスキーマが必要です)
  • tsv
  • 環境にデプロイしたフォーマット プラグインの名前
  • 形式がマクロの場合、事前にパッケージ化された形式のみを使用できます
サンプルサイズ × 自動データ型検出で調査される行の最大数。デフォルトは 1,000 です。
オーバーライド × 自動データ型検出がスキップされる、対応するデータを含む列のリスト。
区切り文字 × 形式が delimited である場合に使用する区切り文字。このプロパティは、他の形式では無視されます。
引用符で囲まれた値を有効にする × 引用符で囲まれたコンテンツを値として扱うかどうか。このプロパティは、csvtsvdelimited 形式でのみ使用されます。たとえば、このプロパティが true に設定されている場合、次の 2 つのフィールドが 1, "a, b, c" として出力されます。最初のフィールドの値は 1 です。2 つ目は a, b, c です。引用符自体は削除されます。改行区切り文字を引用符で囲むことはできません。
このプラグインでは、引用符が正しく囲まれていることを前提とします(例: "a, b, c")。引用を閉じていない("a,b,c,)とエラーが発生します。
デフォルト値は false です。
最初の行をヘッダーとして使用する × 各ファイルの最初の行を列ヘッダーとして使用するかどうか。サポートされている形式は、textcsvtsvdelimited です。
デフォルトは False です。
最小スプリット サイズ × 入力パーティションごとの最小サイズ(バイト単位)。パーティションが減少すると並列処理のレベルは高くなりますが、より多くのリソースを必要とし、オーバーヘッドが大きくなります。
Format の値が blob の場合、データを分割することはできません。
最大スプリット サイズ × 入力パーティションごとの最大サイズ(バイト単位)。パーティションが減少すると並列処理のレベルは高くなりますが、より多くのリソースを必要とし、オーバーヘッドが大きくなります。
Format の値が blob の場合、データを分割することはできません。
デフォルトは 128 MB です。
正規表現のパスフィルタ × 入力に含めるためにファイルパスが一致する必要がある正規表現。ファイル名だけでなく、フルパスが比較されます。ファイルを指定しないと、ファイルのフィルタリングは行われません。正規表現の構文の詳細については、パターンをご覧ください。
パスフィールド × レコードの読み取り元のファイルのパスを配置する出力フィールド。指定しない場合、パスは出力レコードに含まれません。指定する場合、フィールドは出力スキーマに文字列として存在する必要があります。
パスファイル名のみ × [パスフィールド] プロパティが設定されている場合、パスの URI ではなくファイル名のみを使用します。
デフォルトは False です。
ファイルを再帰的に読み取る × ファイルをパスから再帰的に読み取るかどうか。
デフォルトは False です。
空の入力を許可する × データが含まれていない入力パスを許可するかどうか。False に設定すると、読み取るデータがないときにプラグインでエラーが発生します。True に設定すると、エラーはスローされず、レコードは読み取られません。
デフォルトは False です。
データファイルの暗号化 × ファイルが暗号化されているかどうか。詳細については、データファイルの暗号化をご覧ください。
デフォルトは False です。
暗号化メタデータ ファイルの接尾辞 × 暗号化メタデータ ファイルのファイル名接尾辞。
デフォルトは metadata です。
ファイル システムのプロパティ × データを読み取るときに InputFormat で使用する追加のプロパティ。
ファイル エンコード × 読み取るファイルの文字エンコード。
デフォルトは UTF-8 です。
出力スキーマ × パスフィールド プロパティが設定されている場合、スキーマに文字列として存在する必要があります。

データファイルの暗号化

このセクションでは、データファイルの暗号化プロパティについて説明します。true に設定すると、ファイルは Tink ライブラリによって提供されるストリーミング AEAD を使用して復号されます。各データファイルには、暗号情報を含むメタデータ ファイルが付属している必要があります。たとえば、gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc にある暗号化されたデータファイルには、gs://BUCKET/ PATH_TO_DIRECTORY/file1.csv.enc.metadata にあるメタデータ ファイルが必要です。メタデータ ファイルには、次のプロパティを持つ JSON オブジェクトが含まれています。

プロパティ 説明
kms データ暗号鍵の暗号化に使用された Cloud Key Management Service URI。
aad 暗号化で使用される Base64 でエンコードされた追加認証データ。
key set Tink ライブラリからのシリアル化された鍵セット情報を表す JSON オブジェクト。

    /* Counting example */
    {

      "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey",

      "aad": "73iT4SUJBM24umXecCCf3A==",

      "keyset": {

          "keysetInfo": {

              "primaryKeyId": 602257784,

              "keyInfo": [{

                  "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey",

                  "outputPrefixType": "RAW",

                  "keyId": 602257784,

                  "status": "ENABLED"

              }]

          },

          "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn"

      }

    }
    

リリースノート

次のステップ