Usar linhagem de dados com o Serverless para Apache Spark

Neste documento, descrevemos como ativar a linhagem de dados noGoogle Cloud sem servidor para cargas de trabalho em lote do Apache Spark e sessões interativas no nível do projeto, da carga de trabalho em lote ou da sessão interativa.

Visão geral

A linhagem de dados é um recurso do Dataplex Universal Catalog que permite acompanhar como os dados se movimentam nos sistemas: origem, destino e quais transformações são aplicadas a eles.

Google Cloud O Serverless para cargas de trabalho e sessões do Apache Spark captura eventos de linhagem e os publica na API Data Lineage do Dataplex Universal Catalog. O Serverless para Apache Spark se integra à API Data Lineage usando o OpenLineage e o plug-in OpenLineage Spark.

É possível acessar informações de linhagem pelo Dataplex Universal Catalog usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte Visualizar gráficos de linhagem no Dataplex Universal Catalog.

Disponibilidade, recursos e limitações

A linhagem de dados, que é compatível com fontes de dados do BigQuery e do Cloud Storage, está disponível para cargas de trabalho e sessões executadas com as versões de ambiente de execução sem servidor para Apache Spark 1.1, 1.2 e 2.2, com as seguintes exceções e limitações:

  • A linhagem de dados não está disponível para cargas de trabalho ou sessões do SparkR ou do Spark Streaming.

Antes de começar

  1. Na página do seletor de projetos no console do Google Cloud , selecione o projeto a ser usado para suas sessões ou cargas de trabalho do Serverless para Apache Spark.

    Acessar o seletor de projetos

  2. Ative a API Data Lineage.

    Ativar as APIs

Funções exigidas

Se a carga de trabalho em lote usar a conta de serviço padrão do Serverless para Apache Spark, ela terá o papel Dataproc Worker, que ativa a linhagem de dados. Nenhuma outra ação é necessária.

No entanto, se sua carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, conceda um papel obrigatório a ela, conforme explicado no parágrafo a seguir.

Para receber as permissões necessárias para usar a linhagem de dados com o Dataproc, peça ao administrador para conceder a você os seguintes papéis do IAM na conta de serviço personalizada da carga de trabalho em lote:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.

Ativar a linhagem de dados no nível do projeto

É possível ativar a linhagem de dados no nível do projeto. Quando ativada no nível do projeto, todas as cargas de trabalho em lote e sessões interativas subsequentes executadas no projeto terão a linhagem do Spark ativada.

Como ativar a linhagem de dados no nível do projeto

Para ativar a linhagem de dados no nível do projeto, defina os seguintes metadados personalizados do projeto.

Chave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

É possível desativar a linhagem de dados no nível do projeto definindo os metadados DATAPROC_LINEAGE_ENABLED como false.

Ativar a linhagem de dados para uma carga de trabalho em lote do Spark

Para ativar a linhagem de dados em uma carga de trabalho em lote, defina a propriedade spark.dataproc.lineage.enabled como true ao enviar a carga de trabalho.

Exemplo de carga de trabalho em lote

Este exemplo envia uma carga de trabalho em lote do lineage-example.py com linhagem do Spark ativada.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

O lineage-example.py lê dados de uma tabela pública do BigQuery e grava a saída em uma nova tabela em um conjunto de dados do BigQuery. Ele usa um bucket do Cloud Storage para armazenamento temporário.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Faça as seguintes substituições:

  • REGION: selecione uma região para executar sua carga de trabalho.

  • BUCKET: o nome de um bucket do Cloud Storage para armazenar dependências.

  • PROJECT_ID, DATASET e TABLE: insira seu ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).

É possível conferir o gráfico de linhagem na interface do Dataplex Universal Catalog.

Gráfico de linhagem do Spark

Ativar a linhagem de dados para uma sessão interativa do Spark

Para ativar a linhagem de dados em uma sessão interativa do Spark, defina a propriedade spark.dataproc.lineage.enabled como true ao criar a sessão ou o modelo de sessão.

Exemplo de sessão interativa

O código do notebook PySpark a seguir configura uma sessão interativa do Serverless para Apache Spark com a linhagem de dados do Spark ativada. Em seguida, ele cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras em um conjunto de dados público do BigQuery Shakespeare e grava a saída em uma nova tabela em um conjunto de dados do BigQuery.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Faça as seguintes substituições:

  • PROJECT_ID, DATASET e TABLE: insira seu ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).

Para ver o gráfico de linhagem de dados, clique no nome da tabela de destino listado no painel de navegação da página Explorer do BigQuery e selecione a guia "Linhagem" no painel de detalhes da tabela.

Gráfico de linhagem do Spark

Visualizar a linhagem no catálogo universal do Dataplex

Um gráfico de linhagem mostra as relações entre os recursos do projeto e os processos que os criaram. É possível ver informações de linhagem de dados no console do Google Cloud ou recuperar as informações da API Data Lineage como dados JSON.

A seguir