데이터 품질 태스크 사용

이 문서에서는 기본 제공 및 외부 BigQuery 테이블의 데이터 품질 확인을 예약하고 실행할 수 있게 해주는 Dataplex 데이터 품질 태스크를 만드는 방법을 보여줍니다.

자세한 내용은 데이터 품질 태스크 개요를 참조하세요.

시작하기 전에

이 문서에서는 데이터 품질 태스크를 만들 기존 Dataplex 레이크가 있다고 가정합니다.

Google API 및 서비스 사용 설정

  1. Dataproc API를 사용 설정합니다.

    API 사용 설정

  2. 네트워크 또는 서브네트워크에서 비공개 Google 액세스를 사용 설정합니다. Dataplex 데이터 품질 태스크와 함께 사용할 네트워크에서 비공개 Google 액세스를 사용 설정합니다. Dataplex 데이터 품질 태스크를 만들 때 네트워크 또는 서브네트워크를 지정하지 않은 경우에는 Dataplex에서 기본 서브넷을 사용합니다. 이 경우 기본 서브넷에서 비공개 Google 액세스를 사용 설정해야 합니다.

사양 파일 만들기

Dataplex는 오픈소스 CloudDQ를 드라이버 프로그램으로 사용합니다. Dataplex 데이터 품질 확인 요구사항은 CloudDQ YAML 사양 파일 내에 정의됩니다.

데이터 품질 태스크의 입력으로 단일 YAML 파일 또는 하나 이상의 YAML 파일을 포함하는 단일 ZIP 파일을 가질 수 있습니다. 각 섹션에 파일 하나가 있는 별도의 YAML 사양 파일에 데이터 품질 확인 요구사항을 캡처하는 것이 좋습니다.

사양 파일을 준비하려면 다음을 수행합니다.

  1. 데이터 품질 확인 요구사항을 정의하는 하나 이상의 CloudDQ YAML 사양 파일을 만듭니다. 필요한 구문에 대한 자세한 내용은 이 문서의 사양 파일 정보 섹션을 참조하세요.

    YAML 사양 파일을 .yml 또는 .yaml 형식으로 저장합니다. 여러 YAML 사양 파일을 만드는 경우 모든 파일을 단일 ZIP 파일에 저장합니다.

  2. Cloud Storage 버킷을 만듭니다.
  3. Cloud Storage 버킷에 사양 파일을 업로드합니다.

사양 파일 정보

CloudDQ YAML 사양 파일에는 다음 섹션이 포함되어야 합니다.

  • 규칙(최상위 rules YAML 노드에 정의됨): 실행할 규칙 목록입니다. NOT_NULL, REGEX 등 사전 정의된 규칙 유형에서 이러한 규칙을 만들거나 CUSTOM_SQL_EXPR, CUSTOM_SQL_STATEMENT 같은 커스텀 SQL 문을 사용하여 규칙을 확장할 수 있습니다. CUSTOM_SQL_EXPR 문은 custom_sql_exprFalse로 평가된 모든 행을 실패로 플래그 지정합니다. CUSTOM_SQL_STATEMENT 문은 전체 문이 반환한 모든 값을 실패로 플래그 지정합니다.

  • 행 필터(최상위 row_filters YAML 노드에 정의됨): 검증을 위해 기본 항목 제목에서 데이터 하위 집합을 가져오는 필터를 정의하는 불리언 값을 반환하는 SQL 표현식입니다.

  • 규칙 결합(최상위 rule_bindings YAML 노드에 정의됨): 테이블에 적용할 rulesrule filters를 정의합니다.

  • 규칙 측정기준(rule_dimensions YAML 노드에 정의됨): 규칙이 해당 dimension 필드에서 정의할 수 있는 데이터 품질 규칙 측정기준에 대한 허용 목록을 정의합니다.

    예를 들면 다음과 같습니다.

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    규칙의 dimension 필드는 선택사항입니다. 규칙에 dimension이 나열된 경우에는 규칙 측정기준 섹션이 필수입니다.

자세한 내용은 CloudDQ 참조 가이드샘플 사양 파일을 참조하세요.

