Benutzerdefinierten Connector für den Metadatenimport entwickeln

Dieses Dokument enthält eine Referenzvorlage, mit der Sie einen benutzerdefinierten Connector erstellen können, der extrahiert Metadaten aus einer Drittanbieterquelle. Sie verwenden den Connector, wenn Sie eine verwaltete Verbindungspipeline ausführen, die Metadaten in Dataplex importiert.

Sie können Connectors erstellen, um Metadaten aus Drittanbieterquellen zu extrahieren. Für können Sie beispielsweise um Daten aus Quellen wie MySQL, SQL Server, Oracle, Snowflake, Databricks und andere.

Verwenden Sie den Beispiel-Connector in diesem Dokument als Ausgangspunkt, um einen eigenen Connector zu erstellen. Connectors. Der Beispiel-Connector stellt eine Verbindung zu einer Oracle Database Express Edition (XE) her. Der Connector ist in Python geschrieben, Sie können aber auch Java, Scala oder R verwenden.

Funktionsweise von Connectors

Ein Connector extrahiert Metadaten aus einer Datenquelle eines Drittanbieters und wandelt den in das Dataplex-Format ImportItem und generiert Metadaten-Importdateien, die von Dataplex importiert werden können.

Der Connector ist Teil einer verwalteten Verbindungspipeline. Eine verwaltete Verbindungspipeline ist ein orchestrierter Workflow, mit dem Sie Dataplex Catalog-Metadaten. Die Pipeline für verwaltete Verbindungen führt den Connector aus und führt andere Aufgaben im Importworkflow aus, z. B. einen Metadatenimportjob ausführen und Logs erfassen.

Die verwaltete Konnektivitätspipeline führt den Connector mit einem Dataproc Serverless-Batchjob aus. Dataproc Serverless bietet eine serverlose Spark-Ausführungsumgebung. Sie können zwar einen Connector erstellen, verwendet Spark nicht. Wir empfehlen die Verwendung von Spark, da es den die Leistung des Connectors.

Connector-Anforderungen

Für den Connector gelten die folgenden Anforderungen:

  • Der Connector muss ein Artifact Registry-Image sein, das auf Dataproc Serverless ausgeführt werden kann.
  • Der Connector muss Metadatendateien in einem Format generieren, das importiert werden kann durch einen Dataplex-Metadatenimportjob (den metadataJobs.create API-Methode). Ausführliche Anforderungen finden Sie unter Metadatenimportdatei.
  • Der Connector muss die folgenden Befehlszeilenargumente akzeptieren, um Informationen aus der Pipeline:

    Befehlszeilenargument Wert, den die Pipeline bietet
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Der Connector verwendet diese Argumente, um Metadaten in einer Zieleintragsgruppeprojects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID zu generieren und in einen Cloud Storage-Bucketgs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID zu schreiben. Bei jeder Ausführung der Pipeline wird im Bucket CLOUD_STORAGE_BUCKET_ID ein neuer Ordner FOLDER_ID erstellt. Der Connector sollte Metadatenimportdateien in diesen Ordner schreiben.

Die Pipeline-Vorlagen unterstützen PySpark-Connectors. Bei den Vorlagen wird davon ausgegangen, dass der Treiber (mainPythonFileUri) eine lokale Datei auf dem Connector-Image mit dem Namen main.py ist. Sie können die Pipelinevorlagen für andere Szenarien, etwa einen Spark-Connector, Treiber-URI oder andere Optionen.

So erstellen Sie mit PySpark ein Importelement im Metadatenimport -Datei.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Hinweise

In diesem Leitfaden wird davon ausgegangen, dass Sie mit Python und PySpark vertraut sind.

Überprüfen Sie die folgenden Informationen:

