データ品質タスクを使用する

このドキュメントでは、組み込み BigQuery と外部の BigQuery テーブルのデータ品質チェックをスケジューリングして実行できるようにする Dataplex のデータ品質タスクを作成する方法について説明します。

詳細については、データ品質タスクの概要をご覧ください。

始める前に

このドキュメントでは、データ品質タスクを作成する既存の Dataplex レイクがあることを前提としています。

データ品質タスクを作成する前に、次の操作を行います。

Google API とサービスの有効化

  1. Dataproc API を有効にします。

    API の有効化

  2. ネットワークやサブネットワークで限定公開の Google アクセスを有効にします。Dataplex のデータ品質タスクで使用する予定のネットワークで、限定公開の Google アクセスを有効にします。Dataplex のデータ品質タスクを作成する際にネットワークまたはサブネットワークを指定しない場合、Dataplex はデフォルトのサブネットを使用します。その場合は、デフォルトのサブネットで限定公開の Google アクセスを有効にする必要があります。

仕様ファイルを作成する

Dataplex は、ドライバ プログラムとしてオープンソースの CloudDQ を使用します。Dataplex のデータ品質チェック要件は、CloudDQ YAML 仕様ファイルで定義されます。

データ品質タスクへの入力として、単一の YAML ファイル、または 1 つ以上の YAML ファイルを含む単一の zip アーカイブを作成できます。データ品質チェックの要件は、セクションごとに 1 ファイルで、別々の YAML 仕様ファイルにキャプチャすることをおすすめします。

仕様ファイルを準備するには、次の手順を行います。

  1. データ品質チェックの要件を定義する CloudDQ YAML 仕様ファイルを 1 つ以上作成します。必要な構文の詳細については、このドキュメントの仕様ファイルについてのセクションをご覧ください。

    YAML 仕様ファイルを .yml 形式または .yaml 形式で保存します。複数の YAML 仕様ファイルを作成する場合は、すべてのファイルを 1 つの ZIP アーカイブに保存してください。

  2. Cloud Storage バケットを作成する
  3. Cloud Storage バケットに仕様ファイルをアップロードします。

仕様ファイルについて

CloudDQ YAML 仕様ファイルには、次のセクションが必要です。

  • ルール(最上位の rules YAML ノードの下に定義される): 実行するルールのリスト。これらのルールは、NOT_NULLREGEX などの事前定義ルールタイプから作成することも、CUSTOM_SQL_EXPRCUSTOM_SQL_STATEMENT などのカスタム SQL ステートメントを拡張することもできます。CUSTOM_SQL_EXPR ステートメントは、custom_sql_exprFalse と評価したあらゆる行を失敗として報告します。CUSTOM_SQL_STATEMENT ステートメントは、ステートメント全体によって返されたあらゆる値を失敗として報告します。

  • 行フィルタ(最上位の row_filters YAML ノードで定義される): SQL 式。ブール値を定義して、検証のために基盤となるエンティティ サブジェクトからデータのサブセットをフェッチします。

  • ルール バインディング(最上位の rule_bindings YAML ノードの下で定義される): テーブルに適用する rulesrule filters を定義します。

  • ルールのディメンション(rule_dimensions YAML ノードで定義): ルールが対応する dimension フィールドで定義できるデータ品質ルールのディメンションの許可リストを定義します。

    次に例を示します。

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    ルールでは、dimension フィールドは省略可能です。いずれかのルールに dimension が含まれている場合、ルールのディメンションのセクションは必須です。

詳細については、CloudDQ リファレンス ガイドサンプル仕様ファイルをご覧ください。

結果を格納する BigQuery データセットを作成する

  • 結果を保存するには、BigQuery データセットを作成します。

    データセットは、データ品質タスクを実行するテーブルと同じリージョンに存在する必要があります。

    Dataplex はこのデータセットを使用して、結果を格納する選択したテーブルを作成または再利用します。

サービス アカウントを作成する

次の Identity and Access Management(IAM)のロールと権限を持つサービス アカウントを作成します。

省略可: 詳細設定を使用する

この手順は省略可能です。

  1. BigQuery はデフォルトで、現在のユーザー プロジェクトでデータ品質チェックを実行します。また、タスクの --execution-args プロパティで --gcp_project_id TASK_ARGS 引数を使用して、別のプロジェクトを選択して BigQuery ジョブを実行することもできます。

  2. BigQuery クエリを実行するように指定されたプロジェクト ID が、サービス アカウント(--execution-service-account で指定)が作成されるプロジェクトと異なる場合は、プロジェクト間サービス アカウントの使用を無効にするiam.disableServiceAccountCreation)組織のポリシーがオフになっていることを確認します。また、BigQuery クエリが実行されているプロジェクトの BigQuery ジョブ スケジュールにサービス アカウントがアクセスできることを確認してください。

