本文档提供了一个参考模板,可帮助您构建用于从第三方来源提取元数据的自定义连接器。在运行用于将元数据导入 Dataplex 的托管式连接流水线时,您需要使用此连接器。
您可以构建连接器,以从第三方来源提取元数据。例如,您可以构建连接器,从 MySQL、SQL Server、Oracle、Snowflake、Databricks 等来源中提取数据。
您可以使用本文档中的示例连接器作为构建您自己的连接器的起点。示例连接器会连接到 Oracle Database Express Edition (XE) 数据库。该连接器是用 Python 构建的,但您也可以使用 Java、Scala 或 R。
连接器的运作方式
连接器会从第三方数据源中提取元数据,将元数据转换为 Dataplex ImportItem
格式,并生成可供 Dataplex 导入的元数据导入文件。
该连接器是托管式连接流水线的一部分。托管式连接流水线是一种编排的工作流,用于导入 Dataplex Catalog 元数据。托管式连接管道会运行连接器,并执行导入工作流中的其他任务,例如运行元数据导入作业和捕获日志。
托管式连接流水线使用 Dataproc Serverless 批处理作业运行连接器。Dataproc Serverless 提供无服务器 Spark 执行环境。虽然您可以构建不使用 Spark 的连接器,但我们建议您使用 Spark,因为它可以提高连接器的性能。
连接器要求
该连接器有以下要求:
- 连接器必须是可在 Dataproc Serverless 上运行的 Artifact Registry 映像。
- 连接器必须生成可由 Dataplex 元数据导入作业(
metadataJobs.create
API 方法)导入的格式的元数据文件。如需了解详细要求,请参阅元数据导入文件。 连接器必须接受以下命令行参数,才能从流水线接收信息:
命令行参数 流水线提供的价值 target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID 连接器使用这些参数在目标条目组
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
中生成元数据,并将其写入 Cloud Storage 存储分区gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
。流水线每次执行都会在存储分区 CLOUD_STORAGE_BUCKET_ID 中创建一个新文件夹 FOLDER_ID。连接器应将元数据导入文件写入此文件夹。
流水线模板支持 PySpark 连接器。这些模板假定驱动程序 (mainPythonFileUri
) 是连接器映像上名为 main.py
的本地文件。您可以修改流水线模板以适应其他场景,例如 Spark 连接器、其他驱动程序 URI 或其他选项。
下面介绍了如何使用 PySpark 在元数据导入文件中创建导入项。
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
准备工作
本指南假定您熟悉 Python 和 PySpark。
请查看以下信息:
请执行以下操作。在同一 Google Cloud位置创建所有资源。
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
创建 Cloud Storage 存储分区以存储元数据导入文件。
-
在同一项目中创建以下 Dataplex Catalog 资源。
如需查看示例值,请参阅本文档的Oracle 来源的 Dataplex Catalog 资源示例部分。
- 确保您的 Google Cloud 项目可以访问您的第三方来源。如需了解详情,请参阅 Dataproc Serverless for Spark 网络配置。
创建基本 Python 连接器
示例基本 Python 连接器使用 Dataplex 客户端库类为 Oracle 数据源创建顶级条目。然后,为条目字段提供值。
该连接器会创建一个包含以下条目的元数据导入文件:
instance
条目,条目类型为projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
。此条目表示 Oracle Database XE 系统。database
条目,表示 Oracle Database XE 系统中的数据库。
如需构建基本的 Python 连接器,请执行以下操作:
设置本地环境。我们建议您使用虚拟环境。
mkdir venv python -m venv venv/ source venv/bin/activate
创建一个 Python 项目。
安装要求:
pip install -r requirements.txt
安装了以下要求:
在项目的根目录中添加
main.py
流水线文件。将代码部署到 Dataproc Serverless 时,
main.py
文件将用作执行入口点。我们建议您尽量减少存储在main.py
文件中的信息量;使用此文件调用连接器中定义的函数和类,例如src/bootstap.py
类。创建一个
src
文件夹,用于存储连接器的大部分逻辑。使用 Python 类更新
src/cmd_reader.py
文件,以接受命令行参数。您可以使用 argeparse 模块执行此操作。在生产环境中,我们建议您将密码存储在 Secret Manager 中。
使用用于创建常量的代码更新
src/constants.py
文件。使用您希望连接器为 Oracle 资源创建的 Dataplex Catalog 资源的构建方法更新
src/name_builder.py
文件。请使用本文档中适用于 Oracle 来源的 Dataplex Catalog 资源示例部分中所述的惯例。由于
name_builder.py
文件同时用于 Python 核心代码和 PySpark 核心代码,因此我们建议您将方法编写为纯函数,而不是类的成员。使用代码更新
src/top_entry_builder.py
文件,以便使用数据填充顶级条目。使用代码更新
src/bootstrap.py
文件,以生成元数据导入文件并运行连接器。在本地运行代码。
系统会返回一个名为
output.jsonl
的元数据导入文件。该文件包含两行,每行代表一个导入项。在运行元数据导入作业时,受管联接流水线会读取此文件。可选:扩展上例,使用 Dataplex 客户端库类为表、架构和视图创建导入项。您还可以在 Dataproc Serverless 上运行 Python 示例。
我们建议您创建使用 Spark(并在 Dataproc Serverless 上运行)的连接器,因为这样可以提高连接器的性能。
创建 PySpark 连接器
此示例基于 PySpark DataFrame API。您可以在本地安装 PySpark SQL 并运行它,然后再在 Dataproc Serverless 上运行。如果您在本地安装并运行 PySpark,请使用 pip 安装 PySpark 库,但无需安装本地 Spark 集群。
出于性能方面的原因,此示例不使用 PySpark 库中的预定义类。相反,该示例会创建 DataFrame,将 DataFrame 转换为 JSON 条目,然后将输出写入 JSON 行格式的元数据导入文件,以便导入 Dataplex。
如需使用 PySpark 构建连接器,请执行以下操作:
安装 PySpark:
pip install pyspark
安装要求:
pip install -r requirements.txt
安装了以下要求:
使用代码更新
oracle_connector.py
文件,以从 Oracle 数据源读取数据并返回 DataFrame。添加 SQL 查询以返回要导入的元数据。查询需要返回以下信息:
- 数据库架构
- 属于这些架构的表
- 属于这些表的列,包括列名称、列数据类型以及列是否可为 null 或必需
所有表和视图的所有列都存储在同一系统表中。您可以使用
_get_columns
方法选择列。根据您提供的参数,您可以分别为表格或视图选择列。请注意以下几点:
- 在 Oracle 中,数据库架构归数据库用户所有,并且与该用户同名。
- 架构对象是用户创建的逻辑结构。表或索引等对象可以存储数据,而视图或同义词等对象仅包含定义。
ojdbc11.jar
文件包含 Oracle JDBC 驱动程序。
使用用于应用 Spark 转换的共享方法更新
src/entry_builder.py
文件。请注意以下几点:
- 这些方法用于构建连接器为您的 Oracle 资源创建的 Dataplex Catalog 资源。请使用本文档中适用于 Oracle 来源的 Dataplex Catalog 资源示例部分中所述的惯例。
convert_to_import_items
方法适用于架构、表和视图。确保连接器的输出是可以由metadataJobs.create
方法处理的一个或多个导入项,而不是单个条目。- 即使在视图中,该列也称为
TABLE_NAME
。
使用代码更新
bootstrap.py
文件,以生成元数据导入文件并运行连接器。此示例将元数据导入文件保存为单个 JSON 行文件。您可以使用
DataFrameWriter
类等 PySpark 工具并行输出批量 JSON。连接器可以按任意顺序将条目写入元数据导入文件。
使用代码更新
gcs_uploader.py
文件,以将元数据导入文件上传到 Cloud Storage 存储分区。构建连接器映像。
如果您的连接器包含多个文件,或者您想使用默认 Docker 映像中未包含的库,则必须使用自定义容器。Dataproc Serverless for Spark 在 Docker 容器中运行工作负载。创建连接器的自定义 Docker 映像,并将该映像存储在 Artifact Registry 中。Dataproc Serverless 会从 Artifact Registry 读取映像。
创建 Dockerfile:
使用 Conda 作为软件包管理器。Dataproc Serverless for Spark 会在运行时将
pyspark
装载到容器中,因此您无需在自定义容器映像中安装 PySpark 依赖项。构建自定义容器映像并将其推送到 Artifact Registry。
由于一个映像可以有多个名称,因此您可以使用 Docker 标记为映像分配别名。
在 Dataproc Serverless 上运行连接器。 如需使用自定义容器映像提交 PySpark 批处理作业,请运行
gcloud dataproc batches submit pyspark
命令。gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
请注意以下几点:
- JAR 文件是 Spark 的驱动程序。如需从 Oracle、MySQL 或 Postgres 读取数据,您必须向 Apache Spark 提供特定软件包。该软件包可以位于 Cloud Storage 中,也可以位于容器内。如果 JAR 文件位于容器内,路径类似于
file:///path/to/file/driver.jar
。在此示例中,JAR 文件的路径为/opt/spark/jars/
。 - PIPELINE_ARGUMENTS 是连接器的命令行参数。
连接器会从 Oracle 数据库中提取元数据,生成元数据导入文件,并将元数据导入文件保存到 Cloud Storage 存储分区。
- JAR 文件是 Spark 的驱动程序。如需从 Oracle、MySQL 或 Postgres 读取数据,您必须向 Apache Spark 提供特定软件包。该软件包可以位于 Cloud Storage 中,也可以位于容器内。如果 JAR 文件位于容器内,路径类似于
如需将元数据导入文件中的元数据手动导入 Dataplex,请运行元数据作业。使用
metadataJobs.create
方法。在命令行中,添加环境变量并为 curl 命令创建别名。
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
调用 API 方法,传递您要导入的条目类型和切面类型。
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
schema
切面类型是由 Dataplex 定义的全局切面类型。请注意,调用 API 方法时用于方面类型名称的格式与您在连接器代码中使用的格式不同。
可选:使用 Cloud Logging 查看元数据作业的日志。如需了解详情,请参阅监控 Dataplex 日志。
设置流水线编排
前面部分介绍了如何构建示例连接器并手动运行连接器。
在生产环境中,您可以使用 Workflows 等编排平台,在托管式连接流水线中运行连接器。
如需使用示例连接器运行托管式连接流水线,请按照使用工作流导入元数据的步骤操作。请执行以下操作:
- 在连接器所在的位置创建工作流。 Google Cloud
在工作流定义文件中,使用以下代码更新
submit_pyspark_extract_job
函数,以便使用您创建的连接器从 Oracle 数据库中提取数据。- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
在工作流定义文件中,使用以下代码更新
submit_import_job
函数以导入条目。该函数会调用metadataJobs.create
API 方法来运行元数据导入作业。- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
提供与手动调用 API 方法时包含的条目类型和方面类型相同的条目类型和方面类型。请注意,每个字符串的末尾没有英文逗号。
执行工作流时,请提供以下运行时参数:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
可选:使用 Cloud Logging 查看托管式连接流水线的日志。日志载荷包含指向 Dataproc Serverless 批处理作业和元数据导入作业的日志的链接(如适用)。如需了解详情,请参阅查看工作流日志。
可选:如需提高托管连接流水线的安全性、性能和功能,请考虑执行以下操作:
- 使用 Secret Manager 存储第三方数据源的凭据。
- 使用 PySpark 将 JSON 行输出并行写入多个元数据导入文件。
- 使用前缀将大文件(超过 100 MB)拆分为较小的文件。
- 添加更多自定义方面,以从来源中捕获其他业务和技术元数据。
Oracle 来源的 Dataplex Catalog 资源示例
示例连接器会从 Oracle 数据库中提取元数据,并将元数据映射到相应的 Dataplex Catalog 资源。
层次结构注意事项
Dataplex 中的每个系统都有一个根条目,该条目是系统的父级条目。通常,根条目具有 instance
条目类型。下表显示了 Oracle 系统条目类型和方面类型的层次结构示例。
条目类型 ID | 说明 | 关联的方面类型 ID |
---|---|---|
oracle-instance |
导入的系统的根目录。 | oracle-instance |
oracle-database |
Oracle 数据库。 | oracle-database |
oracle-schema |
数据库架构。 | oracle-schema |
oracle-table |
表格。 |
|
oracle-view |
视图。 |
|
schema
切面类型是由 Dataplex 定义的全局切面类型。它包含对表、视图或其他包含列的实体中字段的说明。oracle-schema
自定义方面类型包含 Oracle 数据库架构的名称。
导入项字段示例
连接器应针对 Oracle 资源使用以下惯例。
-
完全限定名称:Oracle 资源的完全限定名称使用以下命名模板。禁止使用的字符使用反引号进行转义。
资源 模板 示例 实例 SOURCE
:ADDRESS
使用系统的主机和端口号或域名。
oracle:`localhost:1521`
或oracle:`myinstance.com`
数据库 SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
架构 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
表 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
查看 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
条目名称或条目 ID:Oracle 资源的条目使用以下命名模板。禁止使用的字符会被替换为允许使用的字符。资源使用前缀
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
。资源 模板 示例 实例 PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
数据库 PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
架构 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
表 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
查看 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
父条目:如果某条目不是系统的根条目,则该条目可以包含一个父条目字段,用于描述其在层次结构中的位置。该字段应包含父条目的名称。我们建议您生成此值。
下表显示了 Oracle 资源的父级条目。
条目 父条目 实例 ""
(空字符串)数据库 实例名称 架构 数据库名称 表 架构名称 查看 架构名称 方面映射:方面映射必须包含至少一个用于描述要导入的实体的方面。以下是 Oracle 表的方面映射示例。
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
您可以在
global
位置找到用于定义dataplex-types
项目中的表格或视图结构的预定义方面类型(例如schema
)。-
方面键:方面键采用 PROJECT.LOCATION.ASPECT_TYPE 的命名格式。下表显示了 Oracle 资源的示例方面键。
条目 示例宽高比键 实例 example-project.us-central1.oracle-instance
数据库 example-project.us-central1.oracle-database
架构 example-project.us-central1.oracle-schema
表 example-project.us-central1.oracle-table
查看 example-project.us-central1.oracle-view