Gehen Sie so vor: Alle Ressourcen am selben Google Cloud-Speicherort erstellen

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Cloud Storage-Bucket erstellen, um Metadaten-Importdateien speichern.

  9. Erstellen Sie die folgenden Dataplex Catalog-Ressourcen im selben Projekt.

    Beispielwerte finden Sie in der Beispiele für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments.

    1. Erstellen Sie eine Eintragsgruppe.
    2. Benutzerdefinierte Aspekttypen erstellen für die Einträge, die Sie importieren möchten. Verwenden Sie die Namenskonvention SOURCEENTITY_TO_IMPORT.

      Optional können Sie zusätzliche Aspekttypen erstellen, um andere Informationen zu speichern.

    3. Erstellen Sie benutzerdefinierte Eintragstypen für die Ressourcen, die Sie importieren möchten, und weisen Sie ihnen die entsprechenden Aspekttypen zu. Verwenden Sie die Namenskonvention SOURCEENTITY_TO_IMPORT.

      Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Eintragstyp mit dem Namen oracle-database Verknüpfen Sie sie mit dem Aspekttyp namens oracle-database.

  10. Achten Sie darauf, dass Ihre Drittanbieterquelle über Ihr Google Cloud-Projekt Weitere Informationen finden Sie unter Dataproc Serverless for Spark-Netzwerkkonfiguration.

Einfachen Python-Connector erstellen

Mit dem einfachen Python-Beispiel-Connector werden Einträge der obersten Ebene für eine Oracle-Datenquelle mithilfe der Dataplex-Clientbibliotheksklassen. Geben Sie dann die Werte für die Eingabefelder ein.

Der Connector erstellt eine Metadatenimportdatei mit den folgenden Einträgen:

  • Einen instance-Eintrag mit dem Eintragstyp projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Dieser Eintrag stellt ein Oracle Database XE-System dar.
  • Ein database-Eintrag, der eine Datenbank in Oracle Database XE darstellt System.

So erstellen Sie einen einfachen Python-Connector:

  1. Klonen Sie cloud-dataplex-Repository.

  2. Lokale Umgebung einrichten Wir empfehlen die Verwendung einer virtuellen Umgebung.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Verwenden Sie die aktiven oder Wartungsversionen von Python. Unterstützt werden Python-Versionen 3.7 und höher.

  3. Erstellen Sie ein Python-Projekt.

  4. Installationsvoraussetzungen:

    pip install -r requirements.txt
    

    Folgende Voraussetzungen müssen installiert sein:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Fügen Sie im Stammverzeichnis des Projekts eine main.py-Pipelinedatei hinzu.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Wenn Sie Ihren Code in Dataproc Serverless bereitstellen, dient die Datei main.py als Ausgangspunkt für die Ausführung. Wir empfehlen, die Menge der in der Datei main.py gespeicherten Informationen; verwenden zum Aufrufen von Funktionen und Klassen, die in Ihrem Connector definiert sind, wie z. B. die Klasse src/bootstap.py.

  6. Erstellen Sie einen src-Ordner, um den Großteil der Logik für den Connector zu speichern.

  7. Aktualisieren Sie die src/cmd_reader.py-Datei mit einer Python-Klasse, um Befehlszeilenargumente zu akzeptieren. Dazu können Sie das Modul argeparse verwenden.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    In Produktionsumgebungen empfehlen wir, das Passwort in Secret Manager zu speichern.

  8. Aktualisieren Sie die Datei src/constants.py mit Code zum Erstellen von Konstanten.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Aktualisieren Sie die src/name_builder.py-Datei mit Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector für Ihre Oracle-Ressourcen erstellen soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben sind.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Da die name_builder.py-Datei sowohl für den Python-Kerncode als auch für den PySpark-Kerncode verwendet wird, empfehlen wir, die Methoden als reine Funktionen und nicht als Mitglieder einer Klasse zu schreiben.

  10. Aktualisieren Sie die Datei src/top_entry_builder.py mit Code, um die Einträge auf oberster Ebene mit Daten zu füllen.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Aktualisieren Sie die Datei src/bootstrap.py mit Code, um generieren die Metadaten-Importdatei und führen den Connector aus.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Führen Sie den Code lokal aus.

    Es wird eine Metadatenimportdatei mit dem Namen output.jsonl zurückgegeben. Die Datei enthält zwei Zeilen, die jeweils ein Importelement darstellen. Die Pipeline für verwaltete Verbindungen liest diese Datei beim Ausführen des Metadatenimportjobs.

  13. Optional: Erweitern Sie das vorherige Beispiel, um Dataplex zu verwenden. Clientbibliotheksklassen verwenden, um Importelemente für Tabellen, Schemas und Ansichten zu erstellen. Sie können das Python-Beispiel auch in Dataproc Serverless ausführen.

    Wir empfehlen, einen Connector zu erstellen, der Spark verwendet und auf Dataproc Serverless ausgeführt wird, da sich so die Leistung des Connectors verbessern lässt.

