テンプレートを使用してデータを処理する

Dataplex では、Dataflow を利用して、データの取り込み、処理、データ ライフサイクルの管理などの一般的なデータ処理タスクを実行するテンプレートが用意されています。このガイドでは、データ処理テンプレートを構成して実行する方法について説明します。

始める前に

Dataplex テンプレートは、Dataflow を利用しています。テンプレートを使用する前に、Dataflow API を有効にします。

Dataflow API を有効にする

メモ

  • すべてのテンプレートで一般的な Dataflow パイプライン オプションがサポートされます。

  • Dataplex はデータ パイプラインを使用して、テンプレートで定義されたタスクのスケジュールを設定します。

  • スケジュールしたタスクは、Google Cloud コンソールの [Dataplex] ページでのみ確認できます。

テンプレート: 元データをキュレートされたデータに変換する

Dataplex ファイル形式変換テンプレートは、Dataplex Cloud Storage アセットまたは CSV または JSON 形式に保存されている Dataplex エンティティのリストのデータを、別の Dataplex アセットの Parquet、もしくは Avro 形式データに変換します。パーティションのレイアウトは変換において保持されます。また、出力ファイルを圧縮することもできます。

テンプレートのパラメータ

パラメータ 説明
inputAssetOrEntitiesList 入力ファイルを含む Dataplex アセットまたは Dataplex エンティティ。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> または projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... の形式で指定します。
outputFileFormat Cloud Storage の出力ファイル形式。このパラメータは、PARQUET または AVRO の形式にする必要があります。
outputAsset 出力ファイルが保存される Cloud Storage バケットを含む Dataplex アセットの名前。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式に従う必要があります。outputAsset は、Google Cloud Console の [Dataplex アセット Details] タブにあります。
outputFileCompression (省略可)出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は、UNCOMPRESSEDSNAPPYGZIPBZIP2 です。BZIP2 は、PARQUET ファイルではサポートされていません。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、宛先のディレクトリに存在しないファイルのみを処理することを示します。パラメータには、OVERWRITE(既存のファイルを上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。
updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインはソースからスキーマをコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery は実行されません。ソース(未加工)データのスキーマが Dataplex によって管理されている場合は、このフラグを使用します。

テンプレートを実行する

コンソール

  1. Google Cloud コンソールの [Dataplex] ページに移動します。

    Dataplex に移動

  2. [Process] ビューに移動します。

  3. [タスクの作成] をクリックします。

  4. [キュレートされた形式に変換] で [タスクを作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を指定します。

  7. タスク実行のリージョンを選択します。

  8. 必要なパラメータを入力します。

  9. [続行] をクリックします。

gcloud

以下を置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

REST API

以下を置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

テンプレート: BigQuery アセットから Cloud Storage アセットへの階層データ

Dataplex BigQuery to Cloud Storage テンプレートは、Dataplex BigQuery アセットから、Dataplex 互換のレイアウトと形式で Dataplex Cloud Storage アセットにデータをコピーします。BigQuery データセットやコピーする BigQuery テーブルのリストを指定できます。柔軟性を高めるため、このテンプレートでは、指定した変更日よりも古いデータをコピーできます。コピーの成功後に、必要に応じて BigQuery からデータを削除できます。

パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合、このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。Cloud Storage に書き込むときに、テンプレートによって、既存のパーティション列に接尾辞 _pid を追加して、新しいパーティション キーが作成されます。これは、外部テーブルとして BigQuery のデータにアクセスするために必要です。現在のところ、BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。そのため、コピーしたテーブルを BigQuery から外部テーブルとして見ると、パーティション キーの追加列が含まれています。残りのデータはそのまま保持されます。

パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合:

  • このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。オプション enforceSamePartitionKey を使用して、新しいパーティション キーを作成するか、同じパーティション キーを維持しながら既存の列の名前を変更できます。
  • Dataplex ディスカバリでは、BigQuery テーブル(および Dataproc Metastore のテーブル)の作成時にパーティション タイプを string として登録します。これは既存のパーティション フィルタに影響する可能性があります。

1 回のテンプレート実行で変換できるテーブルとパーティションの数には上限(約 300)があります。正確な数は、テーブル名の長さやその他の要因によって異なります。

テンプレートのパラメータ

パラメータ 説明
sourceBigQueryDataset データを階層化する BigQuery データセット。このパラメータには、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 形式の Dataplex アセット名または projects/<name>/datasets/<dataset-id> 形式の BigQuery データセット ID のいずれかを含める必要があります。
destinationStorageBucketAssetName データを階層化する Cloud Storage バケットの Dataplex アセット名。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式を使用します。
tables 省略可: 階層化する BigQuery テーブルのカンマ区切りのリスト。リストを指定しない場合、すべてのテーブルが階層化されます。テーブルは名前のみで指定します(プロジェクト/データセット接頭辞なし)。また、大文字と小文字は区別されます。
exportDataModifiedBeforeDateTime 省略可: このパラメータを使用すると、この日付より前(およびオプションの時刻)にデータが移動します。パーティション分割 BigQuery テーブルの場合、この日時より前に最終更新されたパーティションを移動します。パーティション分割されていないテーブルの場合は、この日時より前にテーブルが最後に更新された場合に移動します。指定しない場合は、すべてのテーブル/パーティションを移動します。日付と時刻はデフォルトでデフォルトのタイムゾーンで解析されますが、オプションの接尾辞 Z+HH:mm がサポートされています。このパラメータは、YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 の形式に従う必要があります。相対的な日時もサポートされており、-PnDTnHnMn.nS という形式に従う必要があります(-P で始まる必要があります。これは過去の時刻を示します)。
fileFormat 省略可: Cloud Storage の出力ファイル形式。このパラメータのデフォルト値は PARQUET です。パラメータに別の値を AVRO にすることもできます。
fileCompression (省略可)出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は、UNCOMPRESSEDSNAPPYGZIPBZIP2 です。BZIP2 は、PARQUET ファイルではサポートされていません。
deleteSourceData 省略可: エクスポートの成功後に BigQuery からソースデータを削除するかどうか。値は true または false のいずれかです。このパラメータのデフォルト値は false です。
partitionIdRegExp (省略可)この正規表現のみと一致するパーティション ID を持つパーティションを処理します。値が指定されていない場合、このパラメータはデフォルトですべて処理されます。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合、つまり 1 つ以上のテーブル/パーティションがすでに階層化されている場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、事前階層化されていないテーブルまたはパーティションのみを処理するシグナルであることを示します。パラメータには、OVERWRITE(既存のファイルを上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。
enforceSamePartitionKey

(省略可)同じパーティション キーを適用するかどうか。BigQuery の制限により、パーティション分割された外部テーブルのパーティション キー(ファイルパス内)を、ファイル内のいずれかの列と同じ名前にすることはできません。このパラメータが true(デフォルト値)の場合、ターゲット ファイルのパーティション キーが元のパーティション列名に設定され、ファイル内の列の名前が変更されます。false の場合、パーティション キーの名前が変更されます。

たとえば、元のテーブルが TS および enforceSamePartitionKey=true という名前の列でパーティション分割されている場合、宛先ファイルのパスは gs://<bucket>/TS=<partition ID>/<file> になり、列名は TS_pkey に変更されます。これにより、古いテーブルまたは新しいテーブルの同じパーティションに対して既存のクエリを実行できます。

enforceSamePartitionKey=false の場合、宛先ファイルのパスは gs://<bucket>/TS_pid=<partition ID>/<file> ですが、ファイルでは列名が TS として保持されます。

updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインはソースからスキーマをコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery は実行されません。このフラグは、ソース BigQuery テーブルのスキーマを管理する場合に使用します。

テンプレートを実行する

コンソール

  1. Google Cloud コンソールの [Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [Create Task] をクリックします。

  4. [Tier from BQ to GCS Assets] で、[Create task] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を指定します。

  7. タスク実行のリージョンを選択します。

  8. 必要なパラメータを入力します。

  9. [続行] をクリックします。

gcloud

以下を置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

REST API

以下を置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

他の Google Cloud 提供またはカスタムの Dataflow テンプレートをスケジュールする

Dataplex では、Google Cloud が提供する Dataflow テンプレートまたはカスタム Dataflow テンプレートをコンソールでスケジュールしてモニタリングできます。

スケジュール

コンソール

  1. Google Cloud コンソールの [Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [Create Task] をクリックします。

  4. [Dataflow パイプラインの作成] で、[Dataflow パイプラインを作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を入力します。

  7. タスクを実行するリージョンを選択します。

  8. Dataflow テンプレートを選択します。

  9. 必要なパラメータを入力します。

  10. [続行] をクリックします。

モニタリング

コンソール

  1. Google Cloud コンソールの [Dataplex] ページに移動します。

    Dataplex に移動

  2. [Process] ビューに移動します。

  3. [Dataflow パイプライン] をクリックします。

  4. レイクまたはパイプラインの名前でフィルタします。