In diesem Dokument finden Sie eine Referenzvorlage für die Erstellung eines benutzerdefinierten Connectors, mit dem Metadaten aus einer Drittanbieterquelle extrahiert werden. Sie verwenden den Connector, wenn Sie eine verwaltete Verbindungspipeline ausführen, die Metadaten in Dataplex importiert.
Sie können Connectors erstellen, um Metadaten aus Drittanbieterquellen zu extrahieren. Sie können beispielsweise einen Connector erstellen, um Daten aus Quellen wie MySQL, SQL Server, Oracle, Snowflake und Databricks zu extrahieren.
Verwenden Sie den Beispiel-Connector in diesem Dokument als Ausgangspunkt für die Erstellung eigener Connectoren. Der Beispiel-Connector stellt eine Verbindung zu einer Oracle Database Express Edition (XE) her. Der Connector ist in Python geschrieben, Sie können aber auch Java, Scala oder R verwenden.
Funktionsweise von Connectors
Ein Connector extrahiert Metadaten aus einer Drittanbieterdatenquelle, wandelt sie in das Dataplex-ImportItem
-Format um und generiert Metadatenimportdateien, die von Dataplex importiert werden können.
Der Connector ist Teil einer verwalteten Konnektivitätspipeline. Eine verwaltete Konnektivitätspipeline ist ein orchestrierter Workflow, mit dem Sie Dataplex Catalog-Metadaten importieren. Die verwaltete Konnektivitätspipeline führt den Connector aus und führt andere Aufgaben im Importworkflow aus, z. B. einen Metadatenimportjob und das Erfassen von Protokollen.
Die verwaltete Konnektivitätspipeline führt den Connector mit einem Dataproc Serverless-Batchjob aus. Dataproc Serverless bietet eine serverlose Spark-Ausführungsumgebung. Sie können zwar einen Connector erstellen, der Spark nicht verwendet, wir empfehlen jedoch die Verwendung von Spark, da sich dadurch die Leistung des Connectors verbessern lässt.
Anforderungen an Steckverbinder
Für den Connector gelten die folgenden Anforderungen:
- Der Connector muss ein Artifact Registry-Image sein, das auf Dataproc Serverless ausgeführt werden kann.
- Der Connector muss Metadatendateien in einem Format generieren, das von einem Dataplex-Metadatenimportjob (
metadataJobs.create
API-Methode) importiert werden kann. Ausführliche Anforderungen finden Sie unter Metadatenimportdatei. Der Connector muss die folgenden Befehlszeilenargumente akzeptieren, um Informationen aus der Pipeline zu empfangen:
Befehlszeilenargument Wert der Pipeline target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID Der Connector verwendet diese Argumente, um Metadaten in einer Zieleintragsgruppe
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
zu generieren und in einen Cloud Storage-Bucketgs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
zu schreiben. Bei jeder Ausführung der Pipeline wird im Bucket CLOUD_STORAGE_BUCKET_ID ein neuer Ordner FOLDER_ID erstellt. Der Connector sollte Metadatenimportdateien in diesen Ordner schreiben.
Die Pipeline-Vorlagen unterstützen PySpark-Connectors. Bei den Vorlagen wird davon ausgegangen, dass der Treiber (mainPythonFileUri
) eine lokale Datei auf dem Connector-Image mit dem Namen main.py
ist. Sie können die Pipeline-Vorlagen für andere Szenarien anpassen, z. B. für einen Spark-Connector, einen anderen Treiber-URI oder andere Optionen.
So erstellen Sie mit PySpark ein Importelement in der Metadatenimportdatei:
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Hinweis
In diesem Leitfaden wird davon ausgegangen, dass Sie mit Python und PySpark vertraut sind.
Lesen Sie sich die folgenden Informationen durch:
Gehen Sie so vor: Erstellen Sie alle Ressourcen am selben Google Cloud Speicherort.
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
Erstellen Sie einen Cloud Storage-Bucket, um die Metadatenimportdateien zu speichern.
-
Erstellen Sie die folgenden Dataplex Catalog-Ressourcen im selben Projekt.
Beispielwerte finden Sie im Abschnitt Beispielressourcen für den Dataplex Catalog für eine Oracle-Quelle dieses Dokuments.
- Erstellen Sie eine Eintragsgruppe.
-
Erstellen Sie benutzerdefinierte Aspekttypen für die Einträge, die Sie importieren möchten. Verwenden Sie die Namenskonvention
SOURCE
–ENTITY_TO_IMPORT
.Optional können Sie zusätzliche Aspekttypen erstellen, um andere Informationen zu speichern.
-
Erstellen Sie benutzerdefinierte Eintragstypen für die Ressourcen, die Sie importieren möchten, und weisen Sie ihnen die entsprechenden Aspekttypen zu. Verwenden Sie die Namenskonvention
SOURCE
–ENTITY_TO_IMPORT
.Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Eintragstyp namens
oracle-database
. Verknüpfen Sie sie mit dem Aspekttyp namensoracle-database
.
- Prüfen Sie, ob auf die Drittanbieterquelle von Ihrem Google Cloud -Projekt aus zugegriffen werden kann. Weitere Informationen finden Sie unter Dataproc Serverless for Spark-Netzwerkkonfiguration.
Einfachen Python-Connector erstellen
Im Beispiel für einen einfachen Python-Connector werden mithilfe der Klassen der Dataplex-Clientbibliothek Einträge der obersten Ebene für eine Oracle-Datenquelle erstellt. Geben Sie dann die Werte für die Eingabefelder ein.
Der Connector erstellt eine Metadatenimportdatei mit den folgenden Einträgen:
- Ein
instance
-Eintrag vom Typprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Dieser Eintrag steht für ein Oracle Database XE-System. - Ein
database
-Eintrag, der eine Datenbank im Oracle Database XE-System darstellt.
So erstellen Sie einen einfachen Python-Connector:
Klonen Sie das
cloud-dataplex
-Repository.Richten Sie eine lokale Umgebung ein. Wir empfehlen die Verwendung einer virtuellen Umgebung.
mkdir venv python -m venv venv/ source venv/bin/activate
Verwenden Sie die aktiven oder Wartungsversionen von Python. Python-Versionen 3.7 und höher werden unterstützt.
Erstellen Sie ein Python-Projekt.
Installationsanforderungen:
pip install -r requirements.txt
Folgende Voraussetzungen sind installiert:
Fügen Sie im Stammverzeichnis des Projekts eine
main.py
-Pipelinedatei hinzu.Wenn Sie Ihren Code in Dataproc Serverless bereitstellen, dient die Datei
main.py
als Ausgangspunkt für die Ausführung. Wir empfehlen, die Anzahl der Informationen in dermain.py
-Datei zu minimieren. Verwenden Sie diese Datei, um Funktionen und Klassen aufzurufen, die in Ihrem Connector definiert sind, z. B. diesrc/bootstap.py
-Klasse.Erstellen Sie einen Ordner
src
, um den Großteil der Logik für Ihren Connector zu speichern.Aktualisieren Sie die
src/cmd_reader.py
-Datei mit einer Python-Klasse, um Befehlszeilenargumente zu akzeptieren. Dazu können Sie das Modul argeparse verwenden.In Produktionsumgebungen empfehlen wir, das Passwort in Secret Manager zu speichern.
Aktualisieren Sie die Datei
src/constants.py
mit Code zum Erstellen von Konstanten.Aktualisieren Sie die
src/name_builder.py
-Datei mit Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector für Ihre Oracle-Ressourcen erstellen soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben sind.Da die
name_builder.py
-Datei sowohl für den Python-Kerncode als auch für den PySpark-Kerncode verwendet wird, empfehlen wir, die Methoden als reine Funktionen und nicht als Mitglieder einer Klasse zu schreiben.Aktualisieren Sie die Datei
src/top_entry_builder.py
mit Code, um die Einträge der obersten Ebene mit Daten zu füllen.Aktualisieren Sie die Datei
src/bootstrap.py
mit Code, um die Metadatenimportdatei zu generieren und den Connector auszuführen.Führen Sie den Code lokal aus.
Es wird eine Metadatenimportdatei mit dem Namen
output.jsonl
zurückgegeben. Die Datei enthält zwei Zeilen, die jeweils ein Importelement darstellen. Die verwaltete Konnektivitätspipeline liest diese Datei beim Ausführen des Metadatenimportjobs.Optional: Erweitern Sie das vorherige Beispiel, um mit den Klassen der Dataplex-Clientbibliothek Importelemente für Tabellen, Schemas und Ansichten zu erstellen. Sie können das Python-Beispiel auch in Dataproc Serverless ausführen.
Wir empfehlen, einen Connector zu erstellen, der Spark verwendet und auf Dataproc Serverless ausgeführt wird, da sich so die Leistung des Connectors verbessern lässt.
PySpark-Connector erstellen
Dieses Beispiel basiert auf der PySpark DataFrame API. Sie können PySpark SQL installieren und lokal ausführen, bevor Sie es in Dataproc Serverless ausführen. Wenn Sie PySpark lokal installieren und ausführen, installieren Sie die PySpark-Bibliothek mit pip. Sie müssen jedoch keinen lokalen Spark-Cluster installieren.
Aus Leistungsgründen werden in diesem Beispiel keine vordefinierten Klassen aus der PySpark-Bibliothek verwendet. Stattdessen werden im Beispiel DataFrames erstellt, in JSON-Einträge konvertiert und die Ausgabe in eine Metadatenimportdatei im JSON-Format geschrieben, die in Dataplex importiert werden kann.
So erstellen Sie einen Connector mit PySpark:
Klonen Sie das
cloud-dataplex
-Repository.Installieren Sie PySpark:
pip install pyspark
Installationsanforderungen:
pip install -r requirements.txt
Folgende Voraussetzungen sind installiert:
Aktualisieren Sie die Datei
oracle_connector.py
mit Code, um Daten aus einer Oracle-Datenquelle zu lesen und DataFrames zurückzugeben.Fügen Sie SQL-Abfragen hinzu, um die Metadaten zurückzugeben, die Sie importieren möchten. Die Abfragen müssen die folgenden Informationen zurückgeben:
- Datenbankschemata
- Tabellen, die zu diesen Schemas gehören
- Spalten, die zu diesen Tabellen gehören, einschließlich Spaltenname, Spaltendatentyp und Angabe, ob die Spalte nullable oder erforderlich ist
Alle Spalten aller Tabellen und Ansichten werden in derselben Systemtabelle gespeichert. Sie können Spalten mit der Methode
_get_columns
auswählen. Je nach den angegebenen Parametern können Sie Spalten für die Tabellen oder für die Ansichten separat auswählen.Wichtige Hinweise:
- In Oracle ist ein Datenbankschema einem Datenbanknutzer zugewiesen und hat denselben Namen wie dieser Nutzer.
- Schemaobjekte sind logische Strukturen, die von Nutzern erstellt werden. Objekte wie Tabellen oder Indexe können Daten enthalten, während Objekte wie Ansichten oder Synonyme nur aus einer Definition bestehen.
- Die Datei
ojdbc11.jar
enthält den Oracle JDBC-Treiber.
Aktualisieren Sie die Datei
src/entry_builder.py
mit freigegebenen Methoden zum Anwenden von Spark-Transformationen.Wichtige Hinweise:
- Mit den Methoden werden die Dataplex Catalog-Ressourcen erstellt, die der Connector für Ihre Oracle-Ressourcen erstellt. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben sind.
- Die Methode
convert_to_import_items
gilt für Schemas, Tabellen und Ansichten. Die Ausgabe des Connectors muss aus einem oder mehreren Importelementen bestehen, die mit der MethodemetadataJobs.create
verarbeitet werden können. Einzeleinträge sind nicht zulässig. - Auch in einer Ansicht wird die Spalte
TABLE_NAME
genannt.
Aktualisieren Sie die Datei
bootstrap.py
mit Code, um die Metadatenimportdatei zu generieren und den Connector auszuführen.In diesem Beispiel wird die Metadatenimportdatei als einzelne JSON Lines-Datei gespeichert. Sie können PySpark-Tools wie die Klasse
DataFrameWriter
verwenden, um JSON-Batches parallel auszugeben.Der Connector kann Einträge in beliebiger Reihenfolge in die Metadatenimportdatei schreiben.
Aktualisieren Sie die Datei
gcs_uploader.py
mit Code, um die Metadatenimportdatei in einen Cloud Storage-Bucket hochzuladen.Erstellen Sie das Connector-Image.
Wenn Ihr Connector mehrere Dateien enthält oder Sie Bibliotheken verwenden möchten, die nicht im Standard-Docker-Image enthalten sind, müssen Sie einen benutzerdefinierten Container verwenden. Mit Dataproc Serverless for Spark werden Arbeitslasten in Docker-Containern ausgeführt. Erstellen Sie ein benutzerdefiniertes Docker-Image des Connectors und speichern Sie das Image in Artifact Registry. Dataproc Serverless liest das Image aus Artifact Registry.
Erstellen Sie ein Dockerfile:
Verwenden Sie Conda als Paketmanager. Bei Dataproc Serverless for Spark wird
pyspark
zur Laufzeit im Container bereitgestellt. Sie müssen also keine PySpark-Abhängigkeiten in Ihrem benutzerdefinierten Container-Image installieren.Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push an Artifact Registry.
Da ein Image mehrere Namen haben kann, können Sie dem Image mit dem Docker-Tag einen Alias zuweisen.
Führen Sie den Connector in Dataproc Serverless aus. Wenn Sie einen PySpark-Batchjob mit dem benutzerdefinierten Container-Image einreichen möchten, führen Sie den Befehl
gcloud dataproc batches submit pyspark
aus.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Wichtige Hinweise:
- Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder PostgreSQL lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket kann sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus:
file:///path/to/file/driver.jar
. In diesem Beispiel lautet der Pfad zur JAR-Datei/opt/spark/jars/
. - PIPELINE_ARGUMENTS sind die Befehlszeilenargumente für den Connector.
Der Connector extrahiert Metadaten aus der Oracle-Datenbank, generiert eine Metadatenimportdatei und speichert sie in einem Cloud Storage-Bucket.
- Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder PostgreSQL lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket kann sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus:
Wenn Sie die Metadaten in der Metadatenimportdatei manuell in Dataplex importieren möchten, führen Sie einen Metadatenjob aus. Verwenden Sie die Methode
metadataJobs.create
:Fügen Sie in der Befehlszeile Umgebungsvariablen hinzu und erstellen Sie einen Alias für den curl-Befehl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Rufen Sie die API-Methode auf und übergeben Sie die Eintrags- und Aspekttypen, die Sie importieren möchten.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
Der Aspekttyp
schema
ist ein globaler Aspekttyp, der von Dataplex definiert wird.Das Format, das Sie für die Namen von Aspekttypen beim Aufrufen der API-Methode verwenden, unterscheidet sich vom Format, das Sie im Connector-Code verwenden.
Optional: Verwenden Sie Cloud Logging, um Logs für den Metadaten-Job aufzurufen. Weitere Informationen finden Sie unter Dataplex-Logs überwachen.
Pipelineorchestrierung einrichten
In den vorherigen Abschnitten wurde gezeigt, wie Sie einen Beispiel-Connector erstellen und manuell ausführen.
In einer Produktionsumgebung führen Sie den Connector als Teil einer verwalteten Konnektivitätspipeline mit einer Orchestrierungsplattform wie Workflows aus.
Wenn Sie eine verwaltete Konnektivitätspipeline mit dem Beispiel-Connector ausführen möchten, folgen Sie der Anleitung zum Importieren von Metadaten mithilfe von Workflows. Gehen Sie so vor:
- Erstellen Sie den Workflow am selben Speicherort wie den Google Cloud Connector.
Aktualisieren Sie in der Workflowdefinition die Funktion
submit_pyspark_extract_job
mit dem folgenden Code, um Daten mithilfe des von Ihnen erstellten Connectors aus der Oracle-Datenbank zu extrahieren.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
Aktualisieren Sie in der Workflowdefinition die Funktion
submit_import_job
mit dem folgenden Code, um die Einträge zu importieren. Die Funktion ruft die API-MethodemetadataJobs.create
auf, um einen Metadatenimportjob auszuführen.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Geben Sie dieselben Eintrags- und Aspekttypen an, die Sie beim manuellen Aufrufen der API-Methode angegeben haben. Beachten Sie, dass am Ende jedes Strings kein Komma steht.
Geben Sie beim Ausführen des Workflows die folgenden Laufzeitargumente an:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Optional: Verwenden Sie Cloud Logging, um Logs für die Pipeline für die verwaltete Konnektivität aufzurufen. Die Protokollnutzlast enthält einen Link zu den Protokollen für den Dataproc Serverless-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.
Optional: Sie können die Sicherheit, Leistung und Funktionalität Ihrer verwalteten Konnektivitätspipeline mit den folgenden Maßnahmen verbessern:
- Verwenden Sie Secret Manager, um die Anmeldedaten für die Drittanbieterdatenquelle zu speichern.
- Mit PySpark können Sie die JSON Lines-Ausgabe parallel in mehrere Metadaten-Importdateien schreiben.
- Verwenden Sie ein Präfix, um große Dateien (mehr als 100 MB) in kleinere Dateien aufzuteilen.
- Fügen Sie weitere benutzerdefinierte Aspekte hinzu, mit denen zusätzliche geschäftliche und technische Metadaten aus Ihrer Quelle erfasst werden.
Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle
Der Beispiel-Connector extrahiert Metadaten aus einer Oracle-Datenbank und ordnet sie den entsprechenden Dataplex Catalog-Ressourcen zu.
Hinweise zur Hierarchie
Jedes System in Dataplex hat einen Stammeintrag, der der übergeordnete Eintrag für das System ist. Normalerweise hat der Stammeintrag den Eintragstyp instance
.
Die folgende Tabelle zeigt die Beispielhierarchie der Datensatz- und Aspekttypen für ein Oracle-System.
Eintragstyp-ID | Beschreibung | ID des verknüpften Aspekttyps |
---|---|---|
oracle-instance |
Der Stamm des importierten Systems. | oracle-instance |
oracle-database |
Die Oracle-Datenbank. | oracle-database |
oracle-schema |
Das Datenbankschema. | oracle-schema |
oracle-table |
Eine Tabelle. |
|
oracle-view |
Eine Ansicht. |
|
Der Aspekttyp schema
ist ein globaler Aspekttyp, der von Dataplex definiert wird. Sie enthält eine Beschreibung der Felder in einer Tabelle, Ansicht oder einem anderen Element mit Spalten. Der benutzerdefinierte Aspekttyp oracle-schema
enthält den Namen des Oracle-Datenbankschemas.
Beispiel für Importelementfelder
Der Connector sollte die folgenden Konventionen für Oracle-Ressourcen verwenden.
-
Vollständig qualifizierte Namen: Für vollständig qualifizierte Namen von Oracle-Ressourcen wird die folgende Namensvorlage verwendet. Unzulässige Zeichen werden mit Backticks maskiert.
Ressource Vorlage Beispiel Instanz SOURCE
:ADDRESS
Verwenden Sie den Host und die Portnummer oder den Domainnamen des Systems.
oracle:`localhost:1521`
oderoracle:`myinstance.com`
Datenbank SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Schema SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
Tabelle SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Ansehen SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Eintragsnamen oder -IDs: Für Einträge für Oracle-Ressourcen wird die folgende Benennungsvorlage verwendet. Unzulässige Zeichen werden durch ein zulässiges Zeichen ersetzt. Ressourcen verwenden das Präfix
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Ressource Vorlage Beispiel Instanz PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Datenbank PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Schema PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Tabelle PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Ansehen PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Übergeordnete Einträge: Wenn ein Eintrag kein Stammeintrag für das System ist, kann er ein Feld für übergeordnete Einträge haben, das seine Position in der Hierarchie beschreibt. Das Feld sollte den Namen des übergeordneten Eintrags enthalten. Wir empfehlen, diesen Wert zu generieren.
Die folgende Tabelle enthält die übergeordneten Einträge für Oracle-Ressourcen.
Eintrag Übergeordneter Eintrag Instanz ""
(leerer String)Datenbank Instanzname Schema Datenbankname Tabelle Schemaname Ansehen Schemaname Aspektkarte: Die Aspektkarte muss mindestens einen Aspekt enthalten, der die zu importierende Entität beschreibt. Hier ist eine Beispiel-Aspektkarte für eine Oracle-Tabelle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Vordefinierte Aspekttypen (z. B.
schema
), die die Tabellen- oder Ansichtsstruktur imdataplex-types
-Projekt definieren, finden Sie unterglobal
.-
Aspektschlüssel: Aspektschlüssel verwenden das Benennungsformat PROJECT.LOCATION.ASPECT_TYPE. Die folgende Tabelle enthält Beispielaspektschlüssel für Oracle-Ressourcen.
Eintrag Beispiel für einen Aspektschlüssel Instanz example-project.us-central1.oracle-instance
Datenbank example-project.us-central1.oracle-database
Schema example-project.us-central1.oracle-schema
Tabelle example-project.us-central1.oracle-table
Ansehen example-project.us-central1.oracle-view