Este documento descreve como ativar a linhagem de dados em Google Cloud trabalhos de processamento em lote e sessões interativas sem servidor para o Apache Spark ao nível do projeto, trabalho de processamento em lote ou sessão interativa.
Vista geral
A linhagem de dados é uma funcionalidade do Catálogo universal do Dataplex que lhe permite acompanhar a forma como os dados se movem nos seus sistemas: de onde vêm, para onde são transmitidos e que transformações lhes são aplicadas.
Google Cloud O Serverless para cargas de trabalho e sessões do Apache Spark captura eventos de linhagem e publica-os na API Data Lineage do Dataplex Universal Catalog . O Serverless para Apache Spark integra-se com a API Data Lineage através do OpenLineage, usando o plug-in OpenLineage Spark.
Pode aceder às informações de linhagem através do catálogo universal do Dataplex, usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte o artigo Veja gráficos de linhagem no catálogo universal do Dataplex.
Disponibilidade, capacidades e limitações
A linhagem de dados, que suporta origens de dados do BigQuery e do Cloud Storage, está disponível para cargas de trabalho e sessões executadas com as versões de tempo de execução sem servidor para o Apache Spark 1.1
, 1.2
e 2.2
, com as seguintes exceções e limitações:
- A linhagem de dados não está disponível para cargas de trabalho ou sessões do SparkR ou do Spark Streaming.
Antes de começar
Na página do seletor de projetos na Google Cloud consola, selecione o projeto a usar para as suas cargas de trabalho ou sessões sem servidor para o Apache Spark.
Ative a API Data Lineage.
Funções necessárias
Se a sua carga de trabalho em lote usar a conta de serviço predefinida do Serverless para Apache Spark, tem a função Dataproc Worker
, que ativa a linhagem de dados. Não é necessária nenhuma ação adicional.
No entanto, se a sua carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, tem de conceder uma função necessária à conta de serviço personalizada, conforme explicado no parágrafo seguinte.
Para receber as autorizações de que precisa para usar a linhagem de dados com o Dataproc, peça ao seu administrador que lhe conceda as seguintes funções da IAM na sua conta de serviço personalizada da carga de trabalho em lote:
-
Conceda uma das seguintes funções:
-
Dataproc Worker (
roles/dataproc.worker
) -
Editor de linhagem de dados (
roles/datalineage.editor
) -
Produtor de linhagem de dados (
roles/datalineage.producer
) -
Administrador da linhagem de dados (
roles/datalineage.admin
)
-
Dataproc Worker (
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
Ative a linhagem de dados ao nível do projeto
Pode ativar a linhagem de dados ao nível do projeto. Quando ativada ao nível do projeto, todas as cargas de trabalho em lote e sessões interativas subsequentes que executar no projeto terão a linhagem do Spark ativada.
Como ativar a linhagem de dados ao nível do projeto
Para ativar a linhagem de dados ao nível do projeto, defina os seguintes metadados personalizados do projeto.
Chave | Valor |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Pode desativar a linhagem de dados ao nível do projeto definindo os metadados DATAPROC_LINEAGE_ENABLED
como false
.
Ative a linhagem de dados para uma carga de trabalho em lote do Spark
Pode ativar a linhagem de dados numa carga de trabalho em lote
definindo a propriedade spark.dataproc.lineage.enabled
como true
quando
envia a carga de trabalho.
Exemplo de carga de trabalho em lote
Este exemplo envia uma carga de trabalho em lote lineage-example.py
com a linhagem do Spark ativada.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py
lê dados de uma tabela pública do BigQuery e, em seguida, escreve o resultado numa nova tabela num conjunto de dados do BigQuery existente. Usa um contentor do Cloud Storage para armazenamento temporário.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Faça as seguintes substituições:
REGION: selecione uma região para executar a sua carga de trabalho.
BUCKET: o nome de um contentor do Cloud Storage existente para armazenar dependências.
PROJECT_ID, DATASET e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery existente e o nome de uma nova tabela a criar no conjunto de dados (a tabela não pode existir).
Pode ver o gráfico de linhagem na IU do catálogo universal do Dataplex.
Ative a linhagem de dados para uma sessão interativa do Spark
Pode ativar a linhagem de dados numa sessão interativa do Spark
definindo a propriedade spark.dataproc.lineage.enabled
como true
quando
criar a sessão ou o modelo de sessão.
Exemplo de sessão interativa
O seguinte código do bloco de notas do PySpark configura uma sessão interativa sem servidor para o Apache Spark com a linhagem de dados do Spark ativada. Em seguida, cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras num conjunto de dados público do BigQuery Shakespeare e, de seguida, escreve o resultado numa nova tabela num conjunto de dados do BigQuery existente.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Faça as seguintes substituições:
- PROJECT_ID, DATASET e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery existente e o nome de uma nova tabela a criar no conjunto de dados (a tabela não pode existir).
Pode ver o gráfico de linhagem de dados clicando no nome da tabela de destino apresentado no painel de navegação na página do Explorador do BigQuery e, de seguida, selecionando o separador Linhagem no painel de detalhes da tabela.
Veja a linhagem no catálogo universal do Dataplex
Um gráfico de linhagem apresenta as relações entre os recursos do seu projeto e os processos que os criaram. Pode ver informações de linhagem de dados na Google Cloud consola ou obter as informações da API Data Lineage como dados JSON.
O que se segue?
- Saiba mais sobre a linhagem de dados.