制限事項

  • 所与のデータ品質タスクに指定するすべてのテーブルは、同じ Google Cloud リージョンに属している必要があります。

データ品質タスクをスケジュール設定する

コンソール

  1. Google Cloud コンソールで、Dataplex の [処理] ページに移動します。

    [処理] に移動

  2. [タスクの作成] をクリックします。
  3. [データ品質の確認] カードで、[タスクを作成] をクリックします。
  4. [Dataplex レイク] で、レイクを選択します。
  5. [ID] に ID を入力します。
  6. [データ品質仕様] セクションで、次のようにします。
    1. [Select GCS file] フィールドで [参照] をクリックします。
    2. Cloud Storage バケットを選択します。

    3. [Select] をクリックします。

  7. [結果テーブル] セクションで、次のようにします。

    1. [BigQuery データセットの選択] フィールドで、[参照] をクリックします。

    2. 検証結果を格納する BigQuery データセットを選択します。

    3. [選択] をクリックします。

    4. [BigQuery テーブル] フィールドに、結果を保存するテーブルの名前を入力します。テーブルが存在しない場合は、Dataplex によって作成されます。内部処理タスク用に予約されているため、名前 dq_summary は使用しないでください。

  8. [サービス アカウント] セクションで、[ユーザー サービス アカウント] メニューからサービス アカウントを選択します。

  9. [続行] をクリックします。

  10. [スケジュールの設定] セクションで、データ品質タスクを実行するスケジュールを構成します。

  11. [作成] をクリックします。

gcloud CLI

Dataplex タスクの gcloud CLI コマンドを使用したデータ品質タスクの実行例を次に示します。

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
パラメータ 説明
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH データ品質タスクのデータ品質 YAML 構成入力への Cloud Storage パス。.yml 形式または .yaml 形式の単一の YAML ファイル、または複数の YAML ファイルを含む ZIP アーカイブを使用できます。
GOOGLE_CLOUD_PROJECT Dataplex タスクと BigQuery ジョブが作成される Google Cloud プロジェクト。
DATAPLEX_REGION_ID データ品質タスクが作成される Dataplex レイクのリージョン。
SERVICE_ACCOUNT タスクを実行するために使用されるサービス アカウント。このサービス アカウントに、始める前にセクションに概説されているとおりに十分な IAM 権限があることを確認してください。

--execution-args の場合、次の引数を、配置した引数として次の順序で渡す必要があります。

引数 説明
clouddq-executable.zip Cloud Storage の公開バケットから spark-file-uris で渡されたプリコンパイルされた実行可能ファイル。
ALL すべてのルール バインディングを実行します。または、特定のルール バインディングをカンマ区切りのリストとして指定できます。 例: RULE_1,RULE_2
gcp-project-id BigQuery クエリを実行するプロジェクト ID。
gcp-region-id データ品質検証用の BigQuery ジョブを実行するリージョン。このリージョンは、gcp-bq-dataset-idtarget_bigquery_summary_table のリージョンと同じにする必要があります。
gcp-bq-dataset-id rule_binding ビューと中間データ品質サマリーの結果を保存するために使用される BigQuery データセットです。
target-bigquery-summary-table データ品質チェックの最終結果が保存される BigQuery テーブルのテーブル ID 参照。 ID の値 dq_summary は内部処理タスク用に予約されているため、使用しないでください。
--summary_to_stdout (省略可)このフラグを渡すと、最後の実行で dq_summary テーブルに作成されたすべての検証結果行が JSON レコードとして Cloud Logging と stdout にログとして保存されます。

API

  1. 次のように置き換えます。

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. HTTP POST リクエストを送信します。
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Dataplex データ品質タスク用の Airflow DAG のサンプルもご覧ください。

スケジュール設定されたデータ品質タスクをモニタリングする

タスクをモニタリングする方法をご覧ください。

結果を見る

結果を格納するデータセットを作成するの説明に従って、データ品質検証の結果が、指定した BigQuery データセットとサマリー テーブルに保存されます。サマリー テーブルには、ルール バインディングと各検証実行のルールの組み合わせごとの出力サマリーが含まれます。サマリー テーブルの出力には、次の情報が含まれます。

