Benutzerdefinierten Connector für den Metadatenimport entwickeln

In diesem Dokument finden Sie eine Referenzvorlage für die Erstellung eines benutzerdefinierten Connectors, mit dem Metadaten aus einer Drittanbieterquelle extrahiert werden. Sie verwenden den Connector, wenn Sie eine verwaltete Verbindungspipeline ausführen, die Metadaten in Dataplex importiert.

Sie können Connectors erstellen, um Metadaten aus Drittanbieterquellen zu extrahieren. Sie können beispielsweise einen Connector erstellen, um Daten aus Quellen wie MySQL, SQL Server, Oracle, Snowflake und Databricks zu extrahieren.

Verwenden Sie den Beispiel-Connector in diesem Dokument als Ausgangspunkt, um einen eigenen Connector zu erstellen. Connectors. Der Beispiel-Connector stellt eine Verbindung zu einer Oracle Database Express Edition (XE)-Datenbank her. Der Connector ist in Python geschrieben, Sie können aber auch Java, Scala oder R verwenden.

Funktionsweise von Connectors

Ein Connector extrahiert Metadaten aus einer Drittanbieterdatenquelle, wandelt sie in das Dataplex-ImportItem-Format um und generiert Metadatenimportdateien, die von Dataplex importiert werden können.

Der Connector ist Teil einer verwalteten Verbindungspipeline. Eine verwaltete Verbindungspipeline ist ein orchestrierter Workflow, mit dem Sie Dataplex Catalog-Metadaten. Die verwaltete Konnektivitätspipeline führt den Connector aus und führt andere Aufgaben im Importworkflow aus, z. B. einen Metadatenimportjob ausführen und Protokolle erfassen.

Die verwaltete Konnektivitätspipeline führt den Connector mit einem Dataproc Serverless-Batchjob aus. Dataproc Serverless bietet eine serverlose Spark-Ausführungsumgebung. Sie können zwar einen Connector erstellen, der Spark nicht verwendet, wir empfehlen jedoch die Verwendung von Spark, da sich dadurch die Leistung des Connectors verbessern lässt.

Anforderungen an den Anschluss

Für den Connector gelten die folgenden Anforderungen:

  • Der Connector muss ein Artifact Registry-Image sein, das ausgeführt werden kann Dataproc Serverless
  • Der Connector muss Metadatendateien in einem Format generieren, das von einem Dataplex-Metadatenimportjob (metadataJobs.create API-Methode) importiert werden kann. Ausführliche Anforderungen finden Sie unter Metadatenimportdatei.
  • Der Connector muss die folgenden Befehlszeilenargumente akzeptieren, um Informationen aus der Pipeline:

    Befehlszeilenargument Wert der Pipeline
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Der Connector verwendet diese Argumente, um Metadaten in einer Zieleintragsgruppeprojects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID zu generieren und in einen Cloud Storage-Bucketgs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID zu schreiben. Bei jeder Ausführung der Pipeline wird im Bucket CLOUD_STORAGE_BUCKET_ID ein neuer Ordner FOLDER_ID erstellt. Der Connector sollte Folgendes schreiben: Metadaten-Importdateien in diesen Ordner.

Die Pipelinevorlagen unterstützen PySpark-Connectors. Bei den Vorlagen wird davon ausgegangen, dass der Treiber (mainPythonFileUri) eine lokale Datei auf dem Connector-Image mit dem Namen main.py ist. Sie können die Pipelinevorlagen für andere Szenarien, etwa einen Spark-Connector, Treiber-URI oder andere Optionen.

So erstellen Sie mit PySpark ein Importelement in der Metadatenimportdatei:

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Hinweis

In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und PySpark vertraut sind.

Überprüfen Sie die folgenden Informationen:

Gehen Sie dazu so vor: Alle Ressourcen am selben Google Cloud-Speicherort erstellen

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Erstellen Sie einen Cloud Storage-Bucket, um die Metadatenimportdateien zu speichern.

  9. Erstellen Sie die folgenden Dataplex Catalog-Ressourcen im selben Projekt.

    Beispielwerte finden Sie in der Beispiele für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments.

    1. Erstellen Sie eine Eintragsgruppe.
    2. Erstellen Sie benutzerdefinierte Aspekttypen für die Einträge, die Sie importieren möchten. Namenskonvention verwenden SOURCEENTITY_TO_IMPORT.

      Optional können Sie zusätzliche Aspekttypen erstellen, um andere Informationen zu speichern.

    3. Erstellen Sie benutzerdefinierte Eintragstypen für die Ressourcen, die Sie importieren möchten, und weisen Sie ihnen die entsprechenden Aspekttypen zu. Verwenden Sie die Namenskonvention SOURCEENTITY_TO_IMPORT.

      Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Eintragstyp mit dem Namen oracle-database Verknüpfen Sie sie mit dem Aspekttyp namens oracle-database.

  10. Achten Sie darauf, dass Sie von Ihrem Google Cloud-Projekt aus auf die Drittanbieterquelle zugreifen können. Weitere Informationen finden Sie unter Dataproc Serverless for Spark-Netzwerkkonfiguration.

Einfachen Python-Connector erstellen

Im Beispiel für einen einfachen Python-Connector werden mithilfe der Klassen der Dataplex-Clientbibliothek Einträge der obersten Ebene für eine Oracle-Datenquelle erstellt. Anschließend geben Sie die Werte für die Eingabefelder an.

Der Connector erstellt eine Metadatenimportdatei mit den folgenden Einträgen:

  • Einen instance-Eintrag mit dem Eintragstyp projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Dieser Eintrag stellt ein Oracle Database XE-System dar.
  • Ein database-Eintrag, der eine Datenbank in Oracle Database XE darstellt System.

