Use a linhagem de dados com o Serverless para Apache Spark

Este documento descreve como ativar a linhagem de dados em Google Cloud trabalhos de processamento em lote e sessões interativas sem servidor para o Apache Spark ao nível do projeto, trabalho de processamento em lote ou sessão interativa.

Vista geral

A linhagem de dados é uma funcionalidade do Catálogo universal do Dataplex que lhe permite acompanhar a forma como os dados se movem nos seus sistemas: de onde vêm, para onde são transmitidos e que transformações lhes são aplicadas.

Google Cloud O Serverless para cargas de trabalho e sessões do Apache Spark captura eventos de linhagem e publica-os na API Data Lineage do Dataplex Universal Catalog . O Serverless para Apache Spark integra-se com a API Data Lineage através do OpenLineage, usando o plug-in OpenLineage Spark.

Pode aceder às informações de linhagem através do catálogo universal do Dataplex, usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte o artigo Veja gráficos de linhagem no catálogo universal do Dataplex.

Disponibilidade, capacidades e limitações

A linhagem de dados, que suporta origens de dados do BigQuery e do Cloud Storage, está disponível para cargas de trabalho e sessões executadas com as versões de tempo de execução sem servidor para o Apache Spark 1.1, 1.2 e 2.2, com as seguintes exceções e limitações:

  • A linhagem de dados não está disponível para cargas de trabalho ou sessões do SparkR ou do Spark Streaming.

Antes de começar

  1. Na página do seletor de projetos na Google Cloud consola, selecione o projeto a usar para as suas cargas de trabalho ou sessões sem servidor para o Apache Spark.

    Aceder ao seletor de projetos

  2. Ative a API Data Lineage.

    Ative as APIs

Funções necessárias

Se a sua carga de trabalho em lote usar a conta de serviço predefinida do Serverless para Apache Spark, tem a função Dataproc Worker, que ativa a linhagem de dados. Não é necessária nenhuma ação adicional.

No entanto, se a sua carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, tem de conceder uma função necessária à conta de serviço personalizada, conforme explicado no parágrafo seguinte.

Para receber as autorizações de que precisa para usar a linhagem de dados com o Dataproc, peça ao seu administrador que lhe conceda as seguintes funções da IAM na sua conta de serviço personalizada da carga de trabalho em lote:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Ative a linhagem de dados ao nível do projeto

Pode ativar a linhagem de dados ao nível do projeto. Quando ativada ao nível do projeto, todas as cargas de trabalho em lote e sessões interativas subsequentes que executar no projeto terão a linhagem do Spark ativada.

Como ativar a linhagem de dados ao nível do projeto

Para ativar a linhagem de dados ao nível do projeto, defina os seguintes metadados personalizados do projeto.

Chave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Pode desativar a linhagem de dados ao nível do projeto definindo os metadados DATAPROC_LINEAGE_ENABLED como false.

Ative a linhagem de dados para uma carga de trabalho em lote do Spark

Pode ativar a linhagem de dados numa carga de trabalho em lote definindo a propriedade spark.dataproc.lineage.enabled como true quando envia a carga de trabalho.

Exemplo de carga de trabalho em lote

Este exemplo envia uma carga de trabalho em lote lineage-example.py com a linhagem do Spark ativada.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py lê dados de uma tabela pública do BigQuery e, em seguida, escreve o resultado numa nova tabela num conjunto de dados do BigQuery existente. Usa um contentor do Cloud Storage para armazenamento temporário.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Faça as seguintes substituições:

  • REGION: selecione uma região para executar a sua carga de trabalho.

  • BUCKET: o nome de um contentor do Cloud Storage existente para armazenar dependências.

  • PROJECT_ID, DATASET e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery existente e o nome de uma nova tabela a criar no conjunto de dados (a tabela não pode existir).

Pode ver o gráfico de linhagem na IU do catálogo universal do Dataplex.

Gráfico de linhagem do Spark

Ative a linhagem de dados para uma sessão interativa do Spark

Pode ativar a linhagem de dados numa sessão interativa do Spark definindo a propriedade spark.dataproc.lineage.enabled como true quando criar a sessão ou o modelo de sessão.

Exemplo de sessão interativa

O seguinte código do bloco de notas do PySpark configura uma sessão interativa sem servidor para o Apache Spark com a linhagem de dados do Spark ativada. Em seguida, cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras num conjunto de dados público do BigQuery Shakespeare e, de seguida, escreve o resultado numa nova tabela num conjunto de dados do BigQuery existente.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Faça as seguintes substituições:

  • PROJECT_ID, DATASET e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery existente e o nome de uma nova tabela a criar no conjunto de dados (a tabela não pode existir).

Pode ver o gráfico de linhagem de dados clicando no nome da tabela de destino apresentado no painel de navegação na página do Explorador do BigQuery e, de seguida, selecionando o separador Linhagem no painel de detalhes da tabela.

Gráfico de linhagem do Spark

Veja a linhagem no catálogo universal do Dataplex

Um gráfico de linhagem apresenta as relações entre os recursos do seu projeto e os processos que os criaram. Pode ver informações de linhagem de dados na Google Cloud consola ou obter as informações da API Data Lineage como dados JSON.

O que se segue?