Dataplex では、Dataflow を利用して、データの取り込み、処理、データ ライフサイクルの管理などの一般的なデータ処理タスクを実行するテンプレートが用意されています。このガイドでは、データ処理テンプレートを構成して実行する方法について説明します。
始める前に
Dataplex テンプレートは、Dataflow を利用しています。テンプレートを使用する前に、Dataflow API を有効にします。
メモ
すべてのテンプレートで一般的な Dataflow パイプライン オプションがサポートされます。
Dataplex はデータ パイプラインを使用して、テンプレートで定義されたタスクのスケジュールを設定します。
スケジュールしたタスクは、Google Cloud コンソールの [Dataplex] ページでのみ確認できます。
テンプレート: 元データをキュレートされたデータに変換する
Dataplex ファイル形式変換テンプレートは、Dataplex Cloud Storage アセットまたは CSV または JSON 形式に保存されている Dataplex エンティティのリストのデータを、別の Dataplex アセットの Parquet、もしくは Avro 形式データに変換します。パーティションのレイアウトは変換において保持されます。また、出力ファイルを圧縮することもできます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputAssetOrEntitiesList |
入力ファイルを含む Dataplex アセットまたは Dataplex エンティティ。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> または projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... の形式で指定します。 |
outputFileFormat |
Cloud Storage の出力ファイル形式。このパラメータは、PARQUET または AVRO の形式にする必要があります。 |
outputAsset |
出力ファイルが保存される Cloud Storage バケットを含む Dataplex アセットの名前。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式に従う必要があります。outputAsset は、Google Cloud Console の [Dataplex アセット Details ] タブにあります。 |
outputFileCompression |
(省略可)出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は、UNCOMPRESSED 、SNAPPY 、GZIP 、BZIP2 です。BZIP2 は、PARQUET ファイルではサポートされていません。 |
writeDisposition |
省略可: 宛先ファイルがすでに存在する場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、宛先のディレクトリに存在しないファイルのみを処理することを示します。パラメータには、OVERWRITE (既存のファイルを上書きする)または FAIL (少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。 |
updateDataplexMetadata |
省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は 有効にすると、パイプラインはソースからスキーマをコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery は実行されません。ソース(未加工)データのスキーマが Dataplex によって管理されている場合は、このフラグを使用します。 |
テンプレートを実行する
コンソール
Google Cloud コンソールの [Dataplex] ページに移動します。
[Process] ビューに移動します。
[タスクの作成] をクリックします。
[キュレートされた形式に変換] で [タスクを作成] をクリックします。
Dataplex レイクを選択します。
タスク名を指定します。
タスク実行のリージョンを選択します。
必要なパラメータを入力します。
[続行] をクリックします。
gcloud
以下を置き換えます。
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \ --parameters \ inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\ outputFileFormat=OUTPUT_FILE_FORMAT,\ outputAsset=OUTPUT_ASSET
REST API
以下を置き換えます。
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
HTTP POST リクエストの送信:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST", "outputFileFormat": "OUTPUT_FILE_FORMAT", "outputAsset": "OUTPUT_ASSET", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview", } }
テンプレート: BigQuery アセットから Cloud Storage アセットへの階層データ
Dataplex BigQuery to Cloud Storage テンプレートは、Dataplex BigQuery アセットから、Dataplex 互換のレイアウトと形式で Dataplex Cloud Storage アセットにデータをコピーします。BigQuery データセットやコピーする BigQuery テーブルのリストを指定できます。柔軟性を高めるため、このテンプレートでは、指定した変更日よりも古いデータをコピーできます。コピーの成功後に、必要に応じて BigQuery からデータを削除できます。
パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合、このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。Cloud Storage に書き込むときに、テンプレートによって、既存のパーティション列に接尾辞 _pid
を追加して、新しいパーティション キーが作成されます。これは、外部テーブルとして BigQuery のデータにアクセスするために必要です。現在のところ、BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。そのため、コピーしたテーブルを BigQuery から外部テーブルとして見ると、パーティション キーの追加列が含まれています。残りのデータはそのまま保持されます。
パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合:
- このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。オプション
enforceSamePartitionKey
を使用して、新しいパーティション キーを作成するか、同じパーティション キーを維持しながら既存の列の名前を変更できます。 - Dataplex ディスカバリでは、BigQuery テーブル(および Dataproc Metastore のテーブル)の作成時にパーティション タイプを
string
として登録します。これは既存のパーティション フィルタに影響する可能性があります。
1 回のテンプレート実行で変換できるテーブルとパーティションの数には上限(約 300)があります。正確な数は、テーブル名の長さやその他の要因によって異なります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
sourceBigQueryDataset |
データを階層化する BigQuery データセット。このパラメータには、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 形式の Dataplex アセット名または projects/<name>/datasets/<dataset-id> 形式の BigQuery データセット ID のいずれかを含める必要があります。 |
destinationStorageBucketAssetName |
データを階層化する Cloud Storage バケットの Dataplex アセット名。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式を使用します。 |
tables |
省略可: 階層化する BigQuery テーブルのカンマ区切りのリスト。リストを指定しない場合、すべてのテーブルが階層化されます。テーブルは名前のみで指定します(プロジェクト/データセット接頭辞なし)。また、大文字と小文字は区別されます。 |
exportDataModifiedBeforeDateTime |
省略可: このパラメータを使用すると、この日付より前(およびオプションの時刻)にデータが移動します。パーティション分割 BigQuery テーブルの場合、この日時より前に最終更新されたパーティションを移動します。パーティション分割されていないテーブルの場合は、この日時より前にテーブルが最後に更新された場合に移動します。指定しない場合は、すべてのテーブル/パーティションを移動します。日付と時刻はデフォルトでデフォルトのタイムゾーンで解析されますが、オプションの接尾辞 Z と +HH:mm がサポートされています。このパラメータは、YYYY-MM-DD 、YYYY-MM-DDTHH:mm:ss 、YYYY-MM-DDTHH:mm:ss+03:00 の形式に従う必要があります。相対的な日時もサポートされており、-PnDTnHnMn.nS という形式に従う必要があります(-P で始まる必要があります。これは過去の時刻を示します)。 |
fileFormat |
省略可: Cloud Storage の出力ファイル形式。このパラメータのデフォルト値は PARQUET です。パラメータに別の値を AVRO にすることもできます。 |
fileCompression |
(省略可)出力ファイルの圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は、UNCOMPRESSED 、SNAPPY 、GZIP 、BZIP2 です。BZIP2 は、PARQUET ファイルではサポートされていません。 |
deleteSourceData |
省略可: エクスポートの成功後に BigQuery からソースデータを削除するかどうか。値は true または false のいずれかです。このパラメータのデフォルト値は false です。 |
partitionIdRegExp |
(省略可)この正規表現のみと一致するパーティション ID を持つパーティションを処理します。値が指定されていない場合、このパラメータはデフォルトですべて処理されます。 |
writeDisposition |
省略可: 宛先ファイルがすでに存在する場合、つまり 1 つ以上のテーブル/パーティションがすでに階層化されている場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、事前階層化されていないテーブルまたはパーティションのみを処理するシグナルであることを示します。パラメータには、OVERWRITE (既存のファイルを上書きする)または FAIL (少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーが発生する)のいずれかを指定できます。 |
enforceSamePartitionKey |
(省略可)同じパーティション キーを適用するかどうか。BigQuery の制限により、パーティション分割された外部テーブルのパーティション キー(ファイルパス内)を、ファイル内のいずれかの列と同じ名前にすることはできません。このパラメータが true(デフォルト値)の場合、ターゲット ファイルのパーティション キーが元のパーティション列名に設定され、ファイル内の列の名前が変更されます。false の場合、パーティション キーの名前が変更されます。 たとえば、元のテーブルが
|
updateDataplexMetadata |
省略可: 新しく作成されたエンティティの Dataplex メタデータを更新するかどうか。このパラメータのデフォルト値は 有効にすると、パイプラインはソースからスキーマをコピー先の Dataplex エンティティに自動的にコピーし、自動化された Dataplex Discovery は実行されません。このフラグは、ソース BigQuery テーブルのスキーマを管理する場合に使用します。 |
テンプレートを実行する
コンソール
Google Cloud コンソールの [Dataplex] ページに移動します。
[プロセス] ビューに移動します。
[Create Task] をクリックします。
[Tier from BQ to GCS Assets] で、[Create task] をクリックします。
Dataplex レイクを選択します。
タスク名を指定します。
タスク実行のリージョンを選択します。
必要なパラメータを入力します。
[続行] をクリックします。
gcloud
以下を置き換えます。
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \ --parameters \ sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\ destinationStorageBucketAssetName=DESTINATION_ASSET_NAME
REST API
以下を置き換えます。
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket REGION_NAME: region in which to run the job
HTTP POST リクエストの送信:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID", "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview", } }
他の Google Cloud 提供またはカスタムの Dataflow テンプレートをスケジュールする
Dataplex では、Google Cloud が提供する Dataflow テンプレートまたはカスタム Dataflow テンプレートをコンソールでスケジュールしてモニタリングできます。
スケジュール
コンソール
Google Cloud コンソールの [Dataplex] ページに移動します。
[プロセス] ビューに移動します。
[Create Task] をクリックします。
[Dataflow パイプラインの作成] で、[Dataflow パイプラインを作成] をクリックします。
Dataplex レイクを選択します。
タスク名を入力します。
タスクを実行するリージョンを選択します。
Dataflow テンプレートを選択します。
必要なパラメータを入力します。
[続行] をクリックします。
モニタリング
コンソール
Google Cloud コンソールの [Dataplex] ページに移動します。
[Process] ビューに移動します。
[Dataflow パイプライン] をクリックします。
レイクまたはパイプラインの名前でフィルタします。