So erstellen Sie einen einfachen Python-Connector:

  1. Lokale Umgebung einrichten Wir empfehlen die Verwendung einer virtuellen Umgebung.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Verwenden Sie die Methode aktiv oder Wartung Versionen von Python. Unterstützt werden Python-Versionen 3.7 und höher.

  2. Erstellen Sie ein Python-Projekt.

  3. Fügen Sie eine requirements.txt-Datei mit folgendem Code hinzu:

    • google-cloud-dataplex
    • google-cloud-storage

    Installationsanforderungen:

    pip install -r requirements.txt
    
  4. Fügen Sie im Stammverzeichnis des Projekts eine main.py-Pipelinedatei hinzu.

    Wenn Sie Ihren Code in Dataproc Serverless bereitstellen, dient die Datei main.py als Ausgangspunkt für die Ausführung. Wir empfehlen, die Menge der in der Datei main.py gespeicherten Informationen; verwenden zum Aufrufen von Funktionen und Klassen, die in Ihrem Connector definiert sind, wie z. B. die Klasse src/bootstap.py.

  5. Erstellen Sie einen Ordner src, um den Großteil der Logik für Ihren Connector zu speichern.

  6. Erstellen Sie eine Datei mit dem Namen src/cmd_reader.py mit einer Python-Klasse, die Befehlszeilenargumente akzeptiert. Dazu können Sie das Modul argeparse verwenden.

    import argparse
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-p", "--project", type=str, required=True,
                            help="Google Cloud project")
        parser.add_argument("-l", "--location", type=str, required=True,
                            help="Google Cloud location")
        parser.add_argument("-g", "--entry_group", type=str, required=True,
                            help="Dataplex entry group")
        parser.add_argument("--host_port", type=str, required=True,
                            help="Oracle host and port")
        parser.add_argument("--user", type=str, required=True, help="Oracle user")
        parser.add_argument("--password-secret", type=str, required=True,
                            help="An ID in Secret Manager for the Oracle password.")
        parser.add_argument("-d", "--database", type=str, required=True,
                            help="Oracle database")
        parser.add_argument("--bucket", type=str, required=True,
                            help="Cloud Storage bucket")
        parser.add_argument("--folder", type=str, required=True,
                            help="Folder in the bucket")
    
        return vars(parser.parse_known_args()[0])
    

    Die folgenden Argumente sind mindestens erforderlich, um eine Oracle-Datenbank zu lesen:

    • project: das Google Cloud-Projekt, das die Eintragsgruppe, die Eintragstypen und die Aspekttypen enthält.
    • location: der Google Cloud-Standort der Eintragsgruppe. Eintragstypen und Aspekttypen.
    • entry_group: eine vorhandene Eintragsgruppe in Dataplex. Die Die importierten Metadaten beziehen sich auf die Einträge, die zu diesem Eintrag gehören. Gruppe.
    • host_port: Host- und Portnummer für die Oracle-Instanz.
    • user: der Oracle-Nutzer.
    • password-secret: das Passwort für den Oracle-Nutzer. In Produktion sollten Sie das Passwort in einer der Secret Manager:
    • database: die Oracle-Zieldatenbank.
    • bucket: Der Cloud Storage-Bucket, in dem die Metadatenimportdatei gespeichert ist.
    • folder: ein Ordner im Cloud Storage-Bucket, der für Folgendes verwendet wird: separate Metadaten-Importdateien. Bei jeder Workflowausführung wird ein neues Ordner.
  7. Erstellen Sie eine Datei mit dem Namen src/constants.py. Fügen Sie folgenden Code hinzu, um ein Konstanten.

    SOURCE_TYPE = 'oracle'
    
    class EntryType(enum.Enum):
       """Types of Oracle entries."""
       INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
       DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
       DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
       TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
       VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  8. Erstellen Sie eine Datei mit dem Namen src/name_builder.py. Schreiben Sie Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector für Ihre Oracle-Ressourcen erstellen soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden.

    Da die name_builder.py-Datei sowohl für den Python-Kerncode als auch für den PySpark-Kerncode verwendet wird, empfehlen wir, die Methoden als reine Funktionen und nicht als Mitglieder einer Klasse zu schreiben.

  9. Erstellen Sie eine Datei mit dem Namen src/top_level_builder.py. Fügen Sie folgenden Code die Einträge auf oberster Ebene mit Daten zu füllen.

    from src.common import EntryType
    from src import name_builder as nb
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        entry: dataplex_v1.Entry = dataclasses.field(
            default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    def dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
        def convert(obj: object):
          if isinstance(obj, proto.Message):
            return proto.Message.to_dict(obj)
          return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    class TopEntryBuilder:
        def __init__(self, config):
            self._config = config
            self._project = config["project"]
            self._location = config["location"]
    
        def create_jsonl_item(self, entry_type: EntryType):
            item = self.entry_to_import_item(self.create_entry(entry_type))
            return self.to_json(item)
    
        def create_entry(self, entry_type: EntryType):
            entry = dataplex_v1.Entry()
            entry.name = nb.create_name(self._config, entry_type)
            entry.entry_type = entry_type.value.format(project=self._project, location=self._location)
            entry.fully_qualified_name = nb.create_fqn(self._config, entry_type)
            entry.parent_entry = nb.create_parent_name(self._config, entry_type)
    
            aspect_key = nb.create_entry_aspect_name(self._config, entry_type)
    
            entry_aspect = dataplex_v1.Aspect()
            entry_aspect.aspect_type = aspect_key
            entry_aspect.data = {}
            entry.aspects[aspect_key] = entry_aspect
    
            return entry
    
        def entry_to_import_item(self, entry: dataplex_v1.Entry):
            import_item = ImportItem()
            import_item.entry = entry
            import_item.aspect_keys = list(entry.aspects.keys())
            import_item.update_mask = "aspects"
    
            return import_item
    
        def to_json(self, import_item: ImportItem):
            return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
      def to_json(self, import_item: ImportItem):
        return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
  10. Erstellen Sie eine Datei mit dem Namen src/bootstrap.py. Fügen Sie den folgenden Code hinzu, generiert die Metadatenimportdatei und führt den Connector aus.

    FILENAME = "output.jsonl"
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of strings to the file in JSONL format."""
        for string in json_strings:
            output_file.write(string + "\n")
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write the top entries
            file.write(top_entry_builder.create(EntryType.INSTANCE))
            file.write(top_entry_builder.create(EntryType.DATABASE))
    
  11. Führen Sie den Code lokal aus.

    Es wird eine Metadatenimportdatei mit dem Namen output.jsonl zurückgegeben. Die Datei enthält zwei , die jeweils ein Importelement darstellen. Die Pipeline für verwaltete Verbindungen liest diese Datei beim Ausführen des Metadatenimportjobs.

  12. Optional: Erweitern Sie das vorherige Beispiel, um Dataplex zu verwenden. Clientbibliotheksklassen verwenden, um Importelemente für Tabellen, Schemas und Ansichten zu erstellen. Sie können das Python-Beispiel auch auf Dataproc Serverless ausführen.

    Wir empfehlen, einen Connector zu erstellen, der Spark verwendet und auf Dataproc Serverless), da es die die Leistung des Connectors.

PySpark-Connector erstellen

Dieses Beispiel basiert auf der PySpark DataFrame API. Sie können PySpark SQL installieren und lokal ausführen, auf Dataproc Serverless ausgeführt. Wenn Sie PySpark lokal installieren und ausführen, installieren Sie die PySpark-Bibliothek mit pip. Sie müssen jedoch keinen lokalen Spark-Cluster installieren.

Aus Leistungsgründen werden in diesem Beispiel keine vordefinierten Klassen aus dem PySpark-Bibliothek. Stattdessen werden im Beispiel DataFrames erstellt, in JSON-Einträge konvertiert und dann in eine Metadatenimportdatei im JSON-Zeichenfolgenformat geschrieben, die in Dataplex importiert werden kann.

So erstellen Sie einen Connector mit PySpark:

  1. Fügen Sie den folgenden Code hinzu, der Daten aus einer Oracle-Datenquelle liest und DataFrames zurückgibt.

    SPARK_JAR_PATH="/opt/spark/jars/ojdbc11.jar"
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # Spark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor").config(
                "spark.jars", SPARK_JAR_PATH).getOrCreate()
            self._config = config
            self._url = (f"jdbc:oracle:thin:@{config['host_port']}"
                        f":{config['database']}")
    
        def _execute(self, query: str) -> DataFrame:
            return self._spark.read.format('jdbc') \
              .option("driver", "oracle.jdbc.OracleDriver") \
              .option("url",  self._url) \
              .option('query', query) \
              .option('user', self._config["user"]) \
              .option('password', self._config["password"]) \
              .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Every line here is a column."""
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Get table or view."""
            short_type = entry_type.name # the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Fügen Sie SQL-Abfragen hinzu, um die zu importierenden Metadaten zurückzugeben. Die Abfragen müssen die folgenden Informationen zurückgeben:

    • Datenbankschemata
    • Tabellen, die zu diesen Schemas gehören
    • Spalten, die zu diesen Tabellen gehören, einschließlich Spaltenname, Spaltendatentyp und Angabe, ob die Spalte zulässig ist oder erforderlich ist

    Alle Spalten aller Tabellen und Ansichten werden in derselben Systemtabelle gespeichert. Sie können Spalten mit der Methode _get_columns auswählen. Abhängig von den von Ihnen angegebenen Parametern können Sie Spalten für die Tabellen oder für die Ansichten separat.

    Wichtige Hinweise:

    • In Oracle gehört ein Datenbankschema einem Datenbanknutzer und hat das gleiche als dieser Nutzer.
    • Schemaobjekte sind logische Strukturen, die von Nutzern erstellt werden. Objekte wie Tabellen oder Indexe können Daten enthalten, während Objekte wie Ansichten oder Synonyme nur aus einer Definition bestehen.
    • Die Datei ojdbc11.jar enthält den Oracle JDBC-Treiber.
  2. Erstellen Sie eine Datei mit dem Namen src/entry_builder.py. Fügen Sie die folgenden freigegebenen Methoden zum Anwenden von Spark-Transformationen hinzu.

    import pyspark.sql.functions as F
    from src import name_builder as nb
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Parse Oracle column type to Dataplex type."""
        USER_DEFINED_FUNCTION
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(F.lit(entry_aspect_name),
                            F.named_struct(F.lit("aspect_type"),
                                          F.lit(entry_aspect_name),
                                          F.lit("data"),
                                          F.create_map()
                                          )
                            )
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                        "entry_source", "aspects", "entry_type"]
    
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys]))\
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    

    Gehen Sie dazu so vor:

    • Die Methode choose_metadata_type_udf ist ein benutzerdefiniertes PySpark-Tool die Sie zum Indexieren von Spalten erstellen. Für USER_DEFINED_FUNCTION, schreiben Sie Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector haben soll das für Ihre Oracle-Ressourcen erstellt werden soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden.
    • Die Methode convert_to_import_items gilt für Schemas, Tabellen und Ansichten. Die Ausgabe des Connectors muss aus einem oder mehreren Importelementen bestehen, die mit der Methode metadataJobs.create verarbeitet werden können. Einzeleinträge sind nicht zulässig.
  3. Fügen Sie in src/entry_builder.py den folgenden Code hinzu, um Spark anzuwenden Transformationen zu den Schemas.

    def build_schemas(config, df_raw_schemas):
      """Create a dataframe with database schemas."""
      entry_type = EntryType.DB_SCHEMA
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      parent_name =  nb.create_parent_name(config, entry_type)
    
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                              StringType())
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                            StringType())
    
      # Remember to fill the missed project and location
      full_entry_type = entry_type.value.format(project=config["PROJECT"],
                                                location=config["LOCATION"])
    
      # To list of entries
      column = F.col("USERNAME")
      df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
        .withColumn("fully_qualified_name", create_fqn_udf(column)) \
        .withColumn("parent_entry", F.lit(parent_name)) \
        .withColumn("entry_type", F.lit(full_entry_type)) \
        .withColumn("entry_source", create_entry_source(column)) \
        .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
      .drop(column)
    
      df = convert_to_import_items(df, [entry_aspect_name])
      return df
    
  4. Fügen Sie in src/entry_builder.py den folgenden Code hinzu, der Spark anwendet. Transformationen an die Tabellen und Ansichten. Der Code erstellt eine Liste von Tabellen mit Spaltendefinitionen, die von Dataplex gelesen werden können.

    def build_dataset(config, df_raw, db_schema, entry_type):
      """Build table entries from a flat list of columns."""
      schema_key = "dataplex-types.global.schema"
    
      # Format column names and details
      df = df_raw \
        .withColumn("mode", F.when(F.col("NULLABLE") == 'Y',
                                    "NULLABLE").otherwise("REQUIRED")) \
        .drop("NULLABLE") \
        .withColumnRenamed("DATA_TYPE", "dataType") \
        .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
        .withColumnRenamed("COLUMN_NAME", "name")
    
      # Aggregate fields
      aspect_columns = ["name", "mode", "dataType", "metadataType"]
      df = df.withColumn("columns", F.struct(aspect_columns))\
        .groupby('TABLE_NAME') \
        .agg(F.collect_list("columns").alias("fields"))
    
      # Create aspects
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      df = df.withColumn("schema",
                        F.create_map(F.lit(schema_key),
                                      F.named_struct(
                                          F.lit("aspect_type"),
                                          F.lit(schema_key),
                                          F.lit("data"),
                                          F.create_map(F.lit("fields"),
                                                      F.col("fields")))
                                      )
                        )\
        .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
      .drop("fields")
    
      # Merge separate aspect columns into the one map
      df = df.select(F.col("TABLE_NAME"),
                    F.map_concat("schema", "entry_aspect").alias("aspects"))
    
      # Fill general information and hierarchy names
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                      db_schema, x),
                              StringType())
    
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                    db_schema, x), StringType())
    
      parent_name = nb.create_parent_name(entry_type, db_schema)
      full_entry_type = entry_type.value.format(project=config["project"],
                                                location=config["location"])
    
      column = F.col("TABLE_NAME")
      df = df.withColumn("name", create_name_udf(column)) \
         .withColumn("fully_qualified_name", create_fqn_udf(column)) \
         .withColumn("entry_type", F.lit(full_entry_type)) \
         .withColumn("parent_entry", F.lit(parent_name)) \
         .withColumn("entry_source", create_entry_source(column)) \
      .drop(column)
    
      df = convert_to_import_items(df, [schema_key, entry_aspect_name])
      return df
    

    Beachten Sie, dass die Spalte auch in einer Ansicht TABLE_NAME heißt.

  5. Fügen Sie den folgenden Code hinzu, der die Metadatenimportdatei generiert und den Connector ausführt.

    def process_raw_dataset(df: pyspark.sql.dataframe.DataFrame,
                            config: Dict[str, str],
                            schema_name: str,
                            entry_type: EntryType):
        """Builds dataset and converts it to jsonl."""
        df = entry_builder.build_dataset(config, df, schema_name, entry_type)
        return df.toJSON().collect()
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        connector = OracleConnector(config)
    
        top_entry_builder = TopEntryBuilder(config)
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries
            # ...
    
            # Get schemas
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in
                      df_raw_schemas.select('USERNAME').collect()]
            schemas_json_strings = entry_builder.build_schemas(config,
                                                              df_raw_schemas) \
                .toJSON() \
                .collect()
            write_jsonl(file, schemas_json_strings)
    
            for schema_name in schemas:
                print(f"Processing tables for {schema_name}")
                df_raw_tables = connector.get_table_columns(schema_name)
                tables_json_strings = process_raw_dataset(df_raw_tables,
                                                          config,
                                                          schema_name,
                                                          EntryType.TABLE)
                write_jsonl(file, tables_json_strings)
                print(f"Processing views for {schema_name}")
                df_raw_views = connector.get_view_columns(schema_name)
                views_json_strings = process_raw_dataset(df_raw_views,
                                                        config,
                                                        schema_name,
                                                        EntryType.VIEW)
                write_jsonl(file, views_json_strings)
    
        gcs_uploader.upload(config, FILENAME)
    

    In diesem Beispiel wird die Metadatenimportdatei als einzelne JSON Lines-Datei gespeichert. Ich können Sie mit PySpark-Tools wie der Klasse DataFrameWriter Batches von JSON parallel.

    Der Connector kann Einträge in beliebiger Reihenfolge in die Metadatenimportdatei schreiben.

  6. Fügen Sie den folgenden Code hinzu, mit dem die Metadaten-Importdatei in ein Cloud Storage-Bucket.

    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to a bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["bucket"])
        folder = config["folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  7. Erstellen Sie das Connector-Image.

    Wenn Ihr Connector mehrere Dateien enthält oder Sie Bibliotheken verwenden möchten, die nicht im Standard-Docker-Image enthalten sind, müssen Sie einen benutzerdefinierten Container verwenden. Mit Dataproc Serverless for Spark werden Arbeitslasten in Docker-Containern ausgeführt. Erstellen Sie ein benutzerdefiniertes Docker-Image des Connectors und speichern Sie das Image in Artifact Registry. Dataproc Serverless liest das Image aus Artifact Registry.

    1. Erstellen Sie ein Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
      

      Verwenden Sie Conda als Paketmanager. Serverloses Dataproc für Spark stellt pyspark zur Laufzeit im Container bereit, sodass Sie Sie müssen keine PySpark-Abhängigkeiten in Ihrem benutzerdefinierten Container-Image installieren.

    2. Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push in Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=example-project
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to Artifact Registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Da ein Image mehrere Namen haben kann, können Sie dem Image mit dem Docker-Tag einen Alias zuweisen.

  8. Führen Sie den Connector in Dataproc Serverless aus. Führen Sie zum Senden eines PySpark-Batchjobs mit dem benutzerdefinierten Container-Image den Befehl gcloud dataproc batches submit pyspark-Befehl

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Wichtige Hinweise:

    • Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder PostgreSQL lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket können sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus: file:///path/to/file/driver.jar. In diesem Beispiel ist der Pfad zum Die JAR-Datei ist /opt/spark/jars/.
    • PIPELINE_ARGUMENTS sind die Befehlszeilenargumente. für den Connector.

    Der Connector extrahiert Metadaten aus der Oracle-Datenbank, generiert ein Metadatenimportdatei und speichert die Metadatenimportdatei in einem Cloud Storage-Bucket.

  9. Um die Metadaten aus der Metadaten-Importdatei manuell in Dataplex, führen Sie einen Metadatenjob aus. Verwenden Sie die Methode metadataJobs.create:

    1. Fügen Sie in der Befehlszeile Umgebungsvariablen hinzu und erstellen Sie einen Alias für den curl-Befehl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Rufen Sie die API-Methode auf und übergeben Sie die Datensatz- und Aspekttypen, die Sie importieren möchten.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      Der Aspekttyp schema ist ein globaler Aspekttyp, der von Dataplex definiert wird.

      Beachten Sie, dass das Format, das Sie für Namen der Aspekttypen beim Aufrufen der Die API-Methode unterscheidet sich vom Format, das Sie im Connector verwenden. Code.

    3. Optional: Verwenden Sie Cloud Logging, um Logs für den Metadaten-Job aufzurufen. Für finden Sie unter Dataplex-Logs überwachen

Pipelineorchestrierung einrichten

In den vorherigen Abschnitten wurde gezeigt, wie Sie einen Beispiel-Connector erstellen und den manuell zu verbinden.

In einer Produktionsumgebung führen Sie den Connector als Teil einer verwalteten Konnektivitätspipeline mit einer Orchestrierungsplattform wie Workflows aus.

  1. Um eine verwaltete Verbindungspipeline mit dem Beispiel-Connector auszuführen, Schritte zum Importieren von Metadaten mit Workflows. Gehen Sie so vor:

    • Erstellen Sie den Workflow am selben Google Cloud-Standort wie den Connector.
    • Aktualisieren Sie in der Workflowdefinitionsdatei die submit_pyspark_extract_job. mit dem folgenden Code, um Daten aus der Oracle-Datenbank zu extrahieren mit dem von Ihnen erstellten Connector.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • Aktualisieren Sie in der Workflowdefinitionsdatei die Funktion submit_import_job. durch den folgenden Code, um die Einträge zu importieren. Die Funktion ruft die metadataJobs.create-API-Methode zum Ausführen eines Metadatenimportjobs.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Geben Sie dieselben Eintragstypen und Aspekttypen an, die Sie bei der Erstellung die API-Methode manuell aufgerufen. Beachten Sie, dass am Ende jedes Strings kein Komma steht.

    • Geben Sie beim Ausführen des Workflows die folgenden Laufzeitargumente an:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Optional: Verwenden Sie Cloud Logging, um Logs für die Pipeline für die verwaltete Konnektivität aufzurufen. Die Protokollnutzlast enthält einen Link zu den Protokollen für den Dataproc Serverless-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

  3. Optional: Zur Verbesserung der Sicherheit, Leistung und Funktionalität Ihres verwaltete Verbindungspipeline haben, sollten Sie Folgendes tun:

    1. Secret Manager verwenden zum Speichern der Anmeldedaten für die Datenquelle eines Drittanbieters.
    2. Mit PySpark die JSON-Zeilenausgabe in mehrere Metadatenimporte schreiben .
    3. Verwenden Sie ein Präfix, um große Dateien (mehr als 100 MB) in kleinere Dateien aufzuteilen.
    4. Fügen Sie weitere benutzerdefinierte Aspekte hinzu, mit denen zusätzliche geschäftliche und technische Metadaten aus Ihrer Quelle erfasst werden.

Dataplex Catalog-Beispielressourcen für eine Oracle-Quelle

Der Beispiel-Connector extrahiert Metadaten aus einer Oracle-Datenbank und ordnet sie den entsprechenden Dataplex Catalog-Ressourcen zu.

Überlegungen zur Hierarchie

Jedes System in Dataplex hat einen Stammeintrag, der der übergeordnete Eintrag für das System ist. Normalerweise hat der Stammeintrag den Eintragstyp instance. Die folgende Tabelle zeigt die Beispielhierarchie der Eintrags- und Aspekttypen für ein Oracle-System.

Eintragstyp-ID Beschreibung ID des verknüpften Aspekttyps
oracle-instance Der Stamm des importierten Systems. oracle-instance
oracle-database Die Oracle-Datenbank. oracle-database
oracle-schema Das Datenbankschema. oracle-schema
oracle-table Eine Tabelle.

oracle-table

schema

oracle-view Eine Ansicht.

oracle-view

schema

Der Aspekttyp schema ist ein globaler Aspekttyp, der von Dataplex definiert wird. Es enthält eine Beschreibung der Felder in einer Tabelle, oder eine andere Entität mit Spalten. Der benutzerdefinierte Aspekttyp oracle-schema enthält den Namen des Oracle-Datenbankschemas.

Beispiele für Importelementfelder

Der Connector sollte die folgenden Konventionen für Oracle-Ressourcen verwenden.

  • Voll qualifizierte Namen: voll qualifizierte Namen für Oracle Ressourcen verwenden die folgende Namensvorlage. Unzulässige Zeichen werden mit Backticks maskiert.

    Ressource Vorlage Beispiel
    Instanz

    SOURCE: ADDRESS

    Verwenden Sie die Host- und Portnummer oder den Domainnamen des Systems.

    oracle:`localhost:1521` oder oracle:`myinstance.com`
    Datenbank SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Schema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Tabelle SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ansehen SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Eintragsnamen oder -IDs: Für Einträge für Oracle-Ressourcen wird die folgende Benennungsvorlage verwendet. Unzulässige Zeichen werden durch ein zulässige Zeichen. Ressourcen verwenden das Präfix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Ressource Vorlage Beispiel
    Instanz PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Datenbank PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabelle PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ansehen PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Übergeordnete Einträge: Wenn ein Eintrag kein Stammeintrag für das System ist, kann er ein Feld für übergeordnete Einträge haben, das seine Position in der Hierarchie beschreibt. Das Feld sollte den Namen des übergeordneten Eintrags enthalten. Mi. empfehlen, diesen Wert zu generieren.

    Die folgende Tabelle zeigt die übergeordneten Einträge für Oracle-Ressourcen.

    Eintrag Übergeordneter Eintrag
    Instanz "" (leerer String)
    Datenbank Instanzname
    Schema Datenbankname
    Tabelle Schemaname
    Ansehen Schemaname
  • Aspektkarte: Die Aspektkarte muss mindestens einen Aspekt enthalten, der die zu importierende Entität beschreibt. Hier ist eine Beispiel-Aspektkarte für eine Oracle-Tabelle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Es gibt vordefinierte Aspekttypen (z. B. schema), die die Tabellen- oder Ansichtsstruktur im dataplex-types Projekt an dem Standort global.

  • Aspektschlüssel: Bei Aspektschlüsseln wird das Namensformat verwendet. PROJECT.LOCATION.ASPECT_TYPE. Die folgende Tabelle enthält Beispiele für Aspektschlüssel für Oracle-Ressourcen.

    Eintrag Beispiel für einen Aspektschlüssel
    Instanz example-project.us-central1.oracle-instance
    Datenbank example-project.us-central1.oracle-database
    Schema example-project.us-central1.oracle-schema
    Tabelle example-project.us-central1.oracle-table
    Ansehen example-project.us-central1.oracle-view

Nächste Schritte