テンプレートを使用してデータを処理する

Dataplex は、データの取り込み、処理、データ ライフサイクルの管理などの一般的なデータ処理タスクを実行するための(Dataflow が供給する)テンプレートを提供します。このガイドでは、データ処理テンプレートを構成して実行する方法について説明します。

始める前に

Dataplex テンプレートは、Dataflow によって提供されます。テンプレートを使用する前に、Dataflow API を有効にします。

Dataflow API を有効にする

次の点にご注意ください。

  • すべてのテンプレートは、共通の Dataflow パイプライン オプションをサポートしています。

  • Dataplex は、データ パイプラインを使用して、テンプレートで定義されたタスクのスケジュールを設定します。

  • Google Cloud コンソールの [Dataplex] ページには、Dataplex でスケジュールしたタスクのみが表示されます。

テンプレート: 未加工データをキュレートされたデータに変換する

Dataplex ファイル形式変換テンプレートは、Dataplex Cloud Storage アセットまたは CSV または JSON 形式に保存されている Dataplex エンティティのリストのデータを、別の Dataplex アセットの Parquet、もしくは Avro 形式データに変換します。パーティション レイアウトは変換中に保持されます。また、出力ファイルの圧縮もサポートしています。

テンプレートのパラメータ

パラメータ 説明
inputAssetOrEntitiesList 入力ファイルを含む Dataplex アセットまたは Dataplex エンティティ。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> または projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... の形式で指定する必要があります。
outputFileFormat Cloud Storage の出力ファイル形式。このパラメータは、PARQUET または AVRO の形式で指定する必要があります。
outputAsset 出力ファイルが保存される Cloud Storage バケットを含む Dataplex アセットの名前。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式で指定する必要があります。outputAsset は、Google Cloud コンソールで、Dataplex アセットの [Details] タブで確認できます。
outputFileCompression 省略可: 出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は UNCOMPRESSEDSNAPPYGZIPBZIP2 のいずれかにできます。BZIP2PARQUET ファイルではサポートされていません。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、宛先ディレクトリに存在しないファイルのみを処理するよう指示します。パラメータには、OVERWRITE(既存のファイルを上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。
updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインはスキーマをソースからコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery はそれらのために実行されません。このフラグは、ソース(元の)データのスキーマが Dataplex によって管理されている場合に使用します。

テンプレートを実行する

Console

  1. Google Cloud コンソールで、[Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [タスクを作成] をクリックします。

  4. [キュレートされた形式に変換する] で、[タスクを作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を指定します。

  7. タスクの実行に使用するリージョンを選択します。

  8. 必要なパラメータを入力します。

  9. [続行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

次のように置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

REST

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

次のように置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

テンプレート: BigQuery アセットから Cloud Storage アセットにデータを階層化する

Dataplex BigQuery to Cloud Storage テンプレートは、Dataplex 互換のレイアウトと形式で、Dataplex BigQuery アセットから Dataplex Cloud Storage アセットにデータをコピーします。コピーする BigQuery データセットまたは BigQuery テーブルのリストを指定できます。柔軟性を高めるために、このテンプレートでは、指定した変更日より古いデータをコピーできます。また、コピーが正常に完了した後に BigQuery からデータを削除することもできます。

パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合:

  • このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。オプション enforceSamePartitionKey を使用すると、新しいパーティション キーを作成するか、同じパーティション キーを保持して既存の列の名前を変更できます。
  • Dataplex Discovery は、BigQuery テーブル(および Dataproc Metastore のテーブル)を作成するときに、パーティション タイプを string として登録します。これにより、既存のパーティション フィルタに影響する可能性があります。

1 回のテンプレート実行で変換できるテーブルとパーティションの数には上限があります。この上限は約 300 です。正確な数は、テーブル名の長さなどの要因によって異なります。

テンプレートのパラメータ

パラメータ 説明
sourceBigQueryDataset データを階層化する BigQuery データセット。このパラメータには、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 形式の Dataplex アセット名または projects/<name>/datasets/<dataset-id> 形式の BigQuery データセット ID を含める必要があります。
destinationStorageBucketAssetName データを階層化する Cloud Storage バケットの Dataplex アセット名。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式に従う必要があります。
tables 省略可: 階層化する BigQuery テーブルのカンマ区切りのリスト。リストを指定しない場合、すべてのテーブルが階層化されます。テーブルは名前のみで指定します(プロジェクト/データセット接頭辞なし)。また、大文字と小文字は区別されます。
exportDataModifiedBeforeDateTime 省略可: このパラメータを使用して、この日付(およびオプションの時刻)より古いデータを移動します。BigQuery のパーティション分割テーブルの場合、この日時より前に最終更新されたパーティションを移動します。パーティション分割されていないテーブルの場合、テーブルの最終更新がこの日時より前の場合は移動します。指定しない場合、すべてのテーブル/パーティションを移動します。デフォルトでは、日付と時刻はデフォルトのタイムゾーンで解析されますが、オプションのサフィックス Z+HH:mm がサポートされています。このパラメータは、YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 の形式に従う必要があります。相対的な日時もサポートされており、-PnDTnHnMn.nS という形式に従う必要があります(-P で始まる必要があります。これは過去の時刻を示します)。
fileFormat 省略可: Cloud Storage の出力ファイル形式。このパラメータのデフォルト値は PARQUET です。パラメータの他の値は AVRO です。
fileCompression 省略可: 出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は UNCOMPRESSEDSNAPPYGZIPBZIP2 のいずれかにできます。BZIP2PARQUET ファイルではサポートされていません。
deleteSourceData 省略可: エクスポートが正常に完了した後に BigQuery からソースデータを削除するかどうか。値は true または false です。このパラメータのデフォルト値は false です。
partitionIdRegExp 省略可: この正規表現に一致するパーティション ID を持つパーティションのみを処理します。値が指定されていない場合、このパラメータはデフォルトですべて処理されます。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合(1 つ以上のテーブルまたはパーティションがすでに事前階層化されている場合)に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、まだ事前階層化されていないテーブル / パーティションのみを処理するよう指示します。パラメータには、OVERWRITE(既存のファイルを上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。
enforceSamePartitionKey

省略可: 同じパーティション キーを適用するかどうか。BigQuery の制限により、外部のパーティション分割テーブルのパーティション キー(ファイルパス内)はファイル内の列のいずれかと同じ名前にすることはできません。このパラメータが true(デフォルト値)の場合、ターゲット ファイルのパーティション キーは元のパーティション列名に設定され、ファイルの列名が変更されます。false の場合、パーティション キーの名前が変更されます。

たとえば、元のテーブルが TS および enforceSamePartitionKey=true という名前の列でパーティション分割されている場合、宛先ファイルのパスは gs://<bucket>/TS=<partition ID>/<file> になり、列名は TS_pkey に変更されます。これにより、既存のクエリを古いテーブルまたは新しいテーブルの同じパーティションに対して実行できます。

enforceSamePartitionKey=false の場合、宛先ファイルのパスは gs://<bucket>/TS_pid=<partition ID>/<file> になりますが、列名はファイル内の TS のままになります。

updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインはスキーマをソースからコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery はそれらのために実行されません。このフラグは、ソース BigQuery テーブルのスキーマを管理する場合に使用します。

テンプレートを実行する

Console

  1. Google Cloud コンソールで、[Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [タスクの作成] をクリックします。

  4. [BQ アセットから GCS アセットに階層化する] で、[タスクを作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を指定します。

  7. タスクの実行に使用するリージョンを選択します。

  8. 必要なパラメータを入力します。

  9. [続行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

次のように置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket

REST

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

次のように置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

他の Google Cloud 提供またはカスタム Dataflow テンプレートをスケジュールする

Dataplex を使用すると、Google Cloud 提供の Dataflow テンプレートまたはカスタム Dataflow テンプレートをコンソールでスケジュールしてモニタリングできます。

スケジュール

Console

  1. Google Cloud コンソールで、[Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [タスクの作成] をクリックします。

  4. [Dataflow パイプラインの作成] で、[Dataflow パイプラインを作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を指定します。

  7. タスクを実行するリージョンを選択します。

  8. Dataflow テンプレートを選択します。

  9. 必要なパラメータを入力します。

  10. [続行] をクリックします。

モニタリング

Console

  1. Google Cloud コンソールで、[Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [Dataflow パイプライン] をクリックします。

  4. レイク名またはパイプライン名でフィルタします。