開發用於匯入中繼資料的自訂連接器

本文提供參考範本,協助您建構自訂連接器,從第三方來源擷取中繼資料。執行受管理連線管道時,您會使用連接器,將中繼資料匯入 Dataplex Universal Catalog。

您可以建構連接器,從第三方來源擷取中繼資料。舉例來說,您可以建立連接器,從 MySQL、SQL Server、Oracle、Snowflake、Databricks 等來源擷取資料。

您可以參考本文中的範例連接器,建立自己的連接器。範例連接器會連線至 Oracle Database Express Edition (XE) 資料庫。這個連結器是以 Python 建構,但您也可以使用 Java、Scala 或 R。

連結器的運作方式

連結器會從第三方資料來源擷取中繼資料,將中繼資料轉換為 Dataplex Universal Catalog ImportItem 格式,並產生可由 Dataplex Universal Catalog 匯入的中繼資料匯入檔案。

連接器是代管連線管道的一部分。受管理連線管道是自動化工作流程,用於匯入 Dataplex Universal Catalog 中繼資料。代管連線管道會執行連接器,並在匯入工作流程中執行其他工作,例如執行中繼資料匯入工作及擷取記錄。

受管理連線管道會使用 Dataproc Serverless 批次工作執行連接器。Dataproc Serverless 提供無伺服器 Spark 執行環境。雖然您可以建構不使用 Spark 的連接器,但我們建議使用 Spark,因為這有助於提升連接器的效能。

連接器需求

連接器必須符合下列規定:

  • 連接器必須是可在 Dataproc Serverless 上執行的 Artifact Registry 映像檔。
  • 連接器產生的中繼資料檔案格式必須可供 Dataplex Universal Catalog 中繼資料匯入工作 (metadataJobs.create API 方法) 匯入。如需詳細規定,請參閱「中繼資料匯入檔案」。
  • 連接器必須接受下列指令列引數,才能從管道接收資訊:

    指令列引數 管道提供的值
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    連接器會使用這些引數,在目標項目群組 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID 中產生中繼資料,並寫入 Cloud Storage 值區 gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID。每次執行管道時,系統都會在 bucket CLOUD_STORAGE_BUCKET_ID 中建立新資料夾 FOLDER_ID。連接器應將中繼資料匯入檔案寫入這個資料夾。

管道範本支援 PySpark 連接器。範本會假設驅動程式 (mainPythonFileUri) 是連接器映像檔中名為 main.py 的本機檔案。您可以修改其他情境的管道範本,例如 Spark 連接器、其他驅動程式 URI 或其他選項。

以下說明如何使用 PySpark 在中繼資料匯入檔案中建立匯入項目。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

事前準備

本指南假設您熟悉 Python 和 PySpark。

查看下列資訊:

請完成下列步驟。所有資源均須建立於相同位置。 Google Cloud

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. 如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

  6. 如要初始化 gcloud CLI,請執行下列指令:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. 建立 Cloud Storage bucket,以儲存中繼資料匯入檔案。

  10. 在同一個專案中建立下列中繼資料資源。

    如需範例值,請參閱本文的「Oracle 來源的範例中繼資料資源」一節。

    1. 建立項目群組
    2. 為要匯入的項目建立自訂切面類型。使用命名慣例「SOURCE-ENTITY_TO_IMPORT」。

      舉例來說,如果是 Oracle 資料庫,請建立名為 oracle-database 的構面類型。

      您也可以視需要建立其他面向類型,儲存其他資訊。

    3. 為要匯入的資源建立自訂項目類型,並指派相關切面類型。使用命名慣例「SOURCE-ENTITY_TO_IMPORT」。

      舉例來說,如果是 Oracle 資料庫,請建立名為 oracle-database 的項目類型。將其連結至名為「oracle-database」的指標類型。

  11. 確認可從 Google Cloud 專案存取第三方來源。詳情請參閱「Dataproc Serverless for Spark 網路設定」。
  12. 建立基本的 Python 連接器

    這個基本 Python 連接器範例會使用 Dataplex Universal Catalog 用戶端程式庫類別,為 Oracle 資料來源建立頂層項目。然後為輸入欄位提供值。

    連接器會建立中繼資料匯入檔案,其中包含下列項目:

    • instance 項目,項目類型為 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance。這個項目代表 Oracle Database XE 系統。
    • database 項目,代表 Oracle Database XE 系統中的資料庫。

    如要建構基本 Python 連接器,請按照下列步驟操作:

    1. 複製 cloud-dataplex 存放區

    2. 設定本機環境。建議您使用虛擬環境。

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      請使用 Python 的「有效」或「維護」版本。支援 Python 3.7 以上版本。

    3. 建立 Python 專案。

    4. 安裝必要項目:

      pip install -r requirements.txt
      

      已安裝下列必要項目:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. 在專案根目錄中新增 main.py 管道檔案。

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      將程式碼部署至 Dataproc Serverless 時,main.py 檔案會做為執行作業的進入點。建議您盡量減少 main.py 檔案中儲存的資訊量,並使用這個檔案呼叫連接器中定義的函式和類別,例如 src/bootstap.py 類別。

    6. 建立 src 資料夾,儲存連接器的多數邏輯。

    7. 使用 Python 類別更新 src/cmd_reader.py 檔案,以接受指令列引數。您可以使用 argeparse 模組執行這項操作。

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      在正式環境中,建議您將密碼儲存在 Secret Manager

    8. 使用程式碼更新 src/constants.py 檔案,建立常數。

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. 使用方法更新 src/name_builder.py 檔案,建構您希望連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的範例中繼資料資源」一節中說明的慣例。

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      由於 name_builder.py 檔案同時用於 Python 核心程式碼和 PySpark 核心程式碼,建議您將方法編寫為純函式,而非類別成員。

    10. 使用程式碼更新 src/top_entry_builder.py 檔案,以資料填入頂層項目。

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. 使用程式碼更新 src/bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. 在本機執行程式碼。

      系統會傳回名為 output.jsonl 的中繼資料匯入檔案。檔案有兩行,每一行代表一個匯入項目。執行中繼資料匯入作業時,代管連線管道會讀取這個檔案。

    13. 選用:擴充先前的範例,使用 Dataplex Universal Catalog 用戶端程式庫類別,為表格、結構定義和檢視區塊建立匯入項目。您也可以在 Dataproc Serverless 上執行 Python 範例。

      建議您建立使用 Spark 的連接器 (並在 Dataproc Serverless 上執行),因為這樣可以提升連接器的效能。

    建立 PySpark 連接器

    本範例是以 PySpark DataFrame API 為基礎。您可以在 Dataproc Serverless 上執行 PySpark SQL 之前,先在本機安裝並執行。如要在本機安裝及執行 PySpark,請使用 pip 安裝 PySpark 程式庫,但不必安裝本機 Spark 叢集。

    基於效能考量,這個範例不會使用 PySpark 程式庫的預先定義類別。這個範例會建立 DataFrame,將 DataFrame 轉換為 JSON 項目,然後以 JSON Lines 格式將輸出內容寫入中繼資料匯入檔案,以便匯入 Dataplex Universal Catalog。

    如要使用 PySpark 建構連接器,請按照下列步驟操作:

    1. 複製 cloud-dataplex 存放區

    2. 安裝 PySpark:

      pip install pyspark
      
    3. 安裝必要項目:

      pip install -r requirements.txt
      

      已安裝下列必要項目:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. 使用程式碼更新 oracle_connector.py 檔案,從 Oracle 資料來源讀取資料並傳回 DataFrame。

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      新增 SQL 查詢,傳回要匯入的中繼資料。查詢必須傳回下列資訊:

      • 資料庫結構定義
      • 屬於這些結構定義的資料表
      • 這些資料表所屬的資料欄,包括資料欄名稱、資料欄資料類型,以及資料欄是否可為空值或必要

      所有資料表和檢視區塊的所有資料欄,都會儲存在同一個系統資料表中。您可以使用 _get_columns 方法選取資料欄。 視您提供的參數而定,您可以分別為資料表或檢視畫面選取資料欄。

      注意事項:

      • 在 Oracle 中,資料庫結構定義由資料庫使用者擁有,且名稱與該使用者相同。
      • 架構物件是使用者建立的邏輯結構,資料表或索引等物件可以保存資料,而檢視表或同義字等物件只包含定義。
      • ojdbc11.jar 檔案包含 Oracle JDBC 驅動程式
    5. 使用共用方法更新 src/entry_builder.py 檔案,套用 Spark 轉換。

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      注意事項:

      • 這些方法會建構連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的範例中繼資料資源」一節中說明的慣例。
      • convert_to_import_items 方法適用於結構定義、資料表和檢視區塊。請確認連線器輸出的是一或多個可由 metadataJobs.create 方法處理的匯入項目,而非個別項目。
      • 即使在檢視區塊中,該資料欄也稱為 TABLE_NAME
    6. 使用程式碼更新 bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      這個範例會將中繼資料匯入檔案儲存為單一 JSON Lines 檔案。您可以使用 DataFrameWriter 類別等 PySpark 工具,平行輸出批次的 JSON。

      連接器可以依任意順序將項目寫入中繼資料匯入檔案。

    7. 使用程式碼更新 gcs_uploader.py 檔案,將中繼資料匯入檔案上傳至 Cloud Storage 值區。

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. 建構連接器映像檔。

      如果連接器包含多個檔案,或您想使用預設 Docker 映像檔未納入的程式庫,就必須使用自訂容器。Dataproc Serverless for Spark 會在 Docker 容器中執行工作負載。建立連接器的自訂 Docker 映像檔,並將映像檔儲存在 Artifact Registry 中。 Dataproc Serverless 會從 Artifact Registry 讀取映像檔。

      1. 建立 Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        使用 Conda 做為套件管理員。Dataproc Serverless for Spark 會在執行階段將 pyspark 掛接到容器中,因此您不需要在自訂容器映像檔中安裝 PySpark 依附元件。

      2. 建構自訂容器映像檔,並推送至 Artifact Registry。

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        由於一個映像檔可以有多個名稱,因此您可以使用 Docker 標記為映像檔指派別名。

    9. 在 Dataproc Serverless 上執行連接器。 如要使用自訂容器映像檔提交 PySpark 批次工作,請執行 gcloud dataproc batches submit pyspark 指令

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      注意事項:

      • JAR 檔案是 Spark 的驅動程式,如要從 Oracle、MySQL 或 Postgres 讀取資料,您必須為 Apache Spark 提供特定套件。套件可位於 Cloud Storage 或容器內。如果 JAR 檔案位於容器內,路徑會類似於 file:///path/to/file/driver.jar。在本例中,JAR 檔案的路徑為 /opt/spark/jars/
      • PIPELINE_ARGUMENTS 是連接器的指令列引數。

      這個連接器會從 Oracle 資料庫擷取中繼資料、產生中繼資料匯入檔案,並將中繼資料匯入檔案儲存至 Cloud Storage 值區。

    10. 如要將中繼資料匯入檔案中的中繼資料手動匯入 Dataplex Universal Catalog,請執行中繼資料工作。請使用 metadataJobs.create 方法

      1. 在指令列中新增環境變數,並為 curl 指令建立別名。

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. 呼叫 API 方法,並傳遞要匯入的項目類型和切面類型。

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        schema 切面類型是由 Dataplex Universal Catalog 定義的全域切面類型。

        請注意,呼叫 API 方法時使用的面向類型名稱格式,與在連接器程式碼中使用的格式不同。

      3. 選用:使用 Cloud Logging 查看中繼資料工作的記錄。詳情請參閱「監控 Dataplex Universal Catalog 記錄」。

    設定管道自動化調度管理

    前幾節說明如何建構範例連接器,以及手動執行連接器。

    在實際工作環境中,您會使用 Workflows 等自動化調度平台,將連接器做為代管連線管道的一部分執行。

    1. 如要使用範例連接器執行代管連線管道,請按照使用工作流程匯入中繼資料的步驟操作。 請執行下列操作:

      • 在與連結器相同的 Google Cloud 位置建立工作流程。
      • 在工作流程定義檔案中,使用下列程式碼更新 submit_pyspark_extract_job 函式,透過您建立的連接器從 Oracle 資料庫擷取資料。

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • 在工作流程定義檔案中,使用下列程式碼更新 submit_import_job 函式,匯入項目。這個函式會呼叫 metadataJobs.create API 方法,執行中繼資料匯入工作。

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        提供您手動呼叫 API 方法時所加入的相同項目類型和切面類型。請注意,每個字串結尾都沒有逗號。

      • 執行工作流程時,請提供下列執行階段引數:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. 選用:使用 Cloud Logging 查看代管式連線管道的記錄。記錄酬載包含 Dataproc Serverless 批次工作和中繼資料匯入工作的記錄連結 (如適用)。詳情請參閱查看工作流程記錄

    3. 選用:如要提升受管理連線管道的安全性、效能和功能,請考慮採取下列做法:

      1. 使用 Secret Manager 儲存第三方資料來源的憑證。
      2. 使用 PySpark 將 JSON Lines 輸出內容平行寫入多個中繼資料匯入檔案。
      3. 使用前置字元將大檔案 (超過 100 MB) 分割成較小的檔案。
      4. 新增更多自訂層面,從來源擷取額外的業務和技術中繼資料。

    Oracle 來源的中繼資料資源範例

    這個範例連接器會從 Oracle 資料庫擷取中繼資料,並將中繼資料對應至相應的 Dataplex Universal Catalog 中繼資料資源。

    階層注意事項

    Dataplex Universal Catalog 中的每個系統都有根項目,也就是該系統的父項項目。通常根項目會是 instance 項目類型。 下表顯示 Oracle 系統的項目類型和層面類型範例階層。舉例來說,oracle-database 項目類型會連結至同樣名為 oracle-database 的切面類型。

    項目類型 ID 說明 連結的切面類型 ID
    oracle-instance 匯入系統的根目錄。 oracle-instance
    oracle-database Oracle 資料庫。 oracle-database
    oracle-schema 資料庫結構定義。 oracle-schema
    oracle-table 表格。

    oracle-table

    schema

    oracle-view 檢視畫面。

    oracle-view

    schema

    schema 切面類型是由 Dataplex Universal Catalog 定義的全域切面類型。其中包含資料表、檢視區塊或其他有資料欄的實體中,各個欄位的說明。oracle-schema 自訂構面類型包含 Oracle 資料庫結構定義的名稱。

    匯入項目欄位範例

    連接器應遵循下列 Oracle 資源慣例。

    • 完整名稱:Oracle 資源的完整名稱使用下列命名範本。禁止使用的字元會以反引號逸出。

      資源 範本 範例
      執行個體

      SOURCE:ADDRESS

      使用系統的主機和通訊埠號碼或網域名稱。

      oracle:`localhost:1521`oracle:`myinstance.com`
      資料庫 SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      結構定義 SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      資料表 SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      查看 SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • 項目名稱或項目 ID:Oracle 資源的項目使用下列命名範本。系統會將禁止使用的字元替換成允許使用的字元。資源使用前置字串 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries

      資源 範本 範例
      執行個體 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      資料庫 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      結構定義 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      資料表 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      查看 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • 父項項目:如果項目不是系統的根項目,項目可以有父項項目欄位,說明其在階層中的位置。這個欄位應包含父項項目的名稱。建議您產生這個值。

      下表列出 Oracle 資源的父項項目。

      項目 上層項目
      執行個體 "" (空字串)
      資料庫 執行個體名稱
      結構定義 資料庫名稱
      資料表 結構定義名稱
      查看 結構定義名稱
    • 面向對應:面向對應必須至少包含一個描述要匯入實體的面向。以下是 Oracle 資料表的層面地圖範例。

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      您可以在 global 位置找到預先定義的方面類型 (例如 schema),這些類型會定義 dataplex-types 專案中的表格或檢視結構。

    • 構面鍵:構面鍵採用 PROJECT.LOCATION.ASPECT_TYPE 命名格式。下表列出 Oracle 資源的構面鍵範例。

      項目 層面鍵範例
      執行個體 example-project.us-central1.oracle-instance
      資料庫 example-project.us-central1.oracle-database
      結構定義 example-project.us-central1.oracle-schema
      資料表 example-project.us-central1.oracle-table
      查看 example-project.us-central1.oracle-view

    後續步驟