Cloud Storage CSV files to BigQuery テンプレート

Cloud Storage CSV files to BigQuery パイプラインは、Cloud Storage に保存されている CSV ファイルからデータを読み取り、結果を BigQuery テーブルに追加することを可能にするバッチ パイプラインです。CSV ファイルは、Compression 列挙型 SDK ページに記載されている形式の圧縮ファイルまたは非圧縮ファイルに対応します。

パイプラインの要件

このテンプレートを使用するには、パイプラインが次の要件を満たしている必要があります。

BigQuery スキーマの JSON ファイル

BigQuery スキーマを記述する JSON ファイルを作成します。スキーマに BigQuery Schema というタイトルのトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。

Cloud Storage CSV files to BigQuery バッチ テンプレートでは、ターゲット BigQuery テーブルの STRUCT(レコード)フィールドに対するデータのインポートはサポートされていません。

次の JSON は、BigQuery スキーマの例を示しています。

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

エラーテーブルのスキーマ

CSV ファイルから除外されたレコードを保存する BigQuery テーブルは、ここで定義したテーブル スキーマと一致する必要があります。

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

テンプレートのパラメータ

必須パラメータ

  • inputFilePattern: 処理するテキストを含む CSV ファイルへの Cloud Storage パス。例: gs://your-bucket/path/*.csv
  • schemaJSONPath: BigQuery スキーマを定義する JSON ファイルへの Cloud Storage パス。
  • outputTable: 処理されたデータを保存する BigQuery テーブルの名前。既存の BigQuery テーブルを再利用すると、データは宛先テーブルに追加されます。
  • bigQueryLoadingTemporaryDirectory: BigQuery の読み込みプロセスで使用する一時ディレクトリ。例: gs://your-bucket/your-files/temp_dir
  • badRecordsOutputTable: CSV ファイルを処理するときに拒否されたデータの保存に使用する BigQuery テーブルの名前。既存の BigQuery テーブルを再利用すると、データは宛先テーブルに追加されます。このテーブルのスキーマは、エラーテーブルのスキーマ(https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema)と一致する必要があります。
  • delimiter: CSV ファイルで使用する列区切り文字。例: ,
  • csvFormat: Apache Commons CSV 形式に準拠する CSV 形式。デフォルト: Default

オプション パラメータ

  • containsHeaders: CSV ファイルにヘッダーが含まれているかどうか。デフォルト: false
  • csvFileEncoding: CSV ファイルの文字エンコード形式。使用できる値は US-ASCIIISO-8859-1UTF-8UTF-16 です。デフォルトは UTF-8 です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the CSV files on Cloud Storage to BigQuery (Batch) template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • PATH_TO_CSV_DATA: CSV ファイルへの Cloud Storage パス
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • BIGQUERY_DESTINATION_TABLE: BigQuery 宛先テーブル名
  • BIGQUERY_BAD_RECORDS_TABLE: BigQuery の不正なレコードのテーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • DELIMITER: CSV ファイルの区切り文字
  • CSV_FORMAT: レコードの解析に使用する CSV 形式の仕様
  • CONTAINS_HEADERS: CSV ファイルにヘッダーが含まれているかどうか
  • CSV_FILE_ENCODING: CSV ファイルのエンコード

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • PATH_TO_CSV_DATA: CSV ファイルへの Cloud Storage パス
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • BIGQUERY_DESTINATION_TABLE: BigQuery 宛先テーブル名
  • BIGQUERY_BAD_RECORDS_TABLE: BigQuery の不正なレコードのテーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • DELIMITER: CSV ファイルの区切り文字
  • CSV_FORMAT: レコードの解析に使用する CSV 形式の仕様
  • CONTAINS_HEADERS: CSV ファイルにヘッダーが含まれているかどうか
  • CSV_FILE_ENCODING: CSV ファイルのエンコード

次のステップ