开发用于元数据导入的自定义连接器

本文档提供了一个参考模板,可帮助您构建用于从第三方来源提取元数据的自定义连接器。您在运行 代管式连接流水线 用于将元数据导入 Dataplex。

您可以构建连接器,从第三方来源提取元数据。例如,您可以构建连接器,从 MySQL、SQL Server、Oracle、Snowflake、Databricks 等来源中提取数据。

您可以使用本文档中的示例连接器作为构建您自己的连接器的起点。示例连接器连接到 Oracle Database Express 版本 (XE) 数据库。该连接器是用 Python 构建的,但您也可以使用 Java、Scala 或 R。

连接器的工作原理

连接器从第三方数据源提取元数据,将 元数据转换为 Dataplex ImportItem 格式,并生成 可通过 Dataplex 导入的元数据导入文件。

该连接器是托管式连接流水线的一部分。托管式连接流水线是一种编排的工作流,用于导入 Dataplex 目录元数据。托管式连接管道会运行连接器,并执行导入工作流中的其他任务,例如运行元数据导入作业和捕获日志。

托管式连接流水线使用 Dataproc 无服务器 批量作业。Dataproc Serverless 无服务器 Spark 执行环境。虽然您可以构建不使用 Spark 的连接器,但我们建议您使用 Spark,因为它可以提高连接器的性能。

连接器要求

连接器有以下要求:

  • 连接器必须是可以运行的 Artifact Registry 映像 Dataproc Serverless。
  • 连接器必须生成可由 Dataplex 元数据导入作业(metadataJobs.create API 方法)导入的格式的元数据文件。如需了解详细要求,请参阅元数据导入文件
  • 连接器必须接受以下命令行参数才能接收 来自流水线的信息:

    命令行参数 流水线提供的价值
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    连接器使用这些参数在目标条目组中生成元数据 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, 以及将数据写入 Cloud Storage 存储桶 gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID。 流水线每次执行都会在存储桶 CLOUD_STORAGE_BUCKET_ID 中创建一个新文件夹 FOLDER_ID。连接器应将元数据导入文件写入此文件夹。

流水线模板支持 PySpark 连接器。这些模板假设驱动程序 (mainPythonFileUri) 是连接器映像上名为 main.py 的本地文件。您可以修改 流水线模板,如 Spark 连接器、 驱动程序 URI 或其他选项。

下面介绍了如何使用 PySpark 在元数据导入文件中创建导入项。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

准备工作

本指南假定您熟悉 Python 和 PySpark。

请查看以下信息:

请执行以下操作。在同一 Google Cloud 中创建所有资源 位置。

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. 创建 Cloud Storage 存储桶, 存储元数据导入文件。

  9. 在同一项目中创建以下 Dataplex Catalog 资源。

    如需查看示例值,请参阅本文档的Oracle 来源的 Dataplex Catalog 资源示例部分。

    1. 创建条目组
    2. 为要导入的条目创建自定义方面类型。使用命名惯例 SOURCE-ENTITY_TO_IMPORT

      (可选)您可以创建其他切面类型来存储其他信息。

    3. 创建自定义条目类型 为您要导入的资源分配 切面类型。使用命名惯例 SOURCE - ENTITY_TO_IMPORT

      例如,对于 Oracle 数据库,可以创建名为 oracle-database。将其关联到名为 oracle-database 的方面类型。

  10. 请确保您的第三方来源可通过 Google Cloud 项目。如需了解详情,请参阅 Dataproc Serverless for Spark 网络配置

创建基本 Python 连接器

示例基本 Python 连接器使用 Dataplex 客户端库类为 Oracle 数据源创建顶级条目。然后,您需要提供条目字段的值。

连接器会创建一个包含以下条目的元数据导入文件:

  • instance 条目,条目类型为 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance。此条目表示 Oracle 数据库 XE 系统。
  • database 条目,表示 Oracle 数据库 XE 内的数据库 系统。

