Spark 데이터 계보

개요

데이터 계보는 시스템을 통해 데이터가 이동하는 방식, 즉 데이터의 출처, 데이터가 전달되는 위치, 데이터에 적용되는 변환을 추적할 수 있는 Dataplex 기능입니다.

데이터 계보를 사용 설정하면 Spark를 위한 서버리스 Dataproc 워크로드가 계보 이벤트를 캡처하여 Dataplex Data Lineage API에 게시합니다. Spark를 위한 서버리스 Dataproc은 OpenLineage Spark 플러그인을 사용하여 OpenLineage를 통해 Data Lineage API와 통합됩니다.

계보 시각화 그래프Data Lineage API를 사용하면 Dataplex를 통해 계보 정보에 액세스할 수 있습니다. 자세한 내용은 Dataplex에서 계보 그래프 보기를 참조하세요.

가용성, 기능 및 제한사항

BigQuery를 포함하는 Spark 데이터 계보 및 Cloud Storage 데이터 소스는 지원되는 Spark용 Dataproc Serverless 런타임 버전으로 실행되는 워크로드에 사용할 수 있으며 다음과 같은 예외 및 제한사항이 적용됩니다.

  • Spark 데이터 계보는 다음 항목에 제공되지 않습니다.
    • Dataproc Serverless Spark 런타임 버전 1.2 워크로드
    • Spark 스트리밍 워크로드
    • BigQuery 커넥터 버전 2(Spark의 데이터 소스 API 버전 2)

시작하기 전에

  1. Google Cloud 콘솔의 프로젝트 선택자 페이지에서 Spark를 위한 서버리스 Dataproc 워크로드에 사용할 프로젝트를 선택합니다.

    프로젝트 선택자로 이동

  2. Data Lineage API 및 Data Catalog API를 사용 설정합니다.

    API 사용 설정

필요한 역할

Spark용 Dataproc Serverless에서 데이터 계보를 사용하는 데 필요한 권한을 얻으려면 관리자에게 Dataproc 클러스터 VM 서비스 계정에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

Spark 일괄 워크로드에 데이터 계보 사용 설정

워크로드를 제출할 때 spark.dataproc.lineage.enabled 속성을 true로 설정하여 일괄 워크로드에 Spark 데이터 계보를 사용 설정할 수 있습니다.

Google Cloud CLI 예시:

gcloud dataproc batches submit pyspark FILENAME.py
    --region=REGION \
    --version=1.1 \
    --properties=spark.dataproc.lineage.enabled=true \
    other args ...

Dataplex에서 계보 그래프 보기

계보 시각화 그래프에는 프로젝트 리소스와 이를 만든 프로세스 간의 관계가 표시됩니다. Google Cloud 콘솔의 그래프 시각화에서 데이터 계보 정보를 확인하거나 Data Lineage API에서 JSON 데이터로 정보를 가져올 수 있습니다.

자세한 내용은 Google Cloud 시스템에 데이터 계보 사용 을 참조하세요.

예:

다음 Spark 워크로드는 BigQuery 테이블에서 데이터를 읽고 출력을 다른 BigQuery 테이블에 씁니다.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = lineage-ol-test
spark.conf.set('temporaryGcsBucket', bucket)

source = sample.source
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination = sample.destination
word_count.write.format('bigquery') \
  .option('table', destination) \
  .save()

이 Spark 워크로드는 Dataplex UI에서 다음 계보 그래프를 만듭니다.

샘플 계보 그래프

다음 단계