列名 説明
dataplex_lake (文字列)検証されるテーブルが含まれている Dataplex レイクの ID。
dataplex_zone (文字列)検証されるテーブルを含む Dataplex ゾーンの ID。
dataplex_asset_id (文字列)検証されるテーブルを含む Dataplex アセットの ID。
execution_ts (timestamp)検証クエリが実行された時点のタイムスタンプ。
rule_binding_id (文字列)検証結果がレポートされるルール バインディングの ID。
rule_id (文字列)検証結果がレポートされるルール バインディングの下にあるルールの ID。
dimension (文字列)rule_id のデータ品質ディメンション。この値は、rule_dimensions YAML ノードで指定される値のいずれかです。
table_id (文字列)検証結果が報告されるエンティティの ID。この ID は、それぞれのルール バインディングの entity パラメータで指定されます。
column_id (文字列)検証結果が報告される列の ID。 この ID は、それぞれのルール バインディングの column パラメータで指定されます。
last_modified (タイムスタンプ)検証されている table_id の最後に更新されたタイムスタンプ。
metadata_json_string (文字列)ルール バインディングまたはデータ品質実行中に指定されたメタデータ パラメータ コンテンツの Key-Value ペア。
configs_hashsum (文字列)ルール バインディングとそれに関連するすべてのルール、ルール バインディング、行フィルタ、エンティティ構成を含む JSON ドキュメントのハッシュ合計。 configs_hashsum により、rule_binding ID または参照先の構成のいずれかの内容が変更されたときに、追跡できます。
dq_run_id (文字列)レコードの一意の ID。
invocation_id (文字列)データ品質実行の ID。同じデータ品質実行インスタンス内で生成されたすべてのデータ品質サマリー レコードは、同じ invocation_id を共有します。
progress_watermark (ブール値)増分検証のハイ ウォーターマークを判断する際に、このレコードがデータ品質チェックで考慮されるかどうかを決定します。FALSE の場合、ハイ ウォーターマーク値を確立するときに、個々のレコードは無視されます。この情報は、ハイ ウォーターマークを先に進めてはならないテストデータ品質検証を実行する際に役立ちます。Dataplex はデフォルトでこのフィールドに TRUE を設定しますが、引数 --progress_watermark の値が FALSE の場合にはオーバーライドできます。
rows_validated (整数)incremental_time_filter_column_id 列への row_filters とハイ ウォーターマーク フィルタの適用後に検証されたレコードの合計数(指定されている場合)。
complex_rule_validation_errors_count (浮動小数点数)CUSTOM_SQL_STATEMENT ルールによって返された行数。
complex_rule_validation_success_flag (ブール値)CUSTOM_SQL_STATEMENT ルールの成功ステータス。
success_count (整数)検証に合格したレコードの合計数。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
success_percentage (浮動小数点数)検証されたレコードの合計数のうち、検証に合格したレコード数の割合。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
failed_count (整数)検証で不合格となったレコードの合計数。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
failed_percentage (浮動小数点数)検証されたレコードの合計数のうち、検証で不合格だったレコード数の割合。CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
null_count (整数)検証中に null を返したレコードの合計数。 NOT_NULL ルールと CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
null_percentage (浮動小数点数)検証されたレコードの合計数のうち、検証中に null を返したレコード数の割合。NOT_NULL ルールと CUSTOM_SQL_STATEMENT ルールの場合、このフィールドは NULL に設定されます。
failed_records_query 失敗したすべてのルールに対して、失敗したレコードを取得するために使用できるクエリがこの列に格納されます。このドキュメントの、failed_records_query で失敗したルールのトラブルシューティングをご覧ください。

BigQuery エンティティの場合、最新の実行の SQL 検証ロジックを含むすべての rule_binding に対してビューが作成されます。これらのビューは、引数 --gcp-bq-dataset-id で指定された BigQuery データセット内にあります。

コストの最適化

次の最適化によって費用を削減できます。

段階的な検証

多くの場合、新しいパーティション(新しい行)で定期的に更新されるテーブルがあります。すべての実行で古いパーティションを再検証しない場合は、増分検証を使用できます。

増分検証では、列の値が単調に増加する TIMESTAMP 型または DATETIME 型の列をテーブルに含める必要があります。BigQuery テーブルのパーティショニングの基準となる列を使用できます。

増分検証を指定するには、ルール バインディングの一部として incremental_time_filter_column_id=TIMESTAMP/DATETIME type column の値を指定します。

列を指定すると、データ品質タスクで、最後に実行されたデータ品質タスクのタイムスタンプよりも TIMESTAMP 値が大きい行のみが考慮されます。

