このドキュメントでは、組み込み BigQuery と外部の BigQuery テーブルのデータ品質チェックをスケジューリングして実行できるようにする Dataplex のデータ品質タスクを作成する方法について説明します。
詳細については、データ品質タスクの概要をご覧ください。
始める前に
このドキュメントでは、データ品質タスクを作成する既存の Dataplex レイクがあることを前提としています。
データ品質タスクを作成する前に、次の操作を行います。
Google API とサービスの有効化
Dataproc API を有効にします。
ネットワークやサブネットワークで限定公開の Google アクセスを有効にします。Dataplex のデータ品質タスクで使用する予定のネットワークで、限定公開の Google アクセスを有効にします。Dataplex のデータ品質タスクを作成する際にネットワークまたはサブネットワークを指定しない場合、Dataplex はデフォルトのサブネットを使用します。その場合は、デフォルトのサブネットで限定公開の Google アクセスを有効にする必要があります。
仕様ファイルを作成する
Dataplex は、ドライバ プログラムとしてオープンソースの CloudDQ を使用します。Dataplex のデータ品質チェック要件は、CloudDQ YAML 仕様ファイルで定義されます。
データ品質タスクへの入力として、単一の YAML ファイル、または 1 つ以上の YAML ファイルを含む単一の zip アーカイブを作成できます。データ品質チェックの要件は、セクションごとに 1 ファイルで、別々の YAML 仕様ファイルにキャプチャすることをおすすめします。
仕様ファイルを準備するには、次の手順を行います。
-
データ品質チェックの要件を定義する CloudDQ YAML 仕様ファイルを 1 つ以上作成します。必要な構文の詳細については、このドキュメントの仕様ファイルについてのセクションをご覧ください。
YAML 仕様ファイルを
.yml
形式または.yaml
形式で保存します。複数の YAML 仕様ファイルを作成する場合は、すべてのファイルを 1 つの ZIP アーカイブに保存してください。 - Cloud Storage バケットを作成する。
- Cloud Storage バケットに仕様ファイルをアップロードします。
仕様ファイルについて
CloudDQ YAML 仕様ファイルには、次のセクションが必要です。
ルール(最上位の
rules
YAML ノードの下に定義される): 実行するルールのリスト。これらのルールは、NOT_NULL
やREGEX
などの事前定義ルールタイプから作成することも、CUSTOM_SQL_EXPR
やCUSTOM_SQL_STATEMENT
などのカスタム SQL ステートメントを拡張することもできます。CUSTOM_SQL_EXPR
ステートメントは、custom_sql_expr
がFalse
と評価したあらゆる行を失敗として報告します。CUSTOM_SQL_STATEMENT
ステートメントは、ステートメント全体によって返されたあらゆる値を失敗として報告します。行フィルタ(最上位の
row_filters
YAML ノードで定義される): SQL 式。ブール値を定義して、検証のために基盤となるエンティティ サブジェクトからデータのサブセットをフェッチします。ルール バインディング(最上位の
rule_bindings
YAML ノードの下で定義される): テーブルに適用するrules
とrule filters
を定義します。ルールのディメンション(
rule_dimensions
YAML ノードで定義): ルールが対応するdimension
フィールドで定義できるデータ品質ルールのディメンションの許可リストを定義します。次に例を示します。
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
ルールでは、
dimension
フィールドは省略可能です。いずれかのルールにdimension
が含まれている場合、ルールのディメンションのセクションは必須です。
詳細については、CloudDQ リファレンス ガイドとサンプル仕様ファイルをご覧ください。
結果を格納する BigQuery データセットを作成する
-
結果を保存するには、BigQuery データセットを作成します。
データセットは、データ品質タスクを実行するテーブルと同じリージョンに存在する必要があります。
Dataplex はこのデータセットを使用して、結果を格納する選択したテーブルを作成または再利用します。
サービス アカウントを作成する
次の Identity and Access Management(IAM)のロールと権限を持つサービス アカウントを作成します。
- YAML 仕様を含む Cloud Storage パスへの読み取りアクセス権。Cloud Storage バケットにはストレージ オブジェクト閲覧者のロール(
roles/storage.objectViewer
)を使用できます。 - 検証するデータを含む BigQuery データセットに対する読み取りアクセス権。BigQuery データ閲覧者のロールを使用できます。
- テーブルを作成して(必要な場合)、結果をそのテーブルに書き込むための BigQuery データセットへの書き込みアクセス権。データセット レベルで BigQuery データ編集者のロール(
roles/bigquery.dataEditor
)を使用できます。 - プロジェクトで BigQuery ジョブを作成するためのプロジェクト レベルの BigQuery ジョブユーザーのロール(
roles/bigquery.jobUser
)。 - プロジェクトまたはレイクのレベルでの Dataplex メタデータ読者のロール(
roles/dataplex.metadataReader
)。 - プロジェクト レベルでの Service Usage ユーザーのロール(
roles/serviceusage.serviceUsageConsumer
)。 - Dataproc ワーカーのロール。
- ジョブを送信するユーザーに付与される
iam.serviceAccounts.actAs
権限。 - Dataplex レイク サービス アカウントに付与されたサービス アカウント ユーザーのロール。Dataplex レイク サービス アカウントは Google Cloud コンソールで確認できます。
省略可: 詳細設定を使用する
この手順は省略可能です。
BigQuery はデフォルトで、現在のユーザー プロジェクトでデータ品質チェックを実行します。また、タスクの
--execution-args
プロパティで--gcp_project_id
TASK_ARGS
引数を使用して、別のプロジェクトを選択して BigQuery ジョブを実行することもできます。BigQuery クエリを実行するように指定されたプロジェクト ID が、サービス アカウント(
--execution-service-account
で指定)が作成されるプロジェクトと異なる場合は、プロジェクト間サービス アカウントの使用を無効にする(iam.disableServiceAccountCreation
)組織のポリシーがオフになっていることを確認します。また、BigQuery クエリが実行されているプロジェクトの BigQuery ジョブ スケジュールにサービス アカウントがアクセスできることを確認してください。
制限事項
- 所与のデータ品質タスクに指定するすべてのテーブルは、同じ Google Cloud リージョンに属している必要があります。
データ品質タスクをスケジュール設定する
コンソール
- Google Cloud コンソールで、Dataplex の [処理] ページに移動します。
- [タスクの作成] をクリックします。
- [データ品質の確認] カードで、[タスクを作成] をクリックします。
- [Dataplex レイク] で、レイクを選択します。
- [ID] に ID を入力します。
- [データ品質仕様] セクションで、次のようにします。
- [Select GCS file] フィールドで [参照] をクリックします。
Cloud Storage バケットを選択します。
[Select] をクリックします。
[結果テーブル] セクションで、次のようにします。
[BigQuery データセットの選択] フィールドで、[参照] をクリックします。
検証結果を格納する BigQuery データセットを選択します。
[選択] をクリックします。
[BigQuery テーブル] フィールドに、結果を保存するテーブルの名前を入力します。テーブルが存在しない場合は、Dataplex によって作成されます。内部処理タスク用に予約されているため、名前
dq_summary
は使用しないでください。
[サービス アカウント] セクションで、[ユーザー サービス アカウント] メニューからサービス アカウントを選択します。
[続行] をクリックします。
[スケジュールの設定] セクションで、データ品質タスクを実行するスケジュールを構成します。
[作成] をクリックします。
gcloud CLI
Dataplex タスクの gcloud CLI コマンドを使用したデータ品質タスクの実行例を次に示します。
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
パラメータ | 説明 |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
データ品質タスクのデータ品質 YAML 構成入力への Cloud Storage パス。.yml 形式または .yaml 形式の単一の YAML ファイル、または複数の YAML ファイルを含む ZIP アーカイブを使用できます。 |
GOOGLE_CLOUD_PROJECT |
Dataplex タスクと BigQuery ジョブが作成される Google Cloud プロジェクト。 |
DATAPLEX_REGION_ID |
データ品質タスクが作成される Dataplex レイクのリージョン。 |
SERVICE_ACCOUNT |
タスクを実行するために使用されるサービス アカウント。このサービス アカウントに、始める前にセクションに概説されているとおりに十分な IAM 権限があることを確認してください。 |
--execution-args
の場合、次の引数を、配置した引数として次の順序で渡す必要があります。
引数 | 説明 |
---|---|
clouddq-executable.zip |
Cloud Storage の公開バケットから spark-file-uris で渡されたプリコンパイルされた実行可能ファイル。 |
ALL |
すべてのルール バインディングを実行します。または、特定のルール バインディングをカンマ区切りのリストとして指定できます。
例: RULE_1,RULE_2 |
gcp-project-id |
BigQuery クエリを実行するプロジェクト ID。 |
gcp-region-id |
データ品質検証用の BigQuery ジョブを実行するリージョン。このリージョンは、gcp-bq-dataset-id と target_bigquery_summary_table のリージョンと同じにする必要があります。 |
gcp-bq-dataset-id |
rule_binding ビューと中間データ品質サマリーの結果を保存するために使用される BigQuery データセットです。 |
target-bigquery-summary-table |
データ品質チェックの最終結果が保存される BigQuery テーブルのテーブル ID 参照。 ID の値 dq_summary は内部処理タスク用に予約されているため、使用しないでください。 |
--summary_to_stdout |
(省略可)このフラグを渡すと、最後の実行で dq_summary テーブルに作成されたすべての検証結果行が JSON レコードとして Cloud Logging と stdout にログとして保存されます。 |
API
次のように置き換えます。
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- HTTP POST リクエストを送信します。
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
スケジュール設定されたデータ品質タスクをモニタリングする
タスクをモニタリングする方法をご覧ください。
結果を見る
結果を格納するデータセットを作成するの説明に従って、データ品質検証の結果が、指定した BigQuery データセットとサマリー テーブルに保存されます。サマリー テーブルには、ルール バインディングと各検証実行のルールの組み合わせごとの出力サマリーが含まれます。サマリー テーブルの出力には、次の情報が含まれます。
列名 | 説明 |
---|---|
dataplex_lake |
(文字列)検証されるテーブルが含まれている Dataplex レイクの ID。 |
dataplex_zone |
(文字列)検証されるテーブルを含む Dataplex ゾーンの ID。 |
dataplex_asset_id |
(文字列)検証されるテーブルを含む Dataplex アセットの ID。 |
execution_ts |
(timestamp)検証クエリが実行された時点のタイムスタンプ。 |
rule_binding_id |
(文字列)検証結果がレポートされるルール バインディングの ID。 |
rule_id |
(文字列)検証結果がレポートされるルール バインディングの下にあるルールの ID。 |
dimension |
(文字列)rule_id のデータ品質ディメンション。この値は、rule_dimensions YAML ノードで指定される値のいずれかです。 |
table_id |
(文字列)検証結果が報告されるエンティティの ID。この ID は、それぞれのルール バインディングの entity パラメータで指定されます。 |
column_id |
(文字列)検証結果が報告される列の ID。
この ID は、それぞれのルール バインディングの column パラメータで指定されます。 |
last_modified |
(タイムスタンプ)検証されている table_id の最後に更新されたタイムスタンプ。 |
metadata_json_string |
(文字列)ルール バインディングまたはデータ品質実行中に指定されたメタデータ パラメータ コンテンツの Key-Value ペア。 |
configs_hashsum |
(文字列)ルール バインディングとそれに関連するすべてのルール、ルール バインディング、行フィルタ、エンティティ構成を含む JSON ドキュメントのハッシュ合計。
configs_hashsum により、rule_binding ID または参照先の構成のいずれかの内容が変更されたときに、追跡できます。 |
dq_run_id |
(文字列)レコードの一意の ID。 |
invocation_id |
(文字列)データ品質実行の ID。同じデータ品質実行インスタンス内で生成されたすべてのデータ品質サマリー レコードは、同じ invocation_id を共有します。 |
progress_watermark |
(ブール値)増分検証のハイ ウォーターマークを判断する際に、このレコードがデータ品質チェックで考慮されるかどうかを決定します。FALSE の場合、ハイ ウォーターマーク値を確立するときに、個々のレコードは無視されます。この情報は、ハイ ウォーターマークを先に進めてはならないテストデータ品質検証を実行する際に役立ちます。Dataplex はデフォルトでこのフィールドに TRUE を設定しますが、引数 --progress_watermark の値が FALSE の場合にはオーバーライドできます。
|
rows_validated |
(整数)incremental_time_filter_column_id 列への row_filters とハイ ウォーターマーク フィルタの適用後に検証されたレコードの合計数(指定されている場合)。 |
complex_rule_validation_errors_count |
(浮動小数点数)CUSTOM_SQL_STATEMENT ルールによって返された行数。 |
complex_rule_validation_success_flag |
(ブール値)CUSTOM_SQL_STATEMENT ルールの成功ステータス。
|
success_count |
(整数)検証に合格したレコードの合計数。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
|
success_percentage |
(浮動小数点数)検証されたレコードの合計数のうち、検証に合格したレコード数の割合。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。 |
failed_count |
(整数)検証で不合格となったレコードの合計数。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
|
failed_percentage |
(浮動小数点数)検証されたレコードの合計数のうち、検証で不合格だったレコード数の割合。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。 |
null_count |
(整数)検証中に null を返したレコードの合計数。
NOT_NULL ルールと CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。 |
null_percentage |
(浮動小数点数)検証されたレコードの合計数のうち、検証中に null を返したレコード数の割合。NOT_NULL ルールと CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。 |
failed_records_query |
失敗したすべてのルールに対して、失敗したレコードを取得するために使用できるクエリがこの列に格納されます。このドキュメントの、failed_records_query で失敗したルールのトラブルシューティングをご覧ください。 |
BigQuery エンティティの場合、最新の実行の SQL 検証ロジックを含むすべての rule_binding
に対してビューが作成されます。これらのビューは、引数 --gcp-bq-dataset-id
で指定された BigQuery データセット内にあります。
コストの最適化
次の最適化によって費用を削減できます。
段階的な検証
多くの場合、新しいパーティション(新しい行)で定期的に更新されるテーブルがあります。すべての実行で古いパーティションを再検証しない場合は、増分検証を使用できます。
増分検証では、列の値が単調に増加する TIMESTAMP
型または DATETIME
型の列をテーブルに含める必要があります。BigQuery テーブルのパーティショニングの基準となる列を使用できます。
増分検証を指定するには、ルール バインディングの一部として incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
の値を指定します。
列を指定すると、データ品質タスクで、最後に実行されたデータ品質タスクのタイムスタンプよりも TIMESTAMP
値が大きい行のみが考慮されます。
仕様ファイルの例
これらのサンプルを使用するには、sales
という名前の BigQuery データセットを作成します。次に、sales_orders
という名前のファクト テーブルを作成し、次の GoogleSQL ステートメントを使用してクエリを実行してサンプルデータを追加します。
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
サンプル 1
次のコードサンプルでは、これらの値を検証するためのデータ品質チェックを作成します。
amount
: 値は 0 または正の数値です。item_id
: アルファベットの 5 文字からなる英数字の文字列の後に 15 桁の数字が続きます。transaction_currency
: 使用可能な通貨の種類(静的リストで定義されているとおり)。このサンプルの静的リストでは、通貨のタイプとして GBP と JPY を使用できます。この検証は、「国際」とマークされた行にのみ適用されます。
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
次のように置き換えます。
PROJECT_ID
: 実際のプロジェクト ID。DATASET_ID
: データセット ID。
Sample 2
チェックするテーブルが Dataplex レイクの一部である場合は、レイクまたはゾーンの表記法を使用してテーブルを指定できます。これにより、レイクまたはゾーンごとに結果を集計できます。たとえば、ゾーンレベルのスコアを生成できます。
このサンプルを使用するには、レイク ID operations
とゾーン ID procurement
を使用して Dataplex レイクを作成します。次に、テーブル sales_orders
をアセットとしてゾーンに追加します。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
次のように置き換えます。
- PROJECT_ID: 実際のプロジェクト ID。
- REGION_ID: テーブルが存在する Dataplex レイクのリージョン ID(
us-central1
など)。
サンプル 3
この例では、ID 値が一意かどうかを確認するカスタム SQL チェックを追加して、サンプル 2 を強化します。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
サンプル 4
この例では、last_modified_timestamp
列を使用して増分検証を追加して、サンプル 3 を強化します。1 つ以上のルール バインディングに対して増分検証を追加できます。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
failed_records_query
で失敗したルールのトラブルシューティング
失敗したすべてのルールに対して、概要テーブルの failed_records_query
列に、失敗したレコードを取得するために使用できるクエリが格納されます。
デバッグするために、YAML ファイルで reference columns
を使用することもできます。これにより、failed_records_query
の出力を元のデータと結合し、レコード全体を取得できます。たとえば、primary_key
列または複合 primary_key
列を参照列として指定できます。
参照列を指定する
参照列を生成するには、YAML 仕様に次を追加します。
reference_columns
セクション。このセクションでは、1 つ以上の参照列セットを作成し、各セットで 1 つ以上の列を指定できます。rule_bindings
セクション。このセクションでは、参照列 ID(reference_columns_id
)を指定するルール バインディングに行を追加して、そのルール バインディングのルールに使用できます。reference_columns
セクションで指定されている参照列セットのいずれかである必要があります。
たとえば、次の YAML ファイルでは reference_columns
セクションを指定し、ORDER_DETAILS_REFERENCE_COLUMNS
セットの一部として id
、last_modified_timestamp
、item_id
の 3 つの列を定義しています。 次の例では、サンプル テーブル sales_orders
を使用しています。
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
失敗したレコードのクエリの使用
失敗したレコードクエリでは、失敗したルールがあるすべてのレコードに行が生成されます。これには、失敗をトリガーした列名、失敗をトリガーした値、参照列の値が含まれます。また、データ品質タスクの実行に関連して使用できるメタデータも含まれています。
次の例は、参照列を指定するで説明されている YAML ファイルに対して失敗したレコードクエリからの出力の例です。列 amount
の失敗と、失敗の値 -10
が示されます。また、参照列に対応する値も記録されます。
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
CUSTOM_SQL_STATEMENT ルールの失敗したレコードクエリを使用する
CUSTOM_SQL_STATEMENT
ルールの場合、失敗したレコードクエリには custom_sql_statement_validation_errors
列が含まれます。custom_sql_statement_validation_errors
列は、SQL ステートメントの出力と一致するフィールドがあるネストされた列です。CUSTOM_SQL_STATEMENT
ルールの失敗したレコードのクエリに参照列は含まれません。
たとえば、CUSTOM_SQL_STATEMENT
ルールは次のようになります。
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
列の 1 つ以上の行が含まれ、existing_id!=replacement_id
であるすべての発生の行があります。JSON でレンダリングされると、この列のセルの内容は次のようになる場合があります。
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
join on custom_sql_statement_valdation_errors.product_key
のようなネストされた参照で、これらの結果を元のテーブルに結合できます。
次のステップ
- CloudDQ YAML 仕様リファレンス。
- データ品質ルールの例: 単純なルールまたは高度なルールをご覧ください。
- Dataplex データ品質タスク用の Airflow DAG の例。