이 문서에서는 서드 파티 소스에서 메타데이터를 추출하는 맞춤 커넥터를 빌드하는 데 사용할 수 있는 참조 템플릿을 제공합니다. 메타데이터를 Dataplex로 가져오는 관리형 연결 파이프라인을 실행할 때 이 커넥터를 사용합니다.
커넥터를 빌드하여 서드 파티 소스에서 메타데이터를 추출할 수 있습니다. 예를 들어 MySQL, SQL Server, Oracle, Snowflake, Databricks 등의 소스에서 데이터를 추출하는 커넥터를 빌드할 수 있습니다.
이 문서의 커넥터 예시를 시작점으로 사용하여 자체 커넥터를 빌드하세요. 이 커넥터 예시는 Oracle Database Express Edition (XE) 데이터베이스에 연결합니다. 커넥터는 Python으로 빌드되지만 Java, Scala, R을 사용할 수도 있습니다.
커넥터의 작동 원리
커넥터는 서드 파티 데이터 소스에서 메타데이터를 추출하고, 메타데이터를 Dataplex ImportItem
형식으로 변환하고, Dataplex에서 가져올 수 있는 메타데이터 가져오기 파일을 생성합니다.
커넥터는 관리형 연결 파이프라인의 일부입니다. 관리형 연결 파이프라인은 Dataplex 카탈로그 메타데이터를 가져오는 데 사용하는 조정된 워크플로입니다. 관리형 연결 파이프라인은 커넥터를 실행하고 메타데이터 가져오기 작업 실행, 로그 캡처와 같은 가져오기 워크플로의 다른 작업을 실행합니다.
관리형 연결 파이프라인은 Dataproc Serverless 일괄 작업을 사용하여 커넥터를 실행합니다. Dataproc Serverless는 서버리스 Spark 실행 환경을 제공합니다. Spark를 사용하지 않는 커넥터를 빌드할 수 있지만 Spark를 사용하면 커넥터의 성능을 개선할 수 있으므로 Spark를 사용하는 것이 좋습니다.
커넥터 요구사항
커넥터의 요구사항은 다음과 같습니다.
- 커넥터는 Dataproc Serverless에서 실행할 수 있는 Artifact Registry 이미지여야 합니다.
- 커넥터는 Dataplex 메타데이터 가져오기 작업 (
metadataJobs.create
API 메서드)에서 가져올 수 있는 형식으로 메타데이터 파일을 생성해야 합니다. 자세한 요구사항은 메타데이터 가져오기 파일을 참고하세요. 커넥터는 파이프라인에서 정보를 수신하려면 다음 명령줄 인수를 수락해야 합니다.
명령줄 인수 파이프라인에서 제공하는 값 target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID 커넥터는 이러한 인수를 사용하여 대상 항목 그룹
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
에서 메타데이터를 생성하고 Cloud Storage 버킷gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
에 씁니다. 파이프라인을 실행할 때마다 버킷 CLOUD_STORAGE_BUCKET_ID에 새 폴더 FOLDER_ID가 생성됩니다. 커넥터는 이 폴더에 메타데이터 가져오기 파일을 작성해야 합니다.
파이프라인 템플릿은 PySpark 커넥터를 지원합니다. 템플릿은 드라이버(mainPythonFileUri
)가 main.py
라는 커넥터 이미지의 로컬 파일이라고 가정합니다. Spark 커넥터, 다른 드라이버 URI 또는 기타 옵션과 같은 다른 시나리오에 맞게 파이프라인 템플릿을 수정할 수 있습니다.
다음은 PySpark를 사용하여 메타데이터 가져오기 파일에 가져오기 항목을 만드는 방법입니다.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
시작하기 전에
이 가이드에서는 사용자가 Python 및 PySpark에 익숙하다고 가정합니다.
다음 정보를 검토하세요.
다음 단계를 따르세요. 모든 리소스를 동일한 Google Cloud 위치에 만듭니다.
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷을 만듭니다.
-
동일한 프로젝트에서 다음 Dataplex 카탈로그 리소스를 만듭니다.
값의 예시는 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션을 참고하세요.
- 항목 그룹을 만듭니다.
-
가져오려는 항목에 대해 맞춤 관점 유형을 만듭니다.
SOURCE
-ENTITY_TO_IMPORT
이름 지정 규칙을 사용합니다.원하는 경우 다른 정보를 저장하기 위해 측정기준 유형을 추가로 만들 수 있습니다.
-
가져오려는 리소스에 대해 맞춤 항목 유형을 만들고 관련 관점 유형을 할당합니다.
SOURCE
-ENTITY_TO_IMPORT
이름 지정 규칙을 사용합니다.예를 들어 Oracle 데이터베이스의 경우
oracle-database
라는 항목 유형을 만듭니다.oracle-database
이라는 평가항목 유형에 연결합니다.
- Google Cloud 프로젝트에서 서드 파티 소스에 액세스할 수 있는지 확인합니다. 자세한 내용은 Spark 네트워크 구성을 위한 서버리스 Dataproc를 참고하세요.
기본 Python 커넥터 만들기
기본 Python 커넥터 예에서는 Dataplex 클라이언트 라이브러리 클래스를 사용하여 Oracle 데이터 소스의 최상위 항목을 만듭니다. 그런 다음 입력란에 값을 제공합니다.
커넥터는 다음 항목이 포함된 메타데이터 가져오기 파일을 만듭니다.
- 항목 유형이
projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
인instance
항목 이 항목은 Oracle Database XE 시스템을 나타냅니다. - Oracle Database XE 시스템 내의 데이터베이스를 나타내는
database
항목
기본 Python 커넥터를 빌드하려면 다음 단계를 따르세요.
cloud-dataplex
저장소를 클론합니다.로컬 환경을 설정합니다. 가상 환경을 사용하는 것이 좋습니다.
mkdir venv python -m venv venv/ source venv/bin/activate
Python 프로젝트를 만듭니다.
설치 요구사항:
pip install -r requirements.txt
다음 요구사항이 설치됩니다.
프로젝트 루트에
main.py
파이프라인 파일을 추가합니다.Dataproc Serverless에 코드를 배포할 때
main.py
파일이 실행 진입점 역할을 합니다.main.py
파일에 저장되는 정보의 양을 최소화하는 것이 좋습니다. 이 파일을 사용하여 커넥터 내에 정의된 함수와 클래스(예:src/bootstap.py
클래스)를 호출합니다.커넥터의 로직 대부분을 저장할
src
폴더를 만듭니다.명령줄 인수를 허용하도록 Python 클래스로
src/cmd_reader.py
파일을 업데이트합니다. argeparse 모듈을 사용하여 이 작업을 수행할 수 있습니다.프로덕션 환경에서는 Secret Manager에 비밀번호를 저장하는 것이 좋습니다.
상수를 만드는 코드로
src/constants.py
파일을 업데이트합니다.커넥터가 Oracle 리소스에 대해 만들려는 Dataplex 카탈로그 리소스를 빌드하는 메서드로
src/name_builder.py
파일을 업데이트합니다. 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션에 설명된 규칙을 따르세요.name_builder.py
파일은 Python 핵심 코드와 PySpark 핵심 코드 모두에 사용되므로 메서드를 클래스의 구성원이 아닌 순수 함수로 작성하는 것이 좋습니다.최상위 항목을 데이터로 채우는 코드로
src/top_entry_builder.py
파일을 업데이트합니다.메타데이터 가져오기 파일을 생성하고 커넥터를 실행하는 코드로
src/bootstrap.py
파일을 업데이트합니다.코드를 로컬에서 실행합니다.
output.jsonl
이라는 메타데이터 가져오기 파일이 반환됩니다. 파일에는 각각 가져오기 항목을 나타내는 두 줄이 있습니다. 관리형 연결 파이프라인은 메타데이터 가져오기 작업을 실행할 때 이 파일을 읽습니다.선택사항: Dataplex 클라이언트 라이브러리 클래스를 사용하여 테이블, 스키마, 뷰의 가져오기 항목을 만들도록 이전 예시를 확장합니다. Dataproc Serverless에서 Python 예시를 실행할 수도 있습니다.
Spark를 사용하고 Dataproc Serverless에서 실행되는 커넥터를 만드는 것이 좋습니다. 이렇게 하면 커넥터의 성능이 개선될 수 있기 때문입니다.
PySpark 커넥터 만들기
이 예는 PySpark DataFrame API를 기반으로 합니다. Dataproc Serverless에서 실행하기 전에 PySpark SQL을 설치하고 로컬에서 실행할 수 있습니다. PySpark를 로컬에 설치하고 실행하는 경우 pip를 사용하여 PySpark 라이브러리를 설치하지만 로컬 Spark 클러스터를 설치할 필요는 없습니다.
성능상의 이유로 이 예에서는 PySpark 라이브러리의 사전 정의된 클래스를 사용하지 않습니다. 대신 이 예에서는 DataFrame을 만들고 DataFrame을 JSON 항목으로 변환한 후 Dataplex로 가져올 수 있는 JSON Lines 형식의 메타데이터 가져오기 파일에 출력을 씁니다.
PySpark를 사용하여 커넥터를 빌드하려면 다음 단계를 따르세요.
cloud-dataplex
저장소를 클론합니다.PySpark를 설치합니다.
pip install pyspark
설치 요구사항:
pip install -r requirements.txt
다음 요구사항이 설치됩니다.
Oracle 데이터 소스에서 데이터를 읽고 DataFrames를 반환하는 코드로
oracle_connector.py
파일을 업데이트합니다.가져오려는 메타데이터를 반환하는 SQL 쿼리를 추가합니다. 쿼리는 다음 정보를 반환해야 합니다.
- 데이터베이스 스키마
- 이러한 스키마에 속한 테이블
- 이러한 테이블에 속하는 열(열 이름, 열 데이터 유형, 열의 nullable 여부 또는 필수 여부 포함)
모든 테이블과 보기의 모든 열이 동일한 시스템 테이블에 저장됩니다.
_get_columns
메서드를 사용하여 열을 선택할 수 있습니다. 제공하는 매개변수에 따라 테이블 또는 뷰의 열을 별도로 선택할 수 있습니다.다음에 유의하세요.
- Oracle에서 데이터베이스 스키마는 데이터베이스 사용자가 소유하며 해당 사용자와 동일한 이름을 갖습니다.
- 스키마 객체는 사용자가 만드는 논리적 구조입니다. 테이블이나 색인과 같은 객체는 데이터를 보유할 수 있으며 뷰나 동의어와 같은 객체는 정의로만 구성됩니다.
ojdbc11.jar
파일에는 Oracle JDBC 드라이버가 포함되어 있습니다.
Spark 변환을 적용하기 위한 공유 메서드로
src/entry_builder.py
파일을 업데이트합니다.다음에 유의하세요.
- 이 메서드는 커넥터가 Oracle 리소스에 대해 만드는 Dataplex 카탈로그 리소스를 빌드합니다. 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션에 설명된 규칙을 따르세요.
convert_to_import_items
메서드는 스키마, 테이블, 뷰에 적용됩니다. 커넥터의 출력은 개별 항목이 아닌metadataJobs.create
메서드로 처리할 수 있는 하나 이상의 가져오기 항목이어야 합니다.- 뷰에서도 열은
TABLE_NAME
라고 합니다.
메타데이터 가져오기 파일을 생성하고 커넥터를 실행하는 코드로
bootstrap.py
파일을 업데이트합니다.이 예에서는 메타데이터 가져오기 파일을 단일 JSON Lines 파일로 저장합니다.
DataFrameWriter
클래스와 같은 PySpark 도구를 사용하여 JSON 일괄 처리를 동시에 출력할 수 있습니다.커넥터는 메타데이터 가져오기 파일에 항목을 순서에 관계없이 쓸 수 있습니다.
메타데이터 가져오기 파일을 Cloud Storage 버킷에 업로드하는 코드로
gcs_uploader.py
파일을 업데이트합니다.커넥터 이미지를 빌드합니다.
커넥터에 여러 파일이 포함되어 있거나 기본 Docker 이미지에 포함되지 않은 라이브러리를 사용하려는 경우 커스텀 컨테이너를 사용해야 합니다. Spark를 위한 서버리스 Dataproc은 Docker 컨테이너 내에서 워크로드를 실행합니다. 커넥터의 맞춤 Docker 이미지를 만들고 Artifact Registry에 이미지를 저장합니다. Dataproc Serverless는 Artifact Registry에서 이미지를 읽습니다.
Dockerfile을 만듭니다.
Conda를 패키지 관리자로 사용합니다. Spark를 위한 Dataproc 서버리스는 런타임 시
pyspark
를 컨테이너에 마운트하므로 커스텀 컨테이너 이미지에 PySpark 종속 항목을 설치할 필요가 없습니다.맞춤 컨테이너 이미지를 빌드하고 Artifact Registry에 푸시합니다.
하나의 이미지에 여러 이름을 지정할 수 있으므로 Docker 태그를 사용하여 이미지에 별칭을 할당할 수 있습니다.
Dataproc Serverless에서 커넥터를 실행합니다. 커스텀 컨테이너 이미지를 사용하여 PySpark 일괄 작업을 제출하려면
gcloud dataproc batches submit pyspark
명령어를 실행합니다.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
다음에 유의하세요.
- JAR 파일은 Spark용 드라이버입니다. Oracle, MySQL 또는 PostgreSQL에서 읽으려면 Apache Spark에 특정 패키지를 제공해야 합니다. 패키지는 Cloud Storage 또는 컨테이너 내에 있을 수 있습니다. JAR 파일이 컨테이너 내에 있으면 경로는
file:///path/to/file/driver.jar
와 유사합니다. 이 예에서 JAR 파일의 경로는/opt/spark/jars/
입니다. - PIPELINE_ARGUMENTS는 커넥터의 명령줄 인수입니다.
커넥터는 Oracle 데이터베이스에서 메타데이터를 추출하고, 메타데이터 가져오기 파일을 생성하고, 메타데이터 가져오기 파일을 Cloud Storage 버킷에 저장합니다.
- JAR 파일은 Spark용 드라이버입니다. Oracle, MySQL 또는 PostgreSQL에서 읽으려면 Apache Spark에 특정 패키지를 제공해야 합니다. 패키지는 Cloud Storage 또는 컨테이너 내에 있을 수 있습니다. JAR 파일이 컨테이너 내에 있으면 경로는
메타데이터 가져오기 파일의 메타데이터를 Dataplex로 수동으로 가져오려면 메타데이터 작업을 실행합니다.
metadataJobs.create
메서드를 사용합니다.명령줄에서 환경 변수를 추가하고 curl 명령어의 별칭을 만듭니다.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
가져오려는 항목 유형과 관점 유형을 전달하여 API 메서드를 호출합니다.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
schema
관점 유형은 Dataplex에서 정의하는 전역 관점 유형입니다.API 메서드를 호출할 때 측정기준 유형 이름에 사용하는 형식은 커넥터 코드에서 사용하는 형식과 다릅니다.
선택사항: Cloud Logging을 사용하여 메타데이터 작업의 로그를 봅니다. 자세한 내용은 Dataplex 로그 모니터링을 참고하세요.
파이프라인 조정 설정
이전 섹션에서는 예시 커넥터를 빌드하고 커넥터를 수동으로 실행하는 방법을 보여줬습니다.
프로덕션 환경에서는 워크플로와 같은 조정 플랫폼을 사용하여 관리형 연결 파이프라인의 일부로 커넥터를 실행합니다.
예시 커넥터로 관리형 연결 파이프라인을 실행하려면 워크플로를 사용하여 메타데이터를 가져오는 단계를 따르세요. 다음 작업을 실행합니다.
- 커넥터와 동일한 Google Cloud 위치에 워크플로를 만듭니다.
워크플로 정의 파일에서 다음 코드로
submit_pyspark_extract_job
함수를 업데이트하여 만든 커넥터를 사용하여 Oracle 데이터베이스에서 데이터를 추출합니다.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
워크플로 정의 파일에서 다음 코드로
submit_import_job
함수를 업데이트하여 항목을 가져옵니다. 이 함수는metadataJobs.create
API 메서드를 호출하여 메타데이터 가져오기 작업을 실행합니다.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
API 메서드를 수동으로 호출할 때 포함한 것과 동일한 항목 유형과 관점 유형을 제공합니다. 각 문자열 끝에는 쉼표가 없습니다.
워크플로를 실행할 때 다음 런타임 인수를 제공합니다.
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
선택사항: Cloud Logging을 사용하여 관리형 연결 파이프라인의 로그를 확인합니다. 로그 페이로드에는 Dataproc Serverless 일괄 작업 및 메타데이터 가져오기 작업의 로그 링크(해당하는 경우)가 포함됩니다. 자세한 내용은 워크플로 로그 보기를 참고하세요.
선택사항: 관리형 연결 파이프라인의 보안, 성능, 기능을 개선하려면 다음을 고려하세요.
- Secret Manager를 사용하여 서드 파티 데이터 소스의 사용자 인증 정보를 저장합니다.
- PySpark를 사용하여 JSON Lines 출력을 여러 메타데이터 가져오기 파일에 동시에 작성합니다.
- 접두사를 사용하여 큰 파일 (100MB 초과)을 더 작은 파일로 분할합니다.
- 소스에서 비즈니스 및 기술 메타데이터를 추가로 캡처하는 맞춤 측정기준을 더 추가합니다.
Oracle 소스의 Dataplex 카탈로그 리소스 예
이 커넥터 예시는 Oracle 데이터베이스에서 메타데이터를 추출하고 메타데이터를 상응하는 Dataplex 카탈로그 리소스에 매핑합니다.
계층 구조 고려사항
Dataplex의 모든 시스템에는 시스템의 상위 항목인 루트 항목이 있습니다. 일반적으로 루트 항목에는 instance
항목 유형이 있습니다.
다음 표는 Oracle 시스템의 항목 유형 및 관점 유형 계층 구조의 예를 보여줍니다.
항목 유형 ID | 설명 | 연결된 측정항목 유형 ID |
---|---|---|
oracle-instance |
가져온 시스템의 루트입니다. | oracle-instance |
oracle-database |
Oracle 데이터베이스 | oracle-database |
oracle-schema |
데이터베이스 스키마 | oracle-schema |
oracle-table |
테이블. |
|
oracle-view |
뷰 |
|
schema
관점 유형은 Dataplex에서 정의하는 전역 관점 유형입니다. 열이 있는 테이블, 뷰 또는 기타 항목의 필드에 관한 설명이 포함되어 있습니다. oracle-schema
맞춤 측정기준 유형에는 Oracle 데이터베이스 스키마의 이름이 포함됩니다.
가져오기 항목 필드의 예
커넥터는 Oracle 리소스에 다음 규칙을 사용해야 합니다.
-
정규화된 이름: Oracle 리소스의 정규화된 이름은 다음 명명 템플릿을 사용합니다. 금지된 문자는 백틱으로 이스케이프 처리됩니다.
리소스 템플릿 예시 인스턴스 SOURCE
:ADDRESS
시스템의 호스트 및 포트 번호 또는 도메인 이름을 사용합니다.
oracle:`localhost:1521`
또는oracle:`myinstance.com`
데이터베이스 SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
스키마 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
테이블 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
보기 SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
항목 이름 또는 항목 ID: Oracle 리소스의 항목은 다음 명명 템플릿을 사용합니다. 금지된 문자는 허용되는 문자로 대체됩니다. 리소스는
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
접두어를 사용합니다.리소스 템플릿 예시 인스턴스 PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
데이터베이스 PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
스키마 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
테이블 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
보기 PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
상위 항목: 항목이 시스템의 루트 항목이 아닌 경우 항목에 계층 구조에서의 위치를 설명하는 상위 항목 필드가 있을 수 있습니다. 이 필드에는 상위 항목의 이름이 포함되어야 합니다. 이 값을 생성하는 것이 좋습니다.
다음 표에는 Oracle 리소스의 상위 항목이 나와 있습니다.
항목 상위 항목 인스턴스 ""
'(빈 문자열)데이터베이스 인스턴스 이름 스키마 데이터베이스 이름 테이블 스키마 이름 보기 스키마 이름 측정기준 맵: 측정기준 맵에는 가져올 항목을 설명하는 측정기준이 하나 이상 포함되어야 합니다. 다음은 Oracle 테이블의 측면 맵 예입니다.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
global
위치에서dataplex-types
프로젝트의 테이블 또는 뷰 구조를 정의하는 사전 정의된 관점 유형 (예:schema
)을 찾을 수 있습니다.-
측면 키: 측면 키는 PROJECT.LOCATION.ASPECT_TYPE라는 이름 지정 형식을 사용합니다. 다음 표에는 Oracle 리소스의 측면 키 예가 나와 있습니다.
항목 aspect key 예 인스턴스 example-project.us-central1.oracle-instance
데이터베이스 example-project.us-central1.oracle-database
스키마 example-project.us-central1.oracle-schema
테이블 example-project.us-central1.oracle-table
보기 example-project.us-central1.oracle-view