Dokumen ini menjelaskan cara mengaktifkan asal-usul data di Google Cloud Serverless untuk workload batch Apache Spark dan sesi interaktif di tingkat project, workload batch, atau sesi interaktif.
Ringkasan
Silsilah data adalah fitur Dataplex Universal Catalog yang memungkinkan Anda melacak bagaimana data bergerak melalui sistem: dari mana data berasal, ke mana data diteruskan, dan transformasi apa yang diterapkan pada data.
Google Cloud Serverless untuk workload dan sesi Apache Spark merekam peristiwa silsilah dan memublikasikannya ke Data Lineage API Dataplex Universal Catalog. Serverless for Apache Spark terintegrasi dengan Data Lineage API melalui OpenLineage, menggunakan plugin OpenLineage Spark.
Anda dapat mengakses informasi silsilah melalui Dataplex Universal Catalog, menggunakan grafik silsilah dan Data Lineage API. Untuk mengetahui informasi selengkapnya, lihat Melihat grafik silsilah di Dataplex Universal Catalog.
Ketersediaan, kemampuan, dan batasan
Silsilah data, yang mendukung sumber data BigQuery dan Cloud Storage, tersedia untuk workload dan sesi yang berjalan dengan versi runtime Serverless for Apache Spark 1.1
, 1.2
, dan 2.2
, dengan pengecualian dan batasan berikut:
- Silsilah data tidak tersedia untuk beban kerja atau sesi streaming SparkR atau Spark.
Sebelum memulai
Di halaman pemilih project di konsol Google Cloud , pilih project yang akan digunakan untuk beban kerja atau sesi Serverless for Apache Spark Anda.
Aktifkan Data Lineage API.
Peran yang diperlukan
Jika workload batch Anda menggunakan
akun layanan Serverless for Apache Spark default,
workload tersebut memiliki peran Dataproc Worker
, yang memungkinkan pelacakan asal data. Anda tidak perlu melakukan tindakan tambahan apa pun.
Namun, jika beban kerja batch Anda menggunakan akun layanan kustom untuk mengaktifkan asal data, Anda harus memberikan peran yang diperlukan ke akun layanan kustom seperti yang dijelaskan dalam paragraf berikut.
Untuk mendapatkan izin yang diperlukan untuk menggunakan silsilah data dengan Dataproc, minta administrator untuk memberi Anda peran IAM berikut pada akun layanan kustom beban kerja batch Anda:
-
Beri salah satu peran berikut:
-
Worker Dataproc (
roles/dataproc.worker
) -
Editor urutan data (
roles/datalineage.editor
) -
Produsen urutan data (
roles/datalineage.producer
) -
Administrator urutan data (
roles/datalineage.admin
)
-
Worker Dataproc (
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran kustom atau peran yang telah ditentukan lainnya.
Mengaktifkan silsilah data di level project
Anda dapat mengaktifkan silsilah data di tingkat project. Jika diaktifkan di tingkat project, semua beban kerja batch dan sesi interaktif berikutnya yang Anda jalankan dalam project akan mengaktifkan silsilah Spark.
Cara mengaktifkan silsilah data di tingkat project
Untuk mengaktifkan silsilah data di level project, tetapkan metadata project kustom berikut.
Kunci | Nilai |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Anda dapat menonaktifkan silsilah data di tingkat project dengan menyetel metadata
DATAPROC_LINEAGE_ENABLED
ke false
.
Mengaktifkan silsilah data untuk workload batch Spark
Anda dapat mengaktifkan silsilah data pada workload batch
dengan menyetel properti spark.dataproc.lineage.enabled
ke true
saat Anda
mengirimkan workload.
Contoh workload batch
Contoh ini mengirimkan workload batch lineage-example.py
dengan mengaktifkan silsilah Spark.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
membaca data dari tabel BigQuery publik, lalu menulis output ke tabel baru dalam set data BigQuery yang ada. Bucket ini menggunakan bucket Cloud Storage untuk penyimpanan sementara.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Lakukan penggantian berikut:
REGION: Pilih region untuk menjalankan workload Anda.
BUCKET: Nama bucket Cloud Storage yang ada untuk menyimpan dependensi.
PROJECT_ID, DATASET, dan TABLE: Masukkan ID project Anda, nama set data BigQuery yang ada, dan nama tabel baru yang akan dibuat dalam set data (tabel tidak boleh ada).
Anda dapat melihat grafik silsilah di UI Dataplex Universal Catalog.
Mengaktifkan silsilah data untuk sesi interaktif Spark
Anda dapat mengaktifkan silsilah data pada sesi interaktif Spark
dengan menyetel properti spark.dataproc.lineage.enabled
ke true
saat Anda
membuat sesi atau template sesi.
Contoh sesi interaktif
Kode notebook PySpark berikut mengonfigurasi sesi interaktif Serverless for Apache Spark dengan pelacakan asal data Spark diaktifkan. Kemudian, kode ini membuat sesi Spark Connect yang menjalankan kueri jumlah kata pada set data Shakespeare BigQuery publik, lalu menulis output ke tabel baru dalam set data BigQuery yang ada.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Lakukan penggantian berikut:
- PROJECT_ID, DATASET, dan TABLE: Masukkan ID project Anda, nama set data BigQuery yang ada, dan nama tabel baru yang akan dibuat dalam set data (tabel tidak boleh ada).
Anda dapat melihat grafik asal data dengan mengklik nama tabel tujuan yang tercantum di panel navigasi pada halaman Explorer BigQuery, lalu memilih tab asal data di panel detail tabel.
Melihat silsilah di Dataplex Universal Catalog
Grafik silsilah menampilkan hubungan antara resource project Anda dan proses yang membuatnya. Anda dapat melihat informasi silsilah data di konsol Google Cloud atau mengambil informasi dari Data Lineage API sebagai data JSON.
Langkah berikutnya
- Pelajari lebih lanjut silsilah data.