PySpark-Connector erstellen

Dieses Beispiel basiert auf der PySpark DataFrame API. Sie können PySpark SQL installieren und lokal ausführen, auf Dataproc Serverless ausgeführt. Wenn Sie PySpark installieren und ausführen installieren Sie die PySpark-Bibliothek mithilfe von pip. einem lokalen Spark-Cluster.

Aus Leistungsgründen werden in diesem Beispiel keine vordefinierten Klassen aus der PySpark-Bibliothek verwendet. Stattdessen werden im Beispiel DataFrames erstellt, in JSON-Einträge konvertiert und dann in eine Metadatenimportdatei im JSON-Format geschrieben, die in Dataplex importiert werden kann.

So erstellen Sie einen Connector mit PySpark:

  1. Klonen Sie cloud-dataplex-Repository.

  2. Installieren Sie PySpark:

    pip install pyspark
    
  3. Installationsvoraussetzungen:

    pip install -r requirements.txt
    

    Folgende Voraussetzungen müssen installiert sein:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Aktualisieren Sie die Datei oracle_connector.py mit Code, um Daten aus einer Oracle-Datenquelle zu lesen und DataFrames zurückzugeben.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Fügen Sie SQL-Abfragen hinzu, um die Metadaten zurückzugeben, die Sie importieren möchten. Die Abfragen die folgenden Informationen zurückgeben müssen:

    • Datenbankschemas
    • Tabellen, die zu diesen Schemas gehören
    • Spalten, die zu diesen Tabellen gehören, einschließlich Spaltenname, Spaltendatentyp und Angabe, ob die Spalte zulässig ist oder erforderlich ist

    Alle Spalten aller Tabellen und Ansichten werden im selben Systemtabelle. Sie können Spalten mit der Methode _get_columns auswählen. Je nach den angegebenen Parametern können Sie Spalten für die Tabellen oder für die Ansichten separat auswählen.

    Wichtige Hinweise:

    • In Oracle ist ein Datenbankschema einem Datenbanknutzer zugewiesen und hat denselben Namen wie dieser Nutzer.
    • Schemaobjekte sind logische Strukturen, die von Nutzern erstellt werden. Objekte wie Tabellen oder Indexe können Daten enthalten, während Objekte wie Ansichten oder Synonyme nur aus einer Definition bestehen.
    • Die Datei ojdbc11.jar enthält die Oracle-JDBC-Treiber.
  5. Aktualisieren Sie die Datei src/entry_builder.py mit freigegebenen Methoden zum Anwenden von Spark-Transformationen.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Wichtige Hinweise:

    • Mit den Methoden werden die Dataplex Catalog-Ressourcen erstellt, die der Connector für Ihre Oracle-Ressourcen erstellt. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben sind.
    • Die Methode convert_to_import_items gilt für Schemas, Tabellen und Ansichten. Stellen Sie sicher, dass die Ausgabe des Connectors ein oder mehrere Importelemente ist, die verarbeitet werden können, metadataJobs.create-Methode, und nicht für einzelne Einträge.
    • Auch in einer Ansicht wird die Spalte TABLE_NAME genannt.
  6. Aktualisieren Sie die Datei bootstrap.py mit Code, um generieren die Metadaten-Importdatei und führen den Connector aus.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    In diesem Beispiel wird die Metadatenimportdatei als einzelne JSON-Lines-Datei gespeichert. Ich können Sie mit PySpark-Tools wie der Klasse DataFrameWriter Batches von JSON parallel.

    Der Connector kann Einträge in beliebiger Reihenfolge in die Metadatenimportdatei schreiben.

  7. Aktualisieren Sie die Datei gcs_uploader.py mit Code, um den Metadatenimport hochzuladen in einen Cloud Storage-Bucket speichern.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Erstellen Sie das Connector-Image.

    Wenn Ihr Connector mehrere Dateien enthält oder wenn Sie Bibliotheken, die nicht im standardmäßigen Docker-Image enthalten sind, müssen Sie einen benutzerdefinierten Container. Mit Dataproc Serverless for Spark werden Arbeitslasten in Docker-Containern ausgeführt. Erstellen Sie ein benutzerdefiniertes Docker-Image des Connectors und speichern Sie das in Artifact Registry speichern. Dataproc Serverless liest das Image aus Artifact Registry.

    1. Erstellen Sie ein Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Verwenden Sie Conda als Paketmanager. Bei Dataproc Serverless for Spark wird pyspark zur Laufzeit im Container bereitgestellt. Sie müssen also keine PySpark-Abhängigkeiten in Ihrem benutzerdefinierten Container-Image installieren.

    2. Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push in Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=qc-cloudsql-connector-devproj
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"

      Da ein Image mehrere Namen haben kann, können Sie das Docker-Tag für folgende Aktionen verwenden: Weisen Sie dem Bild einen Alias zu.

  9. Führen Sie den Connector auf Dataproc Serverless aus. Führen Sie den Befehl gcloud dataproc batches submit pyspark-Befehl

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Wichtige Hinweise:

    • Die JAR-Dateien sind Treiber für Spark. Um aus Oracle, MySQL oder Postgres müssen Sie Apache Spark ein bestimmtes Paket bereitstellen. Das Paket können sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus: file:///path/to/file/driver.jar. In diesem Beispiel lautet der Pfad zur JAR-Datei /opt/spark/jars/.
    • PIPELINE_ARGUMENTS sind die Befehlszeilenargumente für den Connector.

    Der Connector extrahiert Metadaten aus der Oracle-Datenbank, generiert eine Metadatenimportdatei und speichert sie in einem Cloud Storage-Bucket.

  10. Um die Metadaten aus der Metadaten-Importdatei manuell in Dataplex, führen Sie einen Metadatenjob aus. Verwenden Sie die Methode metadataJobs.create:

    1. Fügen Sie in der Befehlszeile Umgebungsvariablen hinzu und erstellen Sie einen Alias für den Befehl „curl“.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Rufen Sie die API-Methode auf und übergeben Sie die Datensatz- und Aspekttypen, die Sie importieren möchten.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      Der Aspekttyp schema ist ein globaler Aspekttyp, der durch Dataplex.

      Beachten Sie, dass das Format, das Sie für Namen der Aspekttypen beim Aufrufen der Die API-Methode unterscheidet sich vom Format, das Sie im Connector verwenden. Code.

    3. Optional: Verwenden Sie Cloud Logging, um Logs für den Metadatenjob anzusehen. Weitere Informationen finden Sie unter Dataplex-Logs überwachen.