결과를 저장할 데이터 세트 만들기

  • 결과를 저장하려면 BigQuery 데이터 세트를 만듭니다.

    데이터 세트는 데이터 품질 태스크를 실행할 테이블과 같은 리전에 있어야 합니다.

    Dataplex는 이 데이터 세트를 사용하며, 테이블을 새로 만들거나 사용자가 선택한 테이블을 재사용하여 결과를 저장합니다.

서비스 계정 만들기

다음 Identity and Access Management(IAM) 역할 및 권한이 있는 서비스 계정을 만듭니다.

선택사항: 고급 설정 사용

이 단계는 선택사항입니다.

  1. BigQuery는 기본적으로 현재 사용자 프로젝트에서 데이터 품질 확인을 실행합니다. 또는 태스크의 --execution-args 속성에 --gcp_project_id TASK_ARGS 인수를 사용하여 BigQuery 작업을 실행할 다른 프로젝트를 선택할 수 있습니다.

  2. BigQuery 쿼리를 실행하도록 지정된 프로젝트 ID가 서비스 계정이 생성된(--execution-service-account에 의해 지정) 프로젝트와 다른 경우, 프로젝트 간 서비스 계정의 사용을 중지(iam.disableServiceAccountCreation)하는 조직 정책을 끄도록 해야 합니다.

제한사항

  • 특정 데이터 품질 태스크에 지정된 모든 테이블은 동일한 Google Cloud 리전에 속해야 합니다.

데이터 품질 태스크 예약

콘솔

  1. Google Cloud 콘솔에서 Dataplex Process 페이지로 이동합니다.

    Process로 이동

  2. 태스크 만들기를 클릭합니다.
  3. 데이터 품질 확인 카드에서 태스크 만들기를 클릭합니다.
  4. Dataplex 레이크에서 레이크를 선택합니다.
  5. ID에 ID를 입력합니다.
  6. 데이터 품질 사양 섹션에서 다음을 수행합니다.
    1. GCS 파일 선택 필드에서 찾아보기를 클릭합니다.
    2. Cloud Storage 버킷을 선택합니다.

    3. 선택을 클릭합니다.

  7. 결과 테이블 섹션에서 다음을 수행합니다.

    1. BigQuery 데이터 세트 선택 필드에서 찾아보기를 클릭합니다.

    2. 검증 결과를 저장할 BigQuery 데이터 세트를 선택합니다.

    3. 선택을 클릭합니다.

    4. BigQuery 테이블 필드에 결과를 저장할 테이블 이름을 입력합니다. 테이블이 없으면 Dataplex에서 자동으로 만듭니다. dq_summary는 내부 처리 태스크용으로 예약되어 있으므로 사용하지 마세요.

  8. 서비스 계정 섹션의 사용자 서비스 계정 메뉴에서 서비스 계정을 선택합니다.

  9. 계속을 클릭합니다.

  10. 일정 설정 섹션에서 데이터 품질 태스크를 실행하기 위한 일정을 구성합니다.

  11. 만들기를 클릭합니다.

gcloud CLI

다음은 Dataplex 태스크 gcloud CLI 명령어를 사용하는 데이터 품질 태스크 실행의 예시입니다.

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
매개변수 설명
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH 데이터 품질 태스크의 데이터 품질 YAML 구성 입력의 Cloud Storage 경로입니다. .yml 또는 .yaml 형식의 단일 YAML 파일 또는 하나 이상의 YAML 파일이 포함된 zip 보관 파일을 가질 수 있습니다.
GOOGLE_CLOUD_PROJECT Dataplex 태스크 및 BigQuery 작업이 생성되는 Google Cloud 프로젝트입니다.
DATAPLEX_REGION_ID 데이터 품질 태스크이 생성되는 Dataplex 레이크의 리전입니다.
SERVICE_ACCOUNT 태스크 실행에 사용되는 서비스 계정입니다. 이 서비스 계정에 시작하기 전 섹션에 설명된 대로 충분한 IAM 권한이 있는지 확인합니다.

--execution-args의 경우 다음 인수를 위치 인수로 전달해야 하며 따라서 이 순서대로 진행됩니다.

