이 문서에서는 기본 제공 및 외부 BigQuery 테이블의 데이터 품질 확인을 예약하고 실행할 수 있게 해주는 Dataplex 데이터 품질 태스크를 만드는 방법을 보여줍니다.
자세한 내용은 데이터 품질 태스크 개요를 참조하세요.
시작하기 전에
이 문서에서는 데이터 품질 태스크를 만들 기존 Dataplex 레이크가 있다고 가정합니다.
Google API 및 서비스 사용 설정
Dataproc API를 사용 설정합니다.
네트워크 또는 서브네트워크에서 비공개 Google 액세스를 사용 설정합니다. Dataplex 데이터 품질 태스크와 함께 사용할 네트워크에서 비공개 Google 액세스를 사용 설정합니다. Dataplex 데이터 품질 태스크를 만들 때 네트워크 또는 서브네트워크를 지정하지 않은 경우에는 Dataplex에서 기본 서브넷을 사용합니다. 이 경우 기본 서브넷에서 비공개 Google 액세스를 사용 설정해야 합니다.
사양 파일 만들기
Dataplex는 오픈소스 CloudDQ를 드라이버 프로그램으로 사용합니다. Dataplex 데이터 품질 확인 요구사항은 CloudDQ YAML 사양 파일 내에 정의됩니다.
데이터 품질 태스크의 입력으로 단일 YAML 파일 또는 하나 이상의 YAML 파일을 포함하는 단일 ZIP 파일을 가질 수 있습니다. 각 섹션에 파일 하나가 있는 별도의 YAML 사양 파일에 데이터 품질 확인 요구사항을 캡처하는 것이 좋습니다.
사양 파일을 준비하려면 다음을 수행합니다.
-
데이터 품질 확인 요구사항을 정의하는 하나 이상의 CloudDQ YAML 사양 파일을 만듭니다. 필요한 구문에 대한 자세한 내용은 이 문서의 사양 파일 정보 섹션을 참조하세요.
YAML 사양 파일을
.yml
또는.yaml
형식으로 저장합니다. 여러 YAML 사양 파일을 만드는 경우 모든 파일을 단일 ZIP 파일에 저장합니다. - Cloud Storage 버킷을 만듭니다.
- Cloud Storage 버킷에 사양 파일을 업로드합니다.
사양 파일 정보
CloudDQ YAML 사양 파일에는 다음 섹션이 포함되어야 합니다.
규칙(최상위
rules
YAML 노드에 정의됨): 실행할 규칙 목록입니다.NOT_NULL
,REGEX
등 사전 정의된 규칙 유형에서 이러한 규칙을 만들거나CUSTOM_SQL_EXPR
,CUSTOM_SQL_STATEMENT
같은 커스텀 SQL 문을 사용하여 규칙을 확장할 수 있습니다.CUSTOM_SQL_EXPR
문은custom_sql_expr
이False
로 평가된 모든 행을 실패로 플래그 지정합니다.CUSTOM_SQL_STATEMENT
문은 전체 문이 반환한 모든 값을 실패로 플래그 지정합니다.행 필터(최상위
row_filters
YAML 노드에 정의됨): 검증을 위해 기본 항목 제목에서 데이터 하위 집합을 가져오는 필터를 정의하는 불리언 값을 반환하는 SQL 표현식입니다.규칙 결합(최상위
rule_bindings
YAML 노드에 정의됨): 테이블에 적용할rules
및rule filters
를 정의합니다.규칙 측정기준(
rule_dimensions
YAML 노드에 정의됨): 규칙이 해당dimension
필드에서 정의할 수 있는 데이터 품질 규칙 측정기준에 대한 허용 목록을 정의합니다.예를 들면 다음과 같습니다.
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
규칙의
dimension
필드는 선택사항입니다. 규칙에dimension
이 나열된 경우에는 규칙 측정기준 섹션이 필수입니다.
자세한 내용은 CloudDQ 참조 가이드 및 샘플 사양 파일을 참조하세요.
결과를 저장할 데이터 세트 만들기
-
결과를 저장하려면 BigQuery 데이터 세트를 만듭니다.
데이터 세트는 데이터 품질 태스크를 실행할 테이블과 같은 리전에 있어야 합니다.
Dataplex는 이 데이터 세트를 사용하며, 테이블을 새로 만들거나 사용자가 선택한 테이블을 재사용하여 결과를 저장합니다.
서비스 계정 만들기
다음 Identity and Access Management(IAM) 역할 및 권한이 있는 서비스 계정을 만듭니다.
- YAML 사양이 포함된 Cloud Storage 경로에 대한 읽기 액세스 권한. Cloud Storage 버킷에서 스토리지 객체 뷰어 역할(
roles/storage.objectViewer
)을 사용할 수 있습니다. - 유효성을 검사할 데이터가 있는 BigQuery 데이터 세트에 대한 읽기 액세스 권한. BigQuery 데이터 뷰어 역할을 사용할 수 있습니다.
- 테이블을 만들고(필요한 경우) 결과를 해당 테이블에 쓰기 위한 BigQuery 데이터 세트에 대한 쓰기 액세스 권한. 데이터 세트 수준에서 BigQuery 데이터 편집자 역할(
roles/bigquery.dataEditor
)을 사용할 수 있습니다. - 프로젝트에서 BigQuery 작업을 만들기 위한 프로젝트 수준의 BigQuery 작업 사용자 역할(
roles/bigquery.jobUser
) - 프로젝트 또는 레이크 수준의 Dataplex 메타데이터 리더 역할(
roles/dataplex.metadataReader
) - 프로젝트 수준의 서비스 사용량 소비자 역할(
roles/serviceusage.serviceUsageConsumer
) - Dataproc 작업자 역할
- 작업을 제출하는 사용자에게 부여된
iam.serviceAccounts.actAs
권한 - Dataplex 레이크 서비스 계정에 부여된 서비스 계정 사용자 역할. Google Cloud 콘솔에서 Dataplex 레이크 서비스 계정을 확인할 수 있습니다.
선택사항: 고급 설정 사용
이 단계는 선택사항입니다.
BigQuery는 기본적으로 현재 사용자 프로젝트에서 데이터 품질 확인을 실행합니다. 또는 태스크의
--execution-args
속성에--gcp_project_id
TASK_ARGS
인수를 사용하여 BigQuery 작업을 실행할 다른 프로젝트를 선택할 수 있습니다.BigQuery 쿼리를 실행하도록 지정된 프로젝트 ID가 서비스 계정이 생성된(
--execution-service-account
에 의해 지정) 프로젝트와 다른 경우, 프로젝트 간 서비스 계정의 사용을 중지(iam.disableServiceAccountCreation
)하는 조직 정책을 끄도록 해야 합니다.
제한사항
- 특정 데이터 품질 태스크에 지정된 모든 테이블은 동일한 Google Cloud 리전에 속해야 합니다.
데이터 품질 태스크 예약
콘솔
- Google Cloud 콘솔에서 Dataplex 프로세스 페이지로 이동합니다.
- 태스크 만들기를 클릭합니다.
- 데이터 품질 확인 카드에서 태스크 만들기를 클릭합니다.
- Dataplex 레이크에서 레이크를 선택합니다.
- ID에 ID를 입력합니다.
- 데이터 품질 사양 섹션에서 다음을 수행합니다.
- GCS 파일 선택 필드에서 찾아보기를 클릭합니다.
Cloud Storage 버킷을 선택합니다.
선택을 클릭합니다.
결과 테이블 섹션에서 다음을 수행합니다.
BigQuery 데이터 세트 선택 필드에서 찾아보기를 클릭합니다.
검증 결과를 저장할 BigQuery 데이터 세트를 선택합니다.
선택을 클릭합니다.
BigQuery 테이블 필드에 결과를 저장할 테이블 이름을 입력합니다. 테이블이 없으면 Dataplex에서 자동으로 만듭니다.
dq_summary
는 내부 처리 태스크용으로 예약되어 있으므로 사용하지 마세요.
서비스 계정 섹션의 사용자 서비스 계정 메뉴에서 서비스 계정을 선택합니다.
계속을 클릭합니다.
일정 설정 섹션에서 데이터 품질 태스크를 실행하기 위한 일정을 구성합니다.
만들기를 클릭합니다.
gcloud CLI
다음은 Dataplex 태스크 gcloud CLI 명령어를 사용하는 데이터 품질 태스크 실행의 예시입니다.
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
매개변수 | 설명 |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
데이터 품질 태스크의 데이터 품질 YAML 구성 입력의 Cloud Storage 경로입니다. .yml 또는 .yaml 형식의 단일 YAML 파일 또는 하나 이상의 YAML 파일이 포함된 zip 보관 파일을 가질 수 있습니다. |
GOOGLE_CLOUD_PROJECT |
Dataplex 태스크 및 BigQuery 작업이 생성되는 Google Cloud 프로젝트입니다. |
DATAPLEX_REGION_ID |
데이터 품질 태스크이 생성되는 Dataplex 레이크의 리전입니다. |
SERVICE_ACCOUNT |
태스크 실행에 사용되는 서비스 계정입니다. 이 서비스 계정에 시작하기 전 섹션에 설명된 대로 충분한 IAM 권한이 있는지 확인합니다. |
--execution-args
의 경우 다음 인수를 위치 인수로 전달해야 하며 따라서 이 순서대로 진행됩니다.
인수 | 설명 |
---|---|
clouddq-executable.zip |
공개 Cloud Storage 버킷에서 spark-file-uris 에 전달된 사전 컴파일된 실행 파일입니다. |
ALL |
모든 규칙 바인딩을 실행합니다. 또는 특정 규칙 결합을 쉼표로 구분된 목록으로 제공할 수 있습니다.
예를 들면 RULE_1,RULE_2 입니다. |
gcp-project-id |
BigQuery 쿼리를 실행하는 프로젝트 ID입니다. |
gcp-region-id |
데이터 품질 검증을 위한 BigQuery 작업을 실행할 리전입니다. 이 리전은 gcp-bq-dataset-id 및 target_bigquery_summary_table 의 리전과 동일해야 합니다. |
gcp-bq-dataset-id |
rule_binding 뷰 및 중간 데이터 품질 요약 결과를 저장하는 데 사용되는 BigQuery 데이터 세트입니다. |
target-bigquery-summary-table |
데이터 품질 확인의 최종 결과가 저장되는 BigQuery 테이블의 테이블 ID 참조입니다. dq_summary ID 값은 내부 처리 태스크용으로 예약되어 있으므로 사용하지 마세요. |
--summary_to_stdout |
(선택사항) 이 플래그가 전달되면 마지막 실행에서 dq_summary 테이블에 생성된 모든 검증 결과 행이 Cloud Logging 및 stdout 에 JSON 레코드로 로깅됩니다. |
API
다음을 바꿉니다.
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- HTTP POST 요청을 제출합니다.
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
예약된 데이터 품질 태스크 모니터링
태스크 모니터링 방법을 알아보세요.
결과 보기
데이터 품질 검증 결과는 결과를 저장할 데이터 세트 만들기의 설명대로 지정된 BigQuery 데이터 세트와 요약 테이블에 저장됩니다. 요약 테이블에는 각 검증 실행에 관한 규칙 바인딩 및 규칙의 조합에 대한 출력 요약이 포함됩니다. 요약 테이블의 출력에는 다음 정보가 포함됩니다.
열 이름 | 설명 |
---|---|
dataplex_lake |
(문자열) 검증할 테이블이 포함된 Dataplex 레이크의 ID입니다. |
dataplex_zone |
(문자열) 검증할 테이블이 포함된 Dataplex 영역의 ID입니다. |
dataplex_asset_id |
(문자열) 검증할 테이블이 포함된 Dataplex 애셋의 ID입니다. |
execution_ts |
(타임스탬프) 검증 쿼리가 실행된 시점의 타임스탬프입니다. |
rule_binding_id |
(문자열) 유효성 검사 결과가 보고되는 규칙 바인딩의 ID입니다. |
rule_id |
(문자열) 검증 결과가 보고되는 규칙 결합의 규칙 ID입니다. |
dimension |
(문자열) rule_id 의 데이터 품질 측정기준입니다. 이 값은 rule_dimensions YAML 노드에 지정된 값 중 하나여야 합니다. |
table_id |
(문자열) 검증 결과가 보고되는 항목의 ID입니다.
이 ID는 각 규칙 결합의 entity 매개변수에 지정됩니다. |
column_id |
(문자열) 검증 결과가 보고되는 열의 ID입니다.
이 ID는 각 규칙 결합의 column 매개변수에 지정됩니다. |
last_modified |
(타임스탬프) 검증 중인 table_id 의 마지막으로 수정된 타임스탬프입니다. |
metadata_json_string |
(문자열) 규칙 결합에 지정되어 있거나 데이터 품질 실행 중에 지정된 메타데이터 매개변수 콘텐츠의 키-값 쌍입니다. |
configs_hashsum |
(문자열) 규칙 결합과 모든 관련 규칙, 행 필터, 항목 구성이 포함된 JSON 문서의 해시 합계입니다.
configs_hashsum 를 사용하면 rule_binding ID의 콘텐츠 또는 참조된 구성 중 하나가 변경될 경우 추적할 수 있습니다. |
dq_run_id |
(문자열) 레코드의 고유 ID입니다. |
invocation_id |
(문자열) 데이터 품질 실행의 ID입니다. 동일한 데이터 품질 실행 인스턴스 내에서 생성된 모든 데이터 품질 요약 레코드는 동일한 invocation_id 를 공유합니다. |
progress_watermark |
(불리언) 데이터 품질 검사에서 점진적 검증을 위한 높은 워터마크를 결정하기 위해 이 특정 레코드를 고려할지 여부를 결정합니다. FALSE 면 높은 워터마크 값을 설정할 때 각 레코드가 무시됩니다. 이 정보는 높은 워터마크를 진행해서는 안 되는 테스트 데이터 품질 검증을 실행할 때 유용합니다. Dataplex는 기본적으로 이 필드를 TRUE 로 채우지만 --progress_watermark 인수의 값이 FALSE 인 경우 값을 재정의할 수 있습니다.
|
rows_validated |
(정수) row_filters 와 incremental_time_filter_column_id 열의 모든 워터마크 필터(지정된 경우)를 적용한 후 검증된 총 레코드 수입니다. |
complex_rule_validation_errors_count |
(부동 소수점 수) CUSTOM_SQL_STATEMENT 규칙에서 반환된 행 수입니다. |
complex_rule_validation_success_flag |
(불리언) CUSTOM_SQL_STATEMENT 규칙의 성공 상태입니다.
|
success_count |
(정수) 검증을 통과한 총 레코드 수입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL 로 설정됩니다.
|
success_percentage |
(부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사를 통과한 레코드 수의 백분율입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL 로 설정됩니다. |
failed_count |
(정수) 검증에 실패한 총 레코드 수입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL 로 설정됩니다.
|
failed_percentage |
(부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사에 실패한 레코드 수의 백분율입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL 로 설정됩니다. |
null_count |
(정수) 검증 중에 null을 반환한 총 레코드 수입니다.
이 필드는 NOT_NULL 및 CUSTOM_SQL_STATEMENT 규칙의 경우 NULL 로 설정됩니다. |
null_percentage |
(부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사 중에 null을 반환한 레코드 수의 백분율입니다. 이 필드는 NOT_NULL 및 CUSTOM_SQL_STATEMENT 규칙의 경우 NULL 로 설정됩니다. |
failed_records_query |
이 열은 실패한 모든 규칙에 대해 실패한 레코드를 가져오는 데 사용할 수 있는 쿼리를 저장합니다. 이 문서의 failed_records_query 로 실패한 규칙 해결을 참조하세요. |
BigQuery 항목의 경우 최신 실행의 SQL 검증 로직이 포함된 모든 rule_binding
에 대한 뷰가 생성됩니다. 인수 --gcp-bq-dataset-id
에 지정된 BigQuery 데이터 세트에서 이러한 뷰를 찾을 수 있습니다.
비용 최적화
다음과 같은 최적화를 통해 비용을 절감할 수 있습니다.
점진적 검증
새 파티션(새 행)으로 정기적으로 업데이트되는 테이블을 사용하는 경우가 종종 있습니다. 실행할 때마다 이전 파티션을 다시 검증하지 않으려면 점진적 검증을 사용하면 됩니다.
점진적 검증의 경우 열 값이 단조 증가하는 테이블에 TIMESTAMP
또는 DATETIME
유형 열이 있어야 합니다. BigQuery 테이블이 파티션된 열을 사용할 수 있습니다.
증분 유효성 검사를 지정하려면 규칙 결합의 일부로 incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
값을 지정하세요.
열을 지정할 때 데이터 품질 태스크는 마지막으로 실행된 데이터 품질 태스크의 타임스탬프보다 TIMESTAMP
값이 큰 행만 고려합니다.
사양 파일 예시
이 샘플을 사용하려면 sales
라는 BigQuery 데이터 세트를 만듭니다. 그런 다음 sales_orders
라는 팩트 테이블을 만들고 다음 GoogleSQL 문으로 쿼리를 실행하여 샘플 데이터를 추가합니다.
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
샘플 1
다음 코드 샘플에서는 이러한 값을 검증하기 위한 데이터 품질 확인을 만듭니다.
amount
: 값은 0 또는 양수입니다.item_id
: 5자리 영문자 다음에 15자리 숫자가 이어지는 영숫자 문자열입니다.transaction_currency
: 정적 목록에서 정의된 허용되는 통화 유형입니다. 이 샘플의 정적 목록은 통화 유형으로 GBP 및 JPY를 허용합니다. 이 검증은 국제로 표시된 행에만 적용됩니다.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.DATASET_ID
: 데이터 세트 ID입니다.
샘플 2
확인할 테이블이 Dataplex 레이크의 일부인 경우 레이크 또는 영역 표기법을 사용하여 테이블을 지정할 수 있습니다. 이를 통해 레이크 또는 영역별로 결과를 집계할 수 있습니다. 예를 들어 영역 수준 점수를 생성할 수 있습니다.
이 샘플을 사용하려면 레이크 ID가 operations
이고 영역 ID가 procurement
인 Dataplex 레이크를 만듭니다. 그런 다음 영역에 sales_orders
테이블을 애셋으로 추가합니다.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
다음을 바꿉니다.
- PROJECT_ID: 프로젝트 ID입니다.
- REGION_ID: 테이블이 있는 Dataplex 레이크의 리전 ID입니다(예:
us-central1
).
샘플 3
이 예시에서는 ID 값이 고유한지 확인하는 커스텀 SQL 확인을 추가하여 샘플 2를 개선합니다.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
샘플 4
이 예시에서는 last_modified_timestamp
열을 사용하여 점진적 검증을 추가하여 샘플 3을 개선합니다. 하나 이상의 규칙 결합에 대한 점진적 검증을 추가할 수 있습니다.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
failed_records_query
로 실패한 규칙 해결
실패한 모든 규칙에 대해 요약 테이블에서 실패한 레코드를 가져오는 데 사용할 수 있는 failed_records_query
열에 쿼리를 저장합니다.
디버깅하기 위해 YAML 파일에서 reference columns
을 사용할 수도 있습니다. 그러면 failed_records_query
의 출력을 원본 데이터와 조인하여 전체 레코드를 가져올 수 있게 됩니다. 예를 들어 primary_key
열 또는 복합 primary_key
열을 참조 열로 지정할 수 있습니다.
참조 열 지정
참조 열을 생성하려면 YAML 사양에 다음을 추가하면 됩니다.
reference_columns
섹션. 이 섹션에서는 각각 하나 이상의 참조 열을 지정하는 하나 이상의 참조 열 집합을 만들 수 있습니다.rule_bindings
섹션. 이 섹션에서는 규칙 바인딩의 규칙에 사용할 참조 열 ID(reference_columns_id
)를 지정하는 줄을 규칙 바인딩에 추가할 수 있습니다.reference_columns
섹션에 지정된 참조 열 세트 중 하나여야 합니다.
예를 들어 다음 YAML 파일은 reference_columns
섹션을 지정하고 id
, last_modified_timestamp
, item_id
와 같은 3개 열을 ORDER_DETAILS_REFERENCE_COLUMNS
집합의 일부로 정의합니다. 다음 예시에서는 샘플 테이블 sales_orders
를 사용합니다.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
실패한 레코드 쿼리 사용
실패한 레코드 쿼리는 실패한 규칙이 있는 모든 레코드의 행을 생성합니다. 여기에는 실패를 트리거한 열 이름, 실패를 트리거한 값, 참조 열 값이 포함됩니다. 데이터 품질 태스크 실행과 관련하여 사용할 수 있는 메타데이터도 포함됩니다.
다음은 참조 열 지정에서 설명한 YAML 파일에 대한 실패한 레코드 쿼리의 출력 예시입니다. amount
열의 실패와 실패한 값 -10
을 보여줍니다. 또한 참조 열의 해당 값도 기록합니다.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | 금액 | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
CUSTOM_SQL_STATEMENT 규칙에 실패한 레코드 쿼리 사용
CUSTOM_SQL_STATEMENT
규칙의 경우 실패한 레코드 쿼리에 custom_sql_statement_validation_errors
열이 포함됩니다. custom_sql_statement_validation_errors
열은 SQL 문의 출력과 일치하는 필드가 있는 중첩 열입니다. 참조 열은 CUSTOM_SQL_STATEMENT
규칙의 실패한 레코드 쿼리에 포함되지 않습니다.
예를 들어 CUSTOM_SQL_STATEMENT
규칙은 다음과 비슷합니다.
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
열에 대한 행이 1개 이상 포함되고 existing_id!=replacement_id
인 경우 모든 일치 항목에 대한 행이 포함됩니다.
JSON으로 렌더링하면 이 열의 셀 콘텐츠는 다음과 같이 표시될 수 있습니다.
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
이러한 결과를 join on custom_sql_statement_valdation_errors.product_key
와 같은 중첩 참조를 사용하여 원본 테이블에 조인할 수 있습니다.
다음 단계
- CloudDQ YAML 사양 참조를 참고하세요.
- 샘플 데이터 품질 규칙은 단순 규칙 및 고급 규칙을 참고하세요.
- Dataplex 데이터 품질 태스크용 샘플 Airflow DAG를 참고하세요.