Pipelineorchestrierung einrichten

In den vorherigen Abschnitten wurde gezeigt, wie Sie einen Beispiel-Connector erstellen und manuell ausführen.

In einer Produktionsumgebung führen Sie den Connector als Teil einer verwalteten Konnektivitätspipeline mit einer Orchestrierungsplattform wie Workflows aus.

  1. Um eine verwaltete Verbindungspipeline mit dem Beispiel-Connector auszuführen, Schritte zum Importieren von Metadaten mit Workflows. Gehen Sie so vor:

    • Erstellen Sie den Workflow am selben Google Cloud-Standort wie den Connector.
    • Aktualisieren Sie in der Workflowdefinition die Funktion submit_pyspark_extract_job mit dem folgenden Code, um Daten mithilfe des von Ihnen erstellten Connectors aus der Oracle-Datenbank zu extrahieren.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • Aktualisieren Sie in der Workflowdefinitionsdatei die Funktion submit_import_job. durch den folgenden Code, um die Einträge zu importieren. Die Funktion ruft die API-Methode metadataJobs.create auf, um einen Metadatenimportjob auszuführen.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Geben Sie dieselben Eintrags- und Aspekttypen an, die Sie beim manuellen Aufrufen der API-Methode angegeben haben. Am Ende steht kein Komma. jeder Zeichenfolge.

    • Geben Sie beim Ausführen des Workflows die folgenden Laufzeitargumente an:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Optional: Verwenden Sie Cloud Logging, um Logs für die Pipeline für die verwaltete Konnektivität aufzurufen. Die Log-Nutzlast enthält einen Link zu den Logs für den den serverlosen Batchjob von Dataproc und den Metadatenimportjob. relevant sein. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

  3. Optional: Zur Verbesserung der Sicherheit, Leistung und Funktionalität Ihres verwaltete Verbindungspipeline haben, sollten Sie Folgendes tun:

    1. Verwenden Sie Secret Manager, um die Anmeldedaten für die Drittanbieterdatenquelle zu speichern.
    2. Mit PySpark die JSON-Zeilenausgabe in mehrere Metadatenimporte schreiben .
    3. Verwenden Sie ein Präfix, um große Dateien (mehr als 100 MB) in kleinere Dateien aufzuteilen.
    4. Fügen Sie weitere benutzerdefinierte Aspekte hinzu, mit denen zusätzliche geschäftliche und technische Metadaten aus Ihrer Quelle erfasst werden.