인수 설명
clouddq-executable.zip 공개 Cloud Storage 버킷에서 spark-file-uris에 전달된 사전 컴파일된 실행 파일입니다.
ALL 모든 규칙 바인딩을 실행합니다. 또는 특정 규칙 결합을 쉼표로 구분된 목록으로 제공할 수 있습니다. 예를 들면 RULE_1,RULE_2입니다.
gcp-project-id BigQuery 쿼리를 실행하는 프로젝트 ID입니다.
gcp-region-id 데이터 품질 검증을 위한 BigQuery 작업을 실행할 리전입니다. 이 리전은 gcp-bq-dataset-idtarget_bigquery_summary_table의 리전과 동일해야 합니다.
gcp-bq-dataset-id rule_binding 뷰 및 중간 데이터 품질 요약 결과를 저장하는 데 사용되는 BigQuery 데이터 세트입니다.
target-bigquery-summary-table 데이터 품질 확인의 최종 결과가 저장되는 BigQuery 테이블의 테이블 ID 참조입니다. dq_summary ID 값은 내부 처리 태스크용으로 예약되어 있으므로 사용하지 마세요.
--summary_to_stdout (선택사항) 이 플래그가 전달되면 마지막 실행에서 dq_summary 테이블에 생성된 모든 검증 결과 행이 Cloud Logging 및 stdout에 JSON 레코드로 로깅됩니다.

API

  1. 다음을 바꿉니다.

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. HTTP POST 요청을 제출합니다.
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Dataplex 데이터 품질 태스크용 샘플 Airflow DAG도 참조하세요.

예약된 데이터 품질 태스크 모니터링

태스크 모니터링 방법을 알아보세요.

결과 보기

데이터 품질 검증 결과는 결과를 저장할 데이터 세트 만들기의 설명대로 지정된 BigQuery 데이터 세트와 요약 테이블에 저장됩니다. 요약 테이블에는 각 검증 실행에 관한 규칙 바인딩 및 규칙의 조합에 대한 출력 요약이 포함됩니다. 요약 테이블의 출력에는 다음 정보가 포함됩니다.

