Neste documento, descrevemos como ativar a linhagem de dados noGoogle Cloud sem servidor para cargas de trabalho em lote do Apache Spark e sessões interativas no nível do projeto, da carga de trabalho em lote ou da sessão interativa.
Visão geral
A linhagem de dados é um recurso do Dataplex Universal Catalog que permite acompanhar como os dados se movimentam nos sistemas: origem, destino e quais transformações são aplicadas a eles.
Google Cloud O Serverless para cargas de trabalho e sessões do Apache Spark captura eventos de linhagem e os publica na API Data Lineage do Dataplex Universal Catalog. O Serverless para Apache Spark se integra à API Data Lineage usando o OpenLineage e o plug-in OpenLineage Spark.
É possível acessar informações de linhagem pelo Dataplex Universal Catalog usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte Visualizar gráficos de linhagem no Dataplex Universal Catalog.
Disponibilidade, recursos e limitações
A linhagem de dados, que é compatível com fontes de dados do BigQuery e do Cloud Storage, está disponível para cargas de trabalho e sessões executadas com as versões de ambiente de execução sem servidor para Apache Spark 1.1
, 1.2
e 2.2
, com as seguintes exceções e limitações:
- A linhagem de dados não está disponível para cargas de trabalho ou sessões do SparkR ou do Spark Streaming.
Antes de começar
Na página do seletor de projetos no console do Google Cloud , selecione o projeto a ser usado para suas sessões ou cargas de trabalho do Serverless para Apache Spark.
Ative a API Data Lineage.
Funções exigidas
Se a carga de trabalho em lote usar a
conta de serviço padrão do Serverless para Apache Spark,
ela terá o papel Dataproc Worker
, que ativa a linhagem de dados. Nenhuma outra ação é necessária.
No entanto, se sua carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, conceda um papel obrigatório a ela, conforme explicado no parágrafo a seguir.
Para receber as permissões necessárias para usar a linhagem de dados com o Dataproc, peça ao administrador para conceder a você os seguintes papéis do IAM na conta de serviço personalizada da carga de trabalho em lote:
-
Conceda um dos seguintes papéis:
-
Worker do Dataproc (
roles/dataproc.worker
) -
Editor da linhagem de dados (
roles/datalineage.editor
) -
Produtor de linhagem de dados (
roles/datalineage.producer
) -
Administrador da linhagem de dados (
roles/datalineage.admin
)
-
Worker do Dataproc (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.
Ativar a linhagem de dados no nível do projeto
É possível ativar a linhagem de dados no nível do projeto. Quando ativada no nível do projeto, todas as cargas de trabalho em lote e sessões interativas subsequentes executadas no projeto terão a linhagem do Spark ativada.
Como ativar a linhagem de dados no nível do projeto
Para ativar a linhagem de dados no nível do projeto, defina os seguintes metadados personalizados do projeto.
Chave | Valor |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
É possível desativar a linhagem de dados no nível do projeto definindo os metadados DATAPROC_LINEAGE_ENABLED
como false
.
Ativar a linhagem de dados para uma carga de trabalho em lote do Spark
Para ativar a linhagem de dados em uma carga de trabalho em lote, defina a propriedade spark.dataproc.lineage.enabled
como true
ao enviar a carga de trabalho.
Exemplo de carga de trabalho em lote
Este exemplo envia uma carga de trabalho em lote do lineage-example.py
com linhagem do Spark
ativada.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
O lineage-example.py
lê dados de uma tabela pública do BigQuery e grava a saída em uma nova tabela em um conjunto de dados do BigQuery. Ele usa um bucket do Cloud Storage para armazenamento temporário.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Faça as seguintes substituições:
REGION: selecione uma região para executar sua carga de trabalho.
BUCKET: o nome de um bucket do Cloud Storage para armazenar dependências.
PROJECT_ID, DATASET e TABLE: insira seu ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).
É possível conferir o gráfico de linhagem na interface do Dataplex Universal Catalog.
Ativar a linhagem de dados para uma sessão interativa do Spark
Para ativar a linhagem de dados em uma sessão interativa do Spark, defina a propriedade spark.dataproc.lineage.enabled
como true
ao criar a sessão ou o modelo de sessão.
Exemplo de sessão interativa
O código do notebook PySpark a seguir configura uma sessão interativa do Serverless para Apache Spark com a linhagem de dados do Spark ativada. Em seguida, ele cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras em um conjunto de dados público do BigQuery Shakespeare e grava a saída em uma nova tabela em um conjunto de dados do BigQuery.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Faça as seguintes substituições:
- PROJECT_ID, DATASET e TABLE: insira seu ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).
Para ver o gráfico de linhagem de dados, clique no nome da tabela de destino listado no painel de navegação da página Explorer do BigQuery e selecione a guia "Linhagem" no painel de detalhes da tabela.
Visualizar a linhagem no catálogo universal do Dataplex
Um gráfico de linhagem mostra as relações entre os recursos do projeto e os processos que os criaram. É possível ver informações de linhagem de dados no console do Google Cloud ou recuperar as informações da API Data Lineage como dados JSON.
A seguir
- Saiba mais sobre a linhagem de dados.