要构建基本的 Python 连接器,请执行以下操作:

  1. 设置本地环境。我们建议您使用虚拟环境。

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    使用有效维护版本的 Python。支持 Python 3.7 及更高版本。

  2. 创建一个 Python 项目。

  3. 添加一个包含以下内容的 requirements.txt 文件:

    • google-cloud-dataplex
    • google-cloud-storage

    安装要求:

    pip install -r requirements.txt
    
  4. 在项目的根目录中添加 main.py 流水线文件。

    将代码部署到 Dataproc Serverless 时,main.py 文件作为执行入口点。我们建议您尽量减少存储在 main.py 文件中的信息量;使用此文件调用连接器中定义的函数和类,例如 src/bootstap.py 类。

  5. 创建一个 src 文件夹来存储连接器的大部分逻辑。

  6. 创建名为 src/cmd_reader.py 的文件,其中包含用于接受命令行的 Python 类 参数。您可以使用 argeparse 模块来执行此操作。

    import argparse
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-p", "--project", type=str, required=True,
                            help="Google Cloud project")
        parser.add_argument("-l", "--location", type=str, required=True,
                            help="Google Cloud location")
        parser.add_argument("-g", "--entry_group", type=str, required=True,
                            help="Dataplex entry group")
        parser.add_argument("--host_port", type=str, required=True,
                            help="Oracle host and port")
        parser.add_argument("--user", type=str, required=True, help="Oracle user")
        parser.add_argument("--password-secret", type=str, required=True,
                            help="An ID in Secret Manager for the Oracle password.")
        parser.add_argument("-d", "--database", type=str, required=True,
                            help="Oracle database")
        parser.add_argument("--bucket", type=str, required=True,
                            help="Cloud Storage bucket")
        parser.add_argument("--folder", type=str, required=True,
                            help="Folder in the bucket")
    
        return vars(parser.parse_known_args()[0])
    

    如需读取 Oracle 数据库,至少需要以下参数:

    • project:包含条目组的 Google Cloud 项目。 条目类型和切面类型。
    • location:条目组、条目类型和方面类型的 Google Cloud 位置。
    • entry_group:Dataplex 中的现有条目组。通过 您导入的元数据是属于此条目的条目 。
    • host_port:Oracle 实例的主机和端口号。
    • user:Oracle 用户。
    • password-secret:Oracle 用户的密码。在生产环境中,我们建议您将密码存储在 Secret Manager 中。
    • database:目标 Oracle 数据库。
    • bucket:存储元数据导入文件的 Cloud Storage 存储桶。
    • folder:Cloud Storage 存储桶中的文件夹,用于分隔元数据导入文件。每次工作流执行都会创建一个新文件夹。
  7. 创建一个名为 src/constants.py 的文件。添加以下代码以创建常量。

    SOURCE_TYPE = 'oracle'
    
    class EntryType(enum.Enum):
       """Types of Oracle entries."""
       INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
       DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
       DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
       TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
       VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  8. 创建一个名为 src/name_builder.py 的文件。编写方法来构建您希望连接器为 Oracle 资源创建的 Dataplex Catalog 资源。请使用本文档中适用于 Oracle 来源的 Dataplex Catalog 资源示例部分中所述的惯例。

    由于 name_builder.py 文件同时用于 Python 核心代码和 PySpark 核心代码,因此我们建议您将方法编写为纯函数,而不是类的成员。

  9. 创建一个名为 src/top_level_builder.py 的文件。添加以下代码,以便使用数据填充顶级条目。

    from src.common import EntryType
    from src import name_builder as nb
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        entry: dataplex_v1.Entry = dataclasses.field(
            default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    def dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
        def convert(obj: object):
          if isinstance(obj, proto.Message):
            return proto.Message.to_dict(obj)
          return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    class TopEntryBuilder:
        def __init__(self, config):
            self._config = config
            self._project = config["project"]
            self._location = config["location"]
    
        def create_jsonl_item(self, entry_type: EntryType):
            item = self.entry_to_import_item(self.create_entry(entry_type))
            return self.to_json(item)
    
        def create_entry(self, entry_type: EntryType):
            entry = dataplex_v1.Entry()
            entry.name = nb.create_name(self._config, entry_type)
            entry.entry_type = entry_type.value.format(project=self._project, location=self._location)
            entry.fully_qualified_name = nb.create_fqn(self._config, entry_type)
            entry.parent_entry = nb.create_parent_name(self._config, entry_type)
    
            aspect_key = nb.create_entry_aspect_name(self._config, entry_type)
    
            entry_aspect = dataplex_v1.Aspect()
            entry_aspect.aspect_type = aspect_key
            entry_aspect.data = {}
            entry.aspects[aspect_key] = entry_aspect
    
            return entry
    
        def entry_to_import_item(self, entry: dataplex_v1.Entry):
            import_item = ImportItem()
            import_item.entry = entry
            import_item.aspect_keys = list(entry.aspects.keys())
            import_item.update_mask = "aspects"
    
            return import_item
    
        def to_json(self, import_item: ImportItem):
            return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
      def to_json(self, import_item: ImportItem):
        return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
  10. 创建一个名为 src/bootstrap.py 的文件。添加以下代码 生成元数据导入文件并运行连接器。

    FILENAME = "output.jsonl"
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of strings to the file in JSONL format."""
        for string in json_strings:
            output_file.write(string + "\n")
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write the top entries
            file.write(top_entry_builder.create(EntryType.INSTANCE))
            file.write(top_entry_builder.create(EntryType.DATABASE))
    
  11. 在本地运行代码。

    系统会返回一个名为 output.jsonl 的元数据导入文件。该文件包含两行,每行代表一个导入项。在运行元数据导入作业时,受管式连接流水线会读取此文件。

  12. 可选:扩展上例,使用 Dataplex 客户端库类为表、架构和视图创建导入项。您还可以在 Dataproc Serverless 上运行 Python 示例。

    我们建议您创建一个使用 Spark(并运行 ),因为它可以提高 连接器性能

创建 PySpark 连接器

此示例基于 PySpark DataFrame API。您可以在本地安装 PySpark SQL 并运行它,然后再在 Dataproc Serverless 上运行。如果安装并运行 PySpark 您可以使用 pip 安装 PySpark 库,但无需安装 本地 Spark 集群。

出于性能方面的原因,此示例不使用 PySpark 库中的预定义类。相反,此示例会创建 DataFrame,将 DataFrame 转换为 JSON 条目,然后将输出写入元数据导入 (可以导入到 Dataplex 中的 JSON Lines 格式的文件)。

如需使用 PySpark 构建连接器,请执行以下操作:

  1. 添加以下代码,用于从 Oracle 数据源读取数据并返回 DataFrame。

    SPARK_JAR_PATH="/opt/spark/jars/ojdbc11.jar"
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # Spark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor").config(
                "spark.jars", SPARK_JAR_PATH).getOrCreate()
            self._config = config
            self._url = (f"jdbc:oracle:thin:@{config['host_port']}"
                        f":{config['database']}")
    
        def _execute(self, query: str) -> DataFrame:
            return self._spark.read.format('jdbc') \
              .option("driver", "oracle.jdbc.OracleDriver") \
              .option("url",  self._url) \
              .option('query', query) \
              .option('user', self._config["user"]) \
              .option('password', self._config["password"]) \
              .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Every line here is a column."""
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Get table or view."""
            short_type = entry_type.name # the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    添加 SQL 查询以返回要导入的元数据。查询需要返回以下信息:

    • 数据库架构
    • 属于这些架构的表
    • 属于这些表的列,包括列名称、列数据类型以及列是否可为 null 或必需

    所有表和视图的所有列都存储在同一个 系统表。您可以使用 _get_columns 方法选择列。 根据您提供的参数,您可以分别为表格或视图选择列。

    请注意以下几点:

    • 在 Oracle 中,数据库架构归数据库用户所有,并且与该用户同名。
    • 架构对象是由用户创建的逻辑结构。对象 表或索引等对象可以存储数据,而视图或 同义词只包含定义。
    • ojdbc11.jar 文件包含 Oracle JDBC 驱动程序
  2. 创建一个名为 src/entry_builder.py 的文件。添加了以下用于应用 Spark 转换的共享方法。

    import pyspark.sql.functions as F
    from src import name_builder as nb
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Parse Oracle column type to Dataplex type."""
        USER_DEFINED_FUNCTION
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(F.lit(entry_aspect_name),
                            F.named_struct(F.lit("aspect_type"),
                                          F.lit(entry_aspect_name),
                                          F.lit("data"),
                                          F.create_map()
                                          )
                            )
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                        "entry_source", "aspects", "entry_type"]
    
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys]))\
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    

    执行以下操作:

    • choose_metadata_type_udf 方法是用户定义的 PySpark 方法 函数来为列编制索引。对于 USER_DEFINED_FUNCTION,请编写方法来构建您希望连接器为 Oracle 资源创建的 Dataplex Catalog 资源。请使用本文档中适用于 Oracle 来源的 Dataplex Catalog 资源示例部分中所述的惯例。
    • convert_to_import_items 方法适用于架构、表和视图。确保连接器输出的一个或多个导入项 可以由 metadataJobs.create 方法、 而不是单个条目。
  3. src/entry_builder.py 中,添加以下代码以应用 Spark 架构转换。

    def build_schemas(config, df_raw_schemas):
      """Create a dataframe with database schemas."""
      entry_type = EntryType.DB_SCHEMA
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      parent_name =  nb.create_parent_name(config, entry_type)
    
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                              StringType())
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                            StringType())
    
      # Remember to fill the missed project and location
      full_entry_type = entry_type.value.format(project=config["PROJECT"],
                                                location=config["LOCATION"])
    
      # To list of entries
      column = F.col("USERNAME")
      df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
        .withColumn("fully_qualified_name", create_fqn_udf(column)) \
        .withColumn("parent_entry", F.lit(parent_name)) \
        .withColumn("entry_type", F.lit(full_entry_type)) \
        .withColumn("entry_source", create_entry_source(column)) \
        .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
      .drop(column)
    
      df = convert_to_import_items(df, [entry_aspect_name])
      return df
    
  4. src/entry_builder.py 中,添加应用 Spark 的以下代码 对表和视图的转换。此代码会创建一个表格列表 其中包含 Dataplex 可读取的列定义。

    def build_dataset(config, df_raw, db_schema, entry_type):
      """Build table entries from a flat list of columns."""
      schema_key = "dataplex-types.global.schema"
    
      # Format column names and details
      df = df_raw \
        .withColumn("mode", F.when(F.col("NULLABLE") == 'Y',
                                    "NULLABLE").otherwise("REQUIRED")) \
        .drop("NULLABLE") \
        .withColumnRenamed("DATA_TYPE", "dataType") \
        .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
        .withColumnRenamed("COLUMN_NAME", "name")
    
      # Aggregate fields
      aspect_columns = ["name", "mode", "dataType", "metadataType"]
      df = df.withColumn("columns", F.struct(aspect_columns))\
        .groupby('TABLE_NAME') \
        .agg(F.collect_list("columns").alias("fields"))
    
      # Create aspects
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      df = df.withColumn("schema",
                        F.create_map(F.lit(schema_key),
                                      F.named_struct(
                                          F.lit("aspect_type"),
                                          F.lit(schema_key),
                                          F.lit("data"),
                                          F.create_map(F.lit("fields"),
                                                      F.col("fields")))
                                      )
                        )\
        .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
      .drop("fields")
    
      # Merge separate aspect columns into the one map
      df = df.select(F.col("TABLE_NAME"),
                    F.map_concat("schema", "entry_aspect").alias("aspects"))
    
      # Fill general information and hierarchy names
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                      db_schema, x),
                              StringType())
    
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                    db_schema, x), StringType())
    
      parent_name = nb.create_parent_name(entry_type, db_schema)
      full_entry_type = entry_type.value.format(project=config["project"],
                                                location=config["location"])
    
      column = F.col("TABLE_NAME")
      df = df.withColumn("name", create_name_udf(column)) \
         .withColumn("fully_qualified_name", create_fqn_udf(column)) \
         .withColumn("entry_type", F.lit(full_entry_type)) \
         .withColumn("parent_entry", F.lit(parent_name)) \
         .withColumn("entry_source", create_entry_source(column)) \
      .drop(column)
    
      df = convert_to_import_items(df, [schema_key, entry_aspect_name])
      return df
    

    请注意,即使在视图中,该列也叫 TABLE_NAME

  5. 添加以下代码,用于生成元数据导入文件并运行连接器。

    def process_raw_dataset(df: pyspark.sql.dataframe.DataFrame,
                            config: Dict[str, str],
                            schema_name: str,
                            entry_type: EntryType):
        """Builds dataset and converts it to jsonl."""
        df = entry_builder.build_dataset(config, df, schema_name, entry_type)
        return df.toJSON().collect()
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        connector = OracleConnector(config)
    
        top_entry_builder = TopEntryBuilder(config)
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries
            # ...
    
            # Get schemas
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in
                      df_raw_schemas.select('USERNAME').collect()]
            schemas_json_strings = entry_builder.build_schemas(config,
                                                              df_raw_schemas) \
                .toJSON() \
                .collect()
            write_jsonl(file, schemas_json_strings)
    
            for schema_name in schemas:
                print(f"Processing tables for {schema_name}")
                df_raw_tables = connector.get_table_columns(schema_name)
                tables_json_strings = process_raw_dataset(df_raw_tables,
                                                          config,
                                                          schema_name,
                                                          EntryType.TABLE)
                write_jsonl(file, tables_json_strings)
                print(f"Processing views for {schema_name}")
                df_raw_views = connector.get_view_columns(schema_name)
                views_json_strings = process_raw_dataset(df_raw_views,
                                                        config,
                                                        schema_name,
                                                        EntryType.VIEW)
                write_jsonl(file, views_json_strings)
    
        gcs_uploader.upload(config, FILENAME)
    

    此示例会将元数据导入文件保存为单个 JSON 行文件。您 可以使用 DataFrameWriter 类等 PySpark 工具 JSON。

    连接器可以按任意顺序向元数据导入文件写入条目。

  6. 添加以下代码,以便将元数据导入文件上传到 Cloud Storage 存储桶。

    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to a bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["bucket"])
        folder = config["folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  7. 构建连接器映像。

    如果您的连接器包含多个文件,或者您想要使用 其他库,您必须使用 自定义容器。 Dataproc Serverless for Spark 在 Docker 容器中运行工作负载。创建连接器的自定义 Docker 映像并存储 Artifact Registry 中的映像 Dataproc Serverless 会从 Artifact Registry 读取映像。

    1. 创建 Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
      

      使用 Conda 作为软件包管理器。Dataproc 无服务器 对于 Spark 来说,在运行时会将 pyspark 装载到容器中,所以 则不需要在自定义容器映像中安装 PySpark 依赖项。

    2. 构建自定义容器映像并将其推送到 Artifact Registry。

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=example-project
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to Artifact Registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      由于一个映像可以有多个名称,因此您可以使用 Docker 标记来 为映像指定别名。

  8. 在 Dataproc Serverless 上运行连接器。 要使用自定义容器映像提交 PySpark 批量作业,请运行 gcloud dataproc batches submit pyspark 命令

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    请注意以下几点:

    • JAR 文件是 Spark 的驱动程序。从 Oracle、MySQL 或 Postgres,您必须向 Apache Spark 提供特定的软件包。该软件包可以位于 Cloud Storage 中,也可以位于容器内。如果 JAR 文件位于容器内,路径类似于 file:///path/to/file/driver.jar。在此示例中, JAR 文件为 /opt/spark/jars/
    • PIPELINE_ARGUMENTS 是连接器的命令行参数。

    连接器会从 Oracle 数据库中提取元数据,生成元数据导入文件,并将元数据导入文件保存到 Cloud Storage 存储桶。

  9. 如需将元数据导入文件中的元数据手动导入 Dataplex,请运行元数据作业。使用 metadataJobs.create 方法

    1. 在命令行中,添加环境变量并为 curl 命令创建别名。

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. 调用 API 方法,并传递您要创建的条目类型和切面类型 导入的数据。

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      schema 切面类型是由 Dataplex。

      请注意,调用 API 方法时用于方面类型名称的格式与您在连接器代码中使用的格式不同。

    3. 可选:使用 Cloud Logging 查看元数据作业的日志。对于 请参阅 监控 Dataplex 日志

设置流水线编排

前面部分介绍了如何构建示例连接器并手动运行连接器。

在生产环境中,您可以将连接器作为托管 来连接流水线。 Workflows。

  1. 要使用示例连接器运行代管式连接流水线,请按照 使用 Workflows 导入元数据的步骤。 您可以采取以下措施:

    • 在 连接器。
    • 在工作流定义文件中,使用以下代码更新 submit_pyspark_extract_job 函数,以便使用您创建的连接器从 Oracle 数据库中提取数据。

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • 在工作流定义文件中,更新 submit_import_job 函数 替换为以下代码以导入条目。该函数会调用 metadataJobs.create API 方法,用于运行元数据导入作业。

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      提供与手动调用 API 方法时包含的条目类型和方面类型相同的条目类型和方面类型。请注意,每个字符串的末尾没有英文逗号。

    • 执行工作流时,请提供以下运行时参数:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. 可选:使用 Cloud Logging 查看代管式连接的日志 流水线。日志载荷包含 Dataproc 无服务器批量作业和元数据导入作业, 。如需了解详情,请参阅查看工作流日志

  3. 可选:要提高网站的安全性、性能和功能, 代管式连接流水线,请考虑执行以下操作:

    1. 使用 Secret Manager 存储第三方数据源的凭据。
    2. 使用 PySpark 将 JSON 行输出并行写入多个元数据导入文件。
    3. 使用前缀将大文件(超过 100 MB)拆分为较小的文件。
    4. 添加更多自定义切面,以捕获更多业务和技术信息 元数据。

Oracle 来源的 Dataplex Catalog 资源示例

示例连接器从 Oracle 数据库提取元数据,并将 元数据添加到相应的 Dataplex Catalog 资源。

层次结构注意事项

Dataplex 中的每个系统都有一个根条目,该条目是系统的父级条目。根条目通常具有 instance 条目类型。 下表显示了条目类型和切面类型的层次结构示例 数据库服务

条目类型 ID 说明 关联的切面类型 ID
oracle-instance 导入的系统的根目录。 oracle-instance
oracle-database Oracle 数据库。 oracle-database
oracle-schema 数据库架构。 oracle-schema
oracle-table 表格。

oracle-table

schema

oracle-view 视图。

oracle-view

schema

schema 方面类型是由 Dataplex 定义的全局方面类型。其中包含对表中字段的说明 视图或其他包含列的实体。oracle-schema 自定义切面类型 包含 Oracle 数据库架构的名称。

导入项字段示例

连接器应为 Oracle 资源使用以下约定。

  • 完全限定名称:Oracle 的完全限定名称 资源使用以下命名模板。禁用字符包括 使用反引号进行了转义。

    资源 模板 示例
    实例

    SOURCE:ADDRESS

    使用系统的主机和端口号或域名。

    oracle:`localhost:1521`oracle:`myinstance.com`
    数据库 SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    架构 SOURCEADDRESSDATABASESCHEMA oracle:`localhost:1521`.xe.sys
    SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    查看 SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • 条目名称或条目 ID:Oracle 资源的条目 请使用以下命名模板。禁用字符会替换为 允许的字符。资源使用前缀 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries

    资源 模板 示例
    实例 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    数据库 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    架构 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    查看 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • 父条目:如果条目不是系统的根条目,则 有一个父条目字段,用于说明该条目在 层级结构。该字段应包含父条目的名称。我们建议您生成此值。

    下表显示了 Oracle 资源的父条目。

    条目 父条目
    实例 ""(空字符串)
    数据库 实例名称
    架构 数据库名称
    架构名称
    查看 架构名称
  • 切面映射:切面映射必须至少包含一个切面 描述要导入的实体。以下是 Oracle 表的方面映射示例。

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    您可以在 global 位置找到用于定义 dataplex-types 项目中的表格或视图结构的预定义方面类型(例如 schema)。

  • 方面键:方面键采用 PROJECT.LOCATION.ASPECT_TYPE 的命名格式。下表显示了 Oracle 资源的示例方面键。

    条目 切面键示例
    实例 example-project.us-central1.oracle-instance
    数据库 example-project.us-central1.oracle-database
    架构 example-project.us-central1.oracle-schema
    example-project.us-central1.oracle-table
    查看 example-project.us-central1.oracle-view

后续步骤