열 이름 설명
dataplex_lake (문자열) 검증할 테이블이 포함된 Dataplex 레이크의 ID입니다.
dataplex_zone (문자열) 검증할 테이블이 포함된 Dataplex 영역의 ID입니다.
dataplex_asset_id (문자열) 검증할 테이블이 포함된 Dataplex 애셋의 ID입니다.
execution_ts (타임스탬프) 검증 쿼리가 실행된 시점의 타임스탬프입니다.
rule_binding_id (문자열) 유효성 검사 결과가 보고되는 규칙 바인딩의 ID입니다.
rule_id (문자열) 검증 결과가 보고되는 규칙 결합의 규칙 ID입니다.
dimension (문자열) rule_id의 데이터 품질 측정기준입니다. 이 값은 rule_dimensions YAML 노드에 지정된 값 중 하나여야 합니다.
table_id (문자열) 검증 결과가 보고되는 항목의 ID입니다. 이 ID는 각 규칙 결합의 entity 매개변수에 지정됩니다.
column_id (문자열) 검증 결과가 보고되는 열의 ID입니다. 이 ID는 각 규칙 결합의 column 매개변수에 지정됩니다.
last_modified (타임스탬프) 검증 중인 table_id의 마지막으로 수정된 타임스탬프입니다.
metadata_json_string (문자열) 규칙 결합에 지정되어 있거나 데이터 품질 실행 중에 지정된 메타데이터 매개변수 콘텐츠의 키-값 쌍입니다.
configs_hashsum (문자열) 규칙 결합과 모든 관련 규칙, 행 필터, 항목 구성이 포함된 JSON 문서의 해시 합계입니다. configs_hashsum를 사용하면 rule_binding ID의 콘텐츠 또는 참조된 구성 중 하나가 변경될 경우 추적할 수 있습니다.
dq_run_id (문자열) 레코드의 고유 ID입니다.
invocation_id (문자열) 데이터 품질 실행의 ID입니다. 동일한 데이터 품질 실행 인스턴스 내에서 생성된 모든 데이터 품질 요약 레코드는 동일한 invocation_id를 공유합니다.
progress_watermark (불리언) 데이터 품질 검사에서 점진적 검증을 위한 높은 워터마크를 결정하기 위해 이 특정 레코드를 고려할지 여부를 결정합니다. FALSE면 높은 워터마크 값을 설정할 때 각 레코드가 무시됩니다. 이 정보는 높은 워터마크를 진행해서는 안 되는 테스트 데이터 품질 검증을 실행할 때 유용합니다. Dataplex는 기본적으로 이 필드를 TRUE로 채우지만 --progress_watermark 인수의 값이 FALSE인 경우 값을 재정의할 수 있습니다.
rows_validated (정수) row_filtersincremental_time_filter_column_id 열의 모든 워터마크 필터(지정된 경우)를 적용한 후 검증된 총 레코드 수입니다.
complex_rule_validation_errors_count (부동 소수점 수) CUSTOM_SQL_STATEMENT 규칙에서 반환된 행 수입니다.
complex_rule_validation_success_flag (불리언) CUSTOM_SQL_STATEMENT 규칙의 성공 상태입니다.
success_count (정수) 검증을 통과한 총 레코드 수입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL로 설정됩니다.
success_percentage (부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사를 통과한 레코드 수의 백분율입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL로 설정됩니다.
failed_count (정수) 검증에 실패한 총 레코드 수입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL로 설정됩니다.
failed_percentage (부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사에 실패한 레코드 수의 백분율입니다. CUSTOM_SQL_STATEMENT 규칙의 경우 이 필드가 NULL로 설정됩니다.
null_count (정수) 검증 중에 null을 반환한 총 레코드 수입니다. 이 필드는 NOT_NULLCUSTOM_SQL_STATEMENT 규칙의 경우 NULL로 설정됩니다.
null_percentage (부동 소수점 수) 검증된 총 레코드 수 내에서 유효성 검사 중에 null을 반환한 레코드 수의 백분율입니다. 이 필드는 NOT_NULLCUSTOM_SQL_STATEMENT 규칙의 경우 NULL로 설정됩니다.
failed_records_query 이 열은 실패한 모든 규칙에 대해 실패한 레코드를 가져오는 데 사용할 수 있는 쿼리를 저장합니다. 이 문서의 failed_records_query로 실패한 규칙 해결을 참조하세요.

BigQuery 항목의 경우 최신 실행의 SQL 검증 로직이 포함된 모든 rule_binding에 대한 뷰가 생성됩니다. 인수 --gcp-bq-dataset-id에 지정된 BigQuery 데이터 세트에서 이러한 뷰를 찾을 수 있습니다.

비용 최적화

다음 최적화를 통해 비용을 절감할 수 있습니다.

점진적 검증

새 파티션(새 행)으로 정기적으로 업데이트되는 테이블을 사용하는 경우가 종종 있습니다. 실행할 때마다 이전 파티션을 다시 검증하지 않으려면 점진적 검증을 사용하면 됩니다.

점진적 검증의 경우 열 값이 단조 증가하는 테이블에 TIMESTAMP 또는 DATETIME 유형 열이 있어야 합니다. BigQuery 테이블이 파티션된 열을 사용할 수 있습니다.

증분 유효성 검사를 지정하려면 규칙 결합의 일부로 incremental_time_filter_column_id=TIMESTAMP/DATETIME type column 값을 지정하세요.

열을 지정할 때 데이터 품질 태스크는 마지막으로 실행된 데이터 품질 태스크의 타임스탬프보다 TIMESTAMP 값이 큰 행만 고려합니다.

사양 파일 예시

이 샘플을 사용하려면 sales라는 BigQuery 데이터 세트를 만듭니다. 그런 다음 sales_orders라는 팩트 테이블을 만들고 다음 GoogleSQL 문으로 쿼리를 실행하여 샘플 데이터를 추가합니다.

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

샘플 1

다음 코드 샘플에서는 이러한 값을 검증하기 위한 데이터 품질 확인을 만듭니다.

  • amount: 값은 0 또는 양수입니다.
  • item_id: 5자리 영문자 다음에 15자리 숫자가 이어지는 영숫자 문자열입니다.
  • transaction_currency: 정적 목록에서 정의된 허용되는 통화 유형입니다. 이 샘플의 정적 목록은 통화 유형으로 GBP 및 JPY를 허용합니다. 이 검증은 국제로 표시된 행에만 적용됩니다.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.
  • DATASET_ID: 데이터 세트 ID입니다.

샘플 2

확인할 테이블이 Dataplex 레이크의 일부인 경우 레이크 또는 영역 표기법을 사용하여 테이블을 지정할 수 있습니다. 이를 통해 레이크 또는 영역별로 결과를 집계할 수 있습니다. 예를 들어 영역 수준 점수를 생성할 수 있습니다.

이 샘플을 사용하려면 레이크 ID가 operations이고 영역 ID가 procurement인 Dataplex 레이크를 만듭니다. 그런 다음 영역에 sales_orders 테이블을 애셋으로 추가합니다.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.
  • REGION_ID: 테이블이 있는 Dataplex 레이크의 리전 ID입니다(예: us-central1).

샘플 3

이 예시에서는 ID 값이 고유한지 확인하는 커스텀 SQL 확인을 추가하여 샘플 2를 개선합니다.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

샘플 4

이 예시에서는 last_modified_timestamp 열을 사용하여 점진적 검증을 추가하여 샘플 3을 개선합니다. 하나 이상의 규칙 결합에 대한 점진적 검증을 추가할 수 있습니다.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

failed_records_query로 실패한 규칙 해결

실패한 모든 규칙에 대해 요약 테이블에서 실패한 레코드를 가져오는 데 사용할 수 있는 failed_records_query 열에 쿼리를 저장합니다.

디버깅하기 위해 YAML 파일에서 reference columns을 사용할 수도 있습니다. 그러면 failed_records_query의 출력을 원본 데이터와 조인하여 전체 레코드를 가져올 수 있게 됩니다. 예를 들어 primary_key 열 또는 복합 primary_key 열을 참조 열로 지정할 수 있습니다.

참조 열 지정

참조 열을 생성하려면 YAML 사양에 다음을 추가하면 됩니다.

  1. reference_columns 섹션. 이 섹션에서는 각각 하나 이상의 참조 열을 지정하는 하나 이상의 참조 열 집합을 만들 수 있습니다.

  2. rule_bindings 섹션. 이 섹션에서는 규칙 바인딩의 규칙에 사용할 참조 열 ID(reference_columns_id)를 지정하는 줄을 규칙 바인딩에 추가할 수 있습니다. reference_columns 섹션에 지정된 참조 열 세트 중 하나여야 합니다.

예를 들어 다음 YAML 파일은 reference_columns 섹션을 지정하고 id, last_modified_timestamp, item_id와 같은 3개 열을 ORDER_DETAILS_REFERENCE_COLUMNS 집합의 일부로 정의합니다. 다음 예시에서는 샘플 테이블 sales_orders를 사용합니다.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

실패한 레코드 쿼리 사용

실패한 레코드 쿼리는 실패한 규칙이 있는 모든 레코드의 행을 생성합니다. 여기에는 실패를 트리거한 열 이름, 실패를 트리거한 값, 참조 열 값이 포함됩니다. 데이터 품질 태스크 실행과 관련하여 사용할 수 있는 메타데이터도 포함됩니다.

다음은 참조 열 지정에서 설명한 YAML 파일에 대한 실패한 레코드 쿼리의 출력 예시입니다. amount 열의 실패와 실패한 값 -10을 보여줍니다. 또한 참조 열의 해당 값도 기록합니다.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

CUSTOM_SQL_STATEMENT 규칙에 실패한 레코드 쿼리 사용

CUSTOM_SQL_STATEMENT 규칙의 경우 실패한 레코드 쿼리에 custom_sql_statement_validation_errors 열이 포함됩니다. custom_sql_statement_validation_errors 열은 SQL 문의 출력과 일치하는 필드가 있는 중첩 열입니다. 참조 열은 CUSTOM_SQL_STATEMENT 규칙의 실패한 레코드 쿼리에 포함되지 않습니다.

예를 들어 CUSTOM_SQL_STATEMENT 규칙은 다음과 비슷합니다.

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
이 예시의 결과에는 custom_sql_statement_validation_errors 열에 대한 행이 1개 이상 포함되고 existing_id!=replacement_id인 경우 모든 일치 항목에 대한 행이 포함됩니다.

JSON으로 렌더링하면 이 열의 셀 내용은 다음과 같이 표시될 수 있습니다.

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

이러한 결과를 join on custom_sql_statement_valdation_errors.product_key와 같은 중첩 참조를 사용하여 원본 테이블에 조인할 수 있습니다.

다음 단계