本文提供參考範本,協助您建構自訂連接器,從第三方來源擷取中繼資料。執行受管理連線管道時,您會使用連接器,將中繼資料匯入 Dataplex Universal Catalog。
您可以建構連接器,從第三方來源擷取中繼資料。舉例來說,您可以建立連接器,從 MySQL、SQL Server、Oracle、Snowflake、Databricks 等來源擷取資料。
您可以參考本文中的範例連接器,建立自己的連接器。範例連接器會連線至 Oracle Database Express Edition (XE) 資料庫。這個連結器是以 Python 建構,但您也可以使用 Java、Scala 或 R。
連結器的運作方式
連結器會從第三方資料來源擷取中繼資料,將中繼資料轉換為 Dataplex Universal Catalog ImportItem
格式,並產生可由 Dataplex Universal Catalog 匯入的中繼資料匯入檔案。
連接器是代管連線管道的一部分。受管理連線管道是自動化工作流程,用於匯入 Dataplex Universal Catalog 中繼資料。代管連線管道會執行連接器,並在匯入工作流程中執行其他工作,例如執行中繼資料匯入工作及擷取記錄。
受管理連線管道會使用 Dataproc Serverless 批次工作執行連接器。Dataproc Serverless 提供無伺服器 Spark 執行環境。雖然您可以建構不使用 Spark 的連接器,但我們建議使用 Spark,因為這有助於提升連接器的效能。
連接器需求
連接器必須符合下列規定:
- 連接器必須是可在 Dataproc Serverless 上執行的 Artifact Registry 映像檔。
- 連接器產生的中繼資料檔案格式必須可供 Dataplex Universal Catalog 中繼資料匯入工作 (
metadataJobs.create
API 方法) 匯入。如需詳細規定,請參閱「中繼資料匯入檔案」。 連接器必須接受下列指令列引數,才能從管道接收資訊:
指令列引數 管道提供的值 target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID 連接器會使用這些引數,在目標項目群組
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
中產生中繼資料,並寫入 Cloud Storage 值區gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
。每次執行管道時,系統都會在 bucket CLOUD_STORAGE_BUCKET_ID 中建立新資料夾 FOLDER_ID。連接器應將中繼資料匯入檔案寫入這個資料夾。
管道範本支援 PySpark 連接器。範本會假設驅動程式 (mainPythonFileUri
) 是連接器映像檔中名為 main.py
的本機檔案。您可以修改其他情境的管道範本,例如 Spark 連接器、其他驅動程式 URI 或其他選項。
以下說明如何使用 PySpark 在中繼資料匯入檔案中建立匯入項目。
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
事前準備
本指南假設您熟悉 Python 和 PySpark。
查看下列資訊:
請完成下列步驟。所有資源均須建立於相同位置。 Google Cloud
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI。
-
如要初始化 gcloud CLI,請執行下列指令:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
建立 Cloud Storage bucket,以儲存中繼資料匯入檔案。
-
在同一個專案中建立下列中繼資料資源。
如需範例值,請參閱本文的「Oracle 來源的範例中繼資料資源」一節。
- 建立項目群組。
-
為要匯入的項目建立自訂切面類型。使用命名慣例「
SOURCE
-ENTITY_TO_IMPORT
」。舉例來說,如果是 Oracle 資料庫,請建立名為
oracle-database
的構面類型。您也可以視需要建立其他面向類型,儲存其他資訊。
-
為要匯入的資源建立自訂項目類型,並指派相關切面類型。使用命名慣例「
SOURCE
-ENTITY_TO_IMPORT
」。舉例來說,如果是 Oracle 資料庫,請建立名為
oracle-database
的項目類型。將其連結至名為「oracle-database
」的指標類型。
- 確認可從 Google Cloud 專案存取第三方來源。詳情請參閱「Dataproc Serverless for Spark 網路設定」。
instance
項目,項目類型為projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
。這個項目代表 Oracle Database XE 系統。database
項目,代表 Oracle Database XE 系統中的資料庫。設定本機環境。建議您使用虛擬環境。
mkdir venv python -m venv venv/ source venv/bin/activate
建立 Python 專案。
安裝必要項目:
pip install -r requirements.txt
已安裝下列必要項目:
在專案根目錄中新增
main.py
管道檔案。將程式碼部署至 Dataproc Serverless 時,
main.py
檔案會做為執行作業的進入點。建議您盡量減少main.py
檔案中儲存的資訊量,並使用這個檔案呼叫連接器中定義的函式和類別,例如src/bootstap.py
類別。建立
src
資料夾,儲存連接器的多數邏輯。使用 Python 類別更新
src/cmd_reader.py
檔案,以接受指令列引數。您可以使用 argeparse 模組執行這項操作。在正式環境中,建議您將密碼儲存在 Secret Manager。
使用程式碼更新
src/constants.py
檔案,建立常數。使用方法更新
src/name_builder.py
檔案,建構您希望連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的範例中繼資料資源」一節中說明的慣例。由於
name_builder.py
檔案同時用於 Python 核心程式碼和 PySpark 核心程式碼,建議您將方法編寫為純函式,而非類別成員。使用程式碼更新
src/top_entry_builder.py
檔案,以資料填入頂層項目。使用程式碼更新
src/bootstrap.py
檔案,產生中繼資料匯入檔案並執行連接器。在本機執行程式碼。
系統會傳回名為
output.jsonl
的中繼資料匯入檔案。檔案有兩行,每一行代表一個匯入項目。執行中繼資料匯入作業時,代管連線管道會讀取這個檔案。選用:擴充先前的範例,使用 Dataplex Universal Catalog 用戶端程式庫類別,為表格、結構定義和檢視區塊建立匯入項目。您也可以在 Dataproc Serverless 上執行 Python 範例。
建議您建立使用 Spark 的連接器 (並在 Dataproc Serverless 上執行),因為這樣可以提升連接器的效能。
安裝 PySpark:
pip install pyspark
安裝必要項目:
pip install -r requirements.txt
已安裝下列必要項目:
使用程式碼更新
oracle_connector.py
檔案,從 Oracle 資料來源讀取資料並傳回 DataFrame。新增 SQL 查詢,傳回要匯入的中繼資料。查詢必須傳回下列資訊:
- 資料庫結構定義
- 屬於這些結構定義的資料表
- 這些資料表所屬的資料欄,包括資料欄名稱、資料欄資料類型,以及資料欄是否可為空值或必要
所有資料表和檢視區塊的所有資料欄,都會儲存在同一個系統資料表中。您可以使用
_get_columns
方法選取資料欄。 視您提供的參數而定,您可以分別為資料表或檢視畫面選取資料欄。注意事項:
- 在 Oracle 中,資料庫結構定義由資料庫使用者擁有,且名稱與該使用者相同。
- 架構物件是使用者建立的邏輯結構,資料表或索引等物件可以保存資料,而檢視表或同義字等物件只包含定義。
ojdbc11.jar
檔案包含 Oracle JDBC 驅動程式。
使用共用方法更新
src/entry_builder.py
檔案,套用 Spark 轉換。注意事項:
- 這些方法會建構連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的範例中繼資料資源」一節中說明的慣例。
convert_to_import_items
方法適用於結構定義、資料表和檢視區塊。請確認連線器輸出的是一或多個可由metadataJobs.create
方法處理的匯入項目,而非個別項目。- 即使在檢視區塊中,該資料欄也稱為
TABLE_NAME
。
使用程式碼更新
bootstrap.py
檔案,產生中繼資料匯入檔案並執行連接器。這個範例會將中繼資料匯入檔案儲存為單一 JSON Lines 檔案。您可以使用
DataFrameWriter
類別等 PySpark 工具,平行輸出批次的 JSON。連接器可以依任意順序將項目寫入中繼資料匯入檔案。
使用程式碼更新
gcs_uploader.py
檔案,將中繼資料匯入檔案上傳至 Cloud Storage 值區。建構連接器映像檔。
如果連接器包含多個檔案,或您想使用預設 Docker 映像檔未納入的程式庫,就必須使用自訂容器。Dataproc Serverless for Spark 會在 Docker 容器中執行工作負載。建立連接器的自訂 Docker 映像檔,並將映像檔儲存在 Artifact Registry 中。 Dataproc Serverless 會從 Artifact Registry 讀取映像檔。
建立 Dockerfile:
使用 Conda 做為套件管理員。Dataproc Serverless for Spark 會在執行階段將
pyspark
掛接到容器中,因此您不需要在自訂容器映像檔中安裝 PySpark 依附元件。建構自訂容器映像檔,並推送至 Artifact Registry。
由於一個映像檔可以有多個名稱,因此您可以使用 Docker 標記為映像檔指派別名。
在 Dataproc Serverless 上執行連接器。 如要使用自訂容器映像檔提交 PySpark 批次工作,請執行
gcloud dataproc batches submit pyspark
指令。gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
注意事項:
- JAR 檔案是 Spark 的驅動程式,如要從 Oracle、MySQL 或 Postgres 讀取資料,您必須為 Apache Spark 提供特定套件。套件可位於 Cloud Storage 或容器內。如果 JAR 檔案位於容器內,路徑會類似於
file:///path/to/file/driver.jar
。在本例中,JAR 檔案的路徑為/opt/spark/jars/
。 - PIPELINE_ARGUMENTS 是連接器的指令列引數。
這個連接器會從 Oracle 資料庫擷取中繼資料、產生中繼資料匯入檔案,並將中繼資料匯入檔案儲存至 Cloud Storage 值區。
- JAR 檔案是 Spark 的驅動程式,如要從 Oracle、MySQL 或 Postgres 讀取資料,您必須為 Apache Spark 提供特定套件。套件可位於 Cloud Storage 或容器內。如果 JAR 檔案位於容器內,路徑會類似於
如要將中繼資料匯入檔案中的中繼資料手動匯入 Dataplex Universal Catalog,請執行中繼資料工作。請使用
metadataJobs.create
方法。在指令列中新增環境變數,並為 curl 指令建立別名。
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
呼叫 API 方法,並傳遞要匯入的項目類型和切面類型。
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
schema
切面類型是由 Dataplex Universal Catalog 定義的全域切面類型。請注意,呼叫 API 方法時使用的面向類型名稱格式,與在連接器程式碼中使用的格式不同。
選用:使用 Cloud Logging 查看中繼資料工作的記錄。詳情請參閱「監控 Dataplex Universal Catalog 記錄」。
如要使用範例連接器執行代管連線管道,請按照使用工作流程匯入中繼資料的步驟操作。 請執行下列操作:
- 在與連結器相同的 Google Cloud 位置建立工作流程。
在工作流程定義檔案中,使用下列程式碼更新
submit_pyspark_extract_job
函式,透過您建立的連接器從 Oracle 資料庫擷取資料。- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
在工作流程定義檔案中,使用下列程式碼更新
submit_import_job
函式,匯入項目。這個函式會呼叫metadataJobs.create
API 方法,執行中繼資料匯入工作。- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
提供您手動呼叫 API 方法時所加入的相同項目類型和切面類型。請注意,每個字串結尾都沒有逗號。
執行工作流程時,請提供下列執行階段引數:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
選用:使用 Cloud Logging 查看代管式連線管道的記錄。記錄酬載包含 Dataproc Serverless 批次工作和中繼資料匯入工作的記錄連結 (如適用)。詳情請參閱查看工作流程記錄。
選用:如要提升受管理連線管道的安全性、效能和功能,請考慮採取下列做法:
- 使用 Secret Manager 儲存第三方資料來源的憑證。
- 使用 PySpark 將 JSON Lines 輸出內容平行寫入多個中繼資料匯入檔案。
- 使用前置字元將大檔案 (超過 100 MB) 分割成較小的檔案。
- 新增更多自訂層面,從來源擷取額外的業務和技術中繼資料。
-
完整名稱:Oracle 資源的完整名稱使用下列命名範本。禁止使用的字元會以反引號逸出。
資源 範本 範例 執行個體 SOURCE
:ADDRESS
使用系統的主機和通訊埠號碼或網域名稱。
oracle:`localhost:1521`
或oracle:`myinstance.com`
資料庫 SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
結構定義 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
資料表 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
查看 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
項目名稱或項目 ID:Oracle 資源的項目使用下列命名範本。系統會將禁止使用的字元替換成允許使用的字元。資源使用前置字串
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
。資源 範本 範例 執行個體 PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
資料庫 PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
結構定義 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
資料表 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
查看 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
父項項目:如果項目不是系統的根項目,項目可以有父項項目欄位,說明其在階層中的位置。這個欄位應包含父項項目的名稱。建議您產生這個值。
下表列出 Oracle 資源的父項項目。
項目 上層項目 執行個體 ""
(空字串)資料庫 執行個體名稱 結構定義 資料庫名稱 資料表 結構定義名稱 查看 結構定義名稱 面向對應:面向對應必須至少包含一個描述要匯入實體的面向。以下是 Oracle 資料表的層面地圖範例。
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
您可以在
global
位置找到預先定義的方面類型 (例如schema
),這些類型會定義dataplex-types
專案中的表格或檢視結構。-
構面鍵:構面鍵採用 PROJECT.LOCATION.ASPECT_TYPE 命名格式。下表列出 Oracle 資源的構面鍵範例。
項目 層面鍵範例 執行個體 example-project.us-central1.oracle-instance
資料庫 example-project.us-central1.oracle-database
結構定義 example-project.us-central1.oracle-schema
資料表 example-project.us-central1.oracle-table
查看 example-project.us-central1.oracle-view
建立基本的 Python 連接器
這個基本 Python 連接器範例會使用 Dataplex Universal Catalog 用戶端程式庫類別,為 Oracle 資料來源建立頂層項目。然後為輸入欄位提供值。
連接器會建立中繼資料匯入檔案,其中包含下列項目:
如要建構基本 Python 連接器,請按照下列步驟操作:
建立 PySpark 連接器
本範例是以 PySpark DataFrame API 為基礎。您可以在 Dataproc Serverless 上執行 PySpark SQL 之前,先在本機安裝並執行。如要在本機安裝及執行 PySpark,請使用 pip 安裝 PySpark 程式庫,但不必安裝本機 Spark 叢集。
基於效能考量,這個範例不會使用 PySpark 程式庫的預先定義類別。這個範例會建立 DataFrame,將 DataFrame 轉換為 JSON 項目,然後以 JSON Lines 格式將輸出內容寫入中繼資料匯入檔案,以便匯入 Dataplex Universal Catalog。
如要使用 PySpark 建構連接器,請按照下列步驟操作:
設定管道自動化調度管理
前幾節說明如何建構範例連接器,以及手動執行連接器。
在實際工作環境中,您會使用 Workflows 等自動化調度平台,將連接器做為代管連線管道的一部分執行。
Oracle 來源的中繼資料資源範例
這個範例連接器會從 Oracle 資料庫擷取中繼資料,並將中繼資料對應至相應的 Dataplex Universal Catalog 中繼資料資源。
階層注意事項
Dataplex Universal Catalog 中的每個系統都有根項目,也就是該系統的父項項目。通常根項目會是 instance
項目類型。
下表顯示 Oracle 系統的項目類型和層面類型範例階層。舉例來說,oracle-database
項目類型會連結至同樣名為 oracle-database
的切面類型。
項目類型 ID | 說明 | 連結的切面類型 ID |
---|---|---|
oracle-instance |
匯入系統的根目錄。 | oracle-instance |
oracle-database |
Oracle 資料庫。 | oracle-database |
oracle-schema |
資料庫結構定義。 | oracle-schema |
oracle-table |
表格。 |
|
oracle-view |
檢視畫面。 |
|
schema
切面類型是由 Dataplex Universal Catalog 定義的全域切面類型。其中包含資料表、檢視區塊或其他有資料欄的實體中,各個欄位的說明。oracle-schema
自訂構面類型包含 Oracle 資料庫結構定義的名稱。
匯入項目欄位範例
連接器應遵循下列 Oracle 資源慣例。