仕様ファイルの例

これらのサンプルを使用するには、sales という名前の BigQuery データセットを作成します。次に、sales_orders という名前のファクト テーブルを作成し、次の GoogleSQL ステートメントを使用してクエリを実行してサンプルデータを追加します。

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

サンプル 1

次のコードサンプルでは、これらの値を検証するためのデータ品質チェックを作成します。

  • amount: 値は 0 または正の数値です。
  • item_id: アルファベットの 5 文字からなる英数字の文字列の後に 15 桁の数字が続きます。
  • transaction_currency: 使用可能な通貨の種類(静的リストで定義されているとおり)。このサンプルの静的リストでは、通貨のタイプとして GBP と JPY を使用できます。この検証は、「国際」とマークされた行にのみ適用されます。
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

次のように置き換えます。

  • PROJECT_ID: 実際のプロジェクト ID。
  • DATASET_ID: データセット ID。

Sample 2

チェックするテーブルが Dataplex レイクの一部である場合は、レイクまたはゾーンの表記法を使用してテーブルを指定できます。これにより、レイクまたはゾーンごとに結果を集計できます。たとえば、ゾーンレベルのスコアを生成できます。

このサンプルを使用するには、レイク ID operations とゾーン ID procurement を使用して Dataplex レイクを作成します。次に、テーブル sales_ordersアセットとしてゾーンに追加します。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

次のように置き換えます。

  • PROJECT_ID: 実際のプロジェクト ID。
  • REGION_ID: テーブルが存在する Dataplex レイクのリージョン ID(us-central1 など)。

サンプル 3

この例では、ID 値が一意かどうかを確認するカスタム SQL チェックを追加して、サンプル 2 を強化します。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

サンプル 4

この例では、last_modified_timestamp 列を使用して増分検証を追加して、サンプル 3 を強化します。1 つ以上のルール バインディングに対して増分検証を追加できます。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

failed_records_query で失敗したルールのトラブルシューティング

失敗したすべてのルールに対して、概要テーブルfailed_records_query 列に、失敗したレコードを取得するために使用できるクエリが格納されます。

デバッグするために、YAML ファイルで reference columns を使用することもできます。これにより、failed_records_query の出力を元のデータと結合し、レコード全体を取得できます。たとえば、primary_key 列または複合 primary_key 列を参照列として指定できます。

参照列を指定する

参照列を生成するには、YAML 仕様に次を追加します。

  1. reference_columns セクション。このセクションでは、1 つ以上の参照列セットを作成し、各セットで 1 つ以上の列を指定できます。

  2. rule_bindings セクション。このセクションでは、参照列 ID(reference_columns_id)を指定するルール バインディングに行を追加して、そのルール バインディングのルールに使用できます。reference_columns セクションで指定されている参照列セットのいずれかである必要があります。

たとえば、次の YAML ファイルでは reference_columns セクションを指定し、ORDER_DETAILS_REFERENCE_COLUMNS セットの一部として idlast_modified_timestampitem_id の 3 つの列を定義しています。 次の例では、サンプル テーブル sales_orders を使用しています。

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

失敗したレコードのクエリの使用

失敗したレコードクエリでは、失敗したルールがあるすべてのレコードに行が生成されます。これには、失敗をトリガーした列名、失敗をトリガーした値、参照列の値が含まれます。また、データ品質タスクの実行に関連して使用できるメタデータも含まれています。

次の例は、参照列を指定するで説明されている YAML ファイルに対して失敗したレコードクエリからの出力の例です。列 amount の失敗と、失敗の値 -10 が示されます。また、参照列に対応する値も記録されます。

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

CUSTOM_SQL_STATEMENT ルールの失敗したレコードクエリを使用する

CUSTOM_SQL_STATEMENT ルールの場合、失敗したレコードクエリには custom_sql_statement_validation_errors 列が含まれます。custom_sql_statement_validation_errors 列は、SQL ステートメントの出力と一致するフィールドがあるネストされた列です。CUSTOM_SQL_STATEMENT ルールの失敗したレコードのクエリに参照列は含まれません。

たとえば、CUSTOM_SQL_STATEMENT ルールは次のようになります。

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
この例の結果には、custom_sql_statement_validation_errors 列の 1 つ以上の行が含まれ、existing_id!=replacement_id であるすべての発生の行があります。

JSON でレンダリングされると、この列のセルの内容は次のようになる場合があります。

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

join on custom_sql_statement_valdation_errors.product_key のようなネストされた参照で、これらの結果を元のテーブルに結合できます。

次のステップ