Dataplex Catalog-Beispielressourcen für eine Oracle-Quelle

Mit dem Beispiel-Connector werden Metadaten aus einer Oracle-Datenbank extrahiert und Metadaten zu den entsprechenden Dataplex Catalog-Ressourcen.

Hinweise zur Hierarchie

Jedes System in Dataplex hat einen Stammeintrag, der der übergeordnete Eintrag für das System ist. Normalerweise hat der Stammeintrag den Eintragstyp instance. Die folgende Tabelle zeigt die Beispielhierarchie von Eintragstypen und Aspekttypen für ein Oracle-System.

Eintragstyp-ID Beschreibung ID des verknüpften Aspekttyps
oracle-instance Das Stammverzeichnis des importierten Systems. oracle-instance
oracle-database Die Oracle-Datenbank. oracle-database
oracle-schema Das Datenbankschema. oracle-schema
oracle-table Eine Tabelle.

oracle-table

schema

oracle-view Eine Ansicht.

oracle-view

schema

Der Aspekttyp schema ist ein globaler Aspekttyp, der von Dataplex definiert wird. Es enthält eine Beschreibung der Felder in einer Tabelle, oder eine andere Entität mit Spalten. Der benutzerdefinierte Aspekttyp oracle-schema enthält den Namen des Oracle-Datenbankschemas.

Beispiel für Importelementfelder

Der Connector sollte die folgenden Konventionen für Oracle-Ressourcen verwenden.

  • Voll qualifizierte Namen: voll qualifizierte Namen für Oracle Ressourcen verwenden die folgende Namensvorlage. Unzulässige Zeichen sind mit Gravis entkommen.

    Ressource Vorlage Beispiel
    Instanz

    SOURCE: ADDRESS

    Verwenden Sie die Host- und Portnummer oder den Domainnamen des Systems.

    oracle:`localhost:1521` oder oracle:`myinstance.com`
    Datenbank SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Schema SOURCE:ADDRESS,DATABASE,SCHEMA oracle:`localhost:1521`.xe.sys
    Tabelle SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ansehen SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Eintragsnamen oder -IDs: Für Einträge für Oracle-Ressourcen wird die folgende Benennungsvorlage verwendet. Unzulässige Zeichen werden durch ein zulässige Zeichen. Ressourcen verwenden das Präfix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Ressource Vorlage Beispiel
    Instanz PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Datenbank PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabelle PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ansehen PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Übergeordnete Einträge: Wenn ein Eintrag kein Stammeintrag für das System ist, wird der -Eintrag kann ein übergeordnetes Eingabefeld haben, das seine Position im Hierarchie. Das Feld sollte den Namen des übergeordneten Eintrags enthalten. Mi. empfehlen, diesen Wert zu generieren.

    Die folgende Tabelle enthält die übergeordneten Einträge für Oracle-Ressourcen.

    Eintrag Übergeordneter Eintrag
    Instanz "" (leerer String)
    Datenbank Instanzname
    Schema Datenbankname
    Tabelle Schemaname
    Ansehen Schemaname
  • Aspect Map: Die Aspektkarte muss mindestens einen Aspekt enthalten. der die zu importierende Entität beschreibt. Hier ist ein Beispiel für eine Oracle-Tabelle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Es gibt vordefinierte Aspekttypen (z. B. schema), die die Tabellen- oder Ansichtsstruktur im dataplex-types Projekt an dem Standort global.

  • Aspektschlüssel: Aspektschlüssel verwenden das Benennungsformat PROJECT.LOCATION.ASPECT_TYPE. Die folgende Tabelle enthält Beispielaspektschlüssel für Oracle-Ressourcen.

    Eintrag Beispiel für Aspektschlüssel
    Instanz example-project.us-central1.oracle-instance
    Datenbank example-project.us-central1.oracle-database
    Schema example-project.us-central1.oracle-schema
    Tabelle example-project.us-central1.oracle-table
    Ansehen example-project.us-central1.oracle-view

Nächste Schritte