与 OpenLineage 集成

OpenLineage 是一个用于收集和分析数据谱系信息的开放平台。对沿袭使用开放式标准 数据,OpenLineage 会从数据流水线组件中捕获沿袭事件 它们使用 OpenLineage API 来报告运行作业、作业和数据集。

通过 Data Lineage API,您可以导入 OpenLineage 事件,以便在 Dataplex 界面中显示这些事件以及来自 Google Cloud 服务(例如 BigQuery、Cloud Composer、Cloud Data Fusion 和 Dataproc)的谱系信息。

如需导入使用 OpenLineage 规范 请使用ProcessOpenLineageRunEvent REST API 方法,并将 OpenLineage 分面映射到 Data Lineage API 属性。

限制

  • Data Lineage API 支持 OpenLineage 主要版本 1 和 2。

  • Data Lineage API 不支持以下各项:

    • 任何包含消息格式更改的后续 OpenLineage 版本
    • DatasetEvent
    • JobEvent
  • 单封邮件的大小上限为 5 MB。

  • 输入和输出中的每个完全限定名称的长度不得超过 4,000 个字符。

  • 链接按事件进行分组,其中包含 100 个链接。单个 Pod 的 链接数为 1000。

  • Dataplex 显示每次作业运行的沿袭图,其中显示输入 和沿袭事件的输出。它不支持 Spark 阶段等更低级别的进程。

OpenLineage 映射

REST API 方法 ProcessOpenLineageRunEvent 将 OpenLineage 属性映射到 Data Lineage API 属性,如下所示:

Data Lineage API 属性 OpenLineage 属性
Process.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME
Process.displayName Job.namespace + ":" + Job.name
Process.attributes Job.facets(请参阅存储的数据
Run.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID
Run.displayName Run.runId
Run.attributes Run.facets(请参阅存储的数据
Run.startTime eventTime
Run.endTime eventTime
Run.state eventType
LineageEvent.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT(例如 projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333)
LineageEvent.EventLinks.source 输入(fqn 是命名空间和名称的串联)
LineageEvent.EventLinks.target 输出(fqn 是命名空间和名称的串联)
LineageEvent.startTime eventTime
LineageEvent.endTime eventTime
requestId 由方法用户定义

导入 OpenLineage 事件

如果您尚未设置 OpenLineage,请参阅使用入门

如需将 OpenLineage 事件导入 Dataplex,请调用 REST API 方法 ProcessOpenLineageRunEvent

POST https://datalineage.googleapis.com/v1/projects/{project}/locations/{location}:processOpenLineageRunEvent \
--data '{"eventTime":"2023-04-04T13:21:16.098Z","eventType":"COMPLETE","inputs":[{"name":"somename","namespace":"somenamespace"}],"job":{"name":"somename","namespace":"somenamespace"},"outputs":[{"name":"somename","namespace":"somenamespace"}],"producer":"someproducer","run":{"runId":"somerunid"},"schemaURL":"https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"}'

分析 OpenLineage 中的信息

如需分析导入的 OpenLineage 事件,请参阅在 Dataplex 界面中查看谱系图

数据存储

Data Lineage API 不会存储 OpenLineage 消息中的所有构面数据。 Data Lineage API 存储以下分面字段:

  • spark_version
    • openlineage-spark-version
    • spark-version
  • 所有spark.logicalPlan.*
  • environment-properties(自定义 Google Cloud 沿袭分面)
    • origin.sourcetype”和“origin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid

Data Lineage API 存储以下信息:

  • eventTime
  • run.runId
  • job.namespace
  • job.name