In diesem Dokument finden Sie eine Referenzvorlage für die Erstellung eines benutzerdefinierten Connectors, mit dem Metadaten aus einer Drittanbieterquelle extrahiert werden. Sie verwenden den Connector, wenn Sie eine verwaltete Verbindungspipeline ausführen, die Metadaten in Dataplex importiert.
Sie können Connectors erstellen, um Metadaten aus Drittanbieterquellen zu extrahieren. Sie können beispielsweise einen Connector erstellen, um Daten aus Quellen wie MySQL, SQL Server, Oracle, Snowflake und Databricks zu extrahieren.
Verwenden Sie den Beispiel-Connector in diesem Dokument als Ausgangspunkt, um einen eigenen Connector zu erstellen. Connectors. Der Beispiel-Connector stellt eine Verbindung zu einer Oracle Database Express Edition (XE)-Datenbank her. Der Connector ist in Python geschrieben, Sie können aber auch Java, Scala oder R verwenden.
Funktionsweise von Connectors
Ein Connector extrahiert Metadaten aus einer Drittanbieterdatenquelle, wandelt sie in das Dataplex-ImportItem
-Format um und generiert Metadatenimportdateien, die von Dataplex importiert werden können.
Der Connector ist Teil einer verwalteten Verbindungspipeline. Eine verwaltete Verbindungspipeline ist ein orchestrierter Workflow, mit dem Sie Dataplex Catalog-Metadaten. Die verwaltete Konnektivitätspipeline führt den Connector aus und führt andere Aufgaben im Importworkflow aus, z. B. einen Metadatenimportjob ausführen und Protokolle erfassen.
Die verwaltete Konnektivitätspipeline führt den Connector mit einem Dataproc Serverless-Batchjob aus. Dataproc Serverless bietet eine serverlose Spark-Ausführungsumgebung. Sie können zwar einen Connector erstellen, der Spark nicht verwendet, wir empfehlen jedoch die Verwendung von Spark, da sich dadurch die Leistung des Connectors verbessern lässt.
Anforderungen an den Anschluss
Für den Connector gelten die folgenden Anforderungen:
- Der Connector muss ein Artifact Registry-Image sein, das ausgeführt werden kann Dataproc Serverless
- Der Connector muss Metadatendateien in einem Format generieren, das von einem Dataplex-Metadatenimportjob (
metadataJobs.create
API-Methode) importiert werden kann. Ausführliche Anforderungen finden Sie unter Metadatenimportdatei. Der Connector muss die folgenden Befehlszeilenargumente akzeptieren, um Informationen aus der Pipeline:
Befehlszeilenargument Wert der Pipeline target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID Der Connector verwendet diese Argumente, um Metadaten in einer Zieleintragsgruppe
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
zu generieren und in einen Cloud Storage-Bucketgs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
zu schreiben. Bei jeder Ausführung der Pipeline wird im Bucket CLOUD_STORAGE_BUCKET_ID ein neuer Ordner FOLDER_ID erstellt. Der Connector sollte Folgendes schreiben: Metadaten-Importdateien in diesen Ordner.
Die Pipelinevorlagen unterstützen PySpark-Connectors. Bei den Vorlagen wird davon ausgegangen, dass der Treiber (mainPythonFileUri
) eine lokale Datei auf dem Connector-Image mit dem Namen main.py
ist. Sie können die
Pipelinevorlagen für andere Szenarien, etwa einen Spark-Connector,
Treiber-URI oder andere Optionen.
So erstellen Sie mit PySpark ein Importelement in der Metadatenimportdatei:
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Hinweis
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und PySpark vertraut sind.
Überprüfen Sie die folgenden Informationen:
- Dataplex Catalog-Metadatenkonzepte
- Dokumentation zu Metadaten-Importjobs
Gehen Sie dazu so vor: Alle Ressourcen am selben Google Cloud-Speicherort erstellen
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
Erstellen Sie einen Cloud Storage-Bucket, um die Metadatenimportdateien zu speichern.
-
Erstellen Sie die folgenden Dataplex Catalog-Ressourcen im selben Projekt.
Beispielwerte finden Sie in der Beispiele für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments.
- Erstellen Sie eine Eintragsgruppe.
-
Erstellen Sie benutzerdefinierte Aspekttypen für die Einträge, die Sie importieren möchten. Namenskonvention verwenden
SOURCE
–ENTITY_TO_IMPORT
.Optional können Sie zusätzliche Aspekttypen erstellen, um andere Informationen zu speichern.
-
Erstellen Sie benutzerdefinierte Eintragstypen für die Ressourcen, die Sie importieren möchten, und weisen Sie ihnen die entsprechenden Aspekttypen zu. Verwenden Sie die Namenskonvention
SOURCE
–ENTITY_TO_IMPORT
.Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Eintragstyp mit dem Namen
oracle-database
Verknüpfen Sie sie mit dem Aspekttyp namensoracle-database
.
- Achten Sie darauf, dass Sie von Ihrem Google Cloud-Projekt aus auf die Drittanbieterquelle zugreifen können. Weitere Informationen finden Sie unter Dataproc Serverless for Spark-Netzwerkkonfiguration.
Einfachen Python-Connector erstellen
Im Beispiel für einen einfachen Python-Connector werden mithilfe der Klassen der Dataplex-Clientbibliothek Einträge der obersten Ebene für eine Oracle-Datenquelle erstellt. Anschließend geben Sie die Werte für die Eingabefelder an.
Der Connector erstellt eine Metadatenimportdatei mit den folgenden Einträgen:
- Einen
instance
-Eintrag mit dem Eintragstypprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Dieser Eintrag stellt ein Oracle Database XE-System dar. - Ein
database
-Eintrag, der eine Datenbank in Oracle Database XE darstellt System.
So erstellen Sie einen einfachen Python-Connector:
Lokale Umgebung einrichten Wir empfehlen die Verwendung einer virtuellen Umgebung.
mkdir venv python -m venv venv/ source venv/bin/activate
Verwenden Sie die Methode aktiv oder Wartung Versionen von Python. Unterstützt werden Python-Versionen 3.7 und höher.
Erstellen Sie ein Python-Projekt.
Fügen Sie eine
requirements.txt
-Datei mit folgendem Code hinzu:google-cloud-dataplex
google-cloud-storage
Installationsanforderungen:
pip install -r requirements.txt
Fügen Sie im Stammverzeichnis des Projekts eine
main.py
-Pipelinedatei hinzu.Wenn Sie Ihren Code in Dataproc Serverless bereitstellen, dient die Datei
main.py
als Ausgangspunkt für die Ausführung. Wir empfehlen, die Menge der in der Dateimain.py
gespeicherten Informationen; verwenden zum Aufrufen von Funktionen und Klassen, die in Ihrem Connector definiert sind, wie z. B. die Klassesrc/bootstap.py
.Erstellen Sie einen Ordner
src
, um den Großteil der Logik für Ihren Connector zu speichern.Erstellen Sie eine Datei mit dem Namen
src/cmd_reader.py
mit einer Python-Klasse, die Befehlszeilenargumente akzeptiert. Dazu können Sie das Modul argeparse verwenden.import argparse def read_args(): """Reads arguments from the command line.""" parser = argparse.ArgumentParser() parser.add_argument("-p", "--project", type=str, required=True, help="Google Cloud project") parser.add_argument("-l", "--location", type=str, required=True, help="Google Cloud location") parser.add_argument("-g", "--entry_group", type=str, required=True, help="Dataplex entry group") parser.add_argument("--host_port", type=str, required=True, help="Oracle host and port") parser.add_argument("--user", type=str, required=True, help="Oracle user") parser.add_argument("--password-secret", type=str, required=True, help="An ID in Secret Manager for the Oracle password.") parser.add_argument("-d", "--database", type=str, required=True, help="Oracle database") parser.add_argument("--bucket", type=str, required=True, help="Cloud Storage bucket") parser.add_argument("--folder", type=str, required=True, help="Folder in the bucket") return vars(parser.parse_known_args()[0])
Die folgenden Argumente sind mindestens erforderlich, um eine Oracle-Datenbank zu lesen:
project
: das Google Cloud-Projekt, das die Eintragsgruppe, die Eintragstypen und die Aspekttypen enthält.location
: der Google Cloud-Standort der Eintragsgruppe. Eintragstypen und Aspekttypen.entry_group
: eine vorhandene Eintragsgruppe in Dataplex. Die Die importierten Metadaten beziehen sich auf die Einträge, die zu diesem Eintrag gehören. Gruppe.host_port
: Host- und Portnummer für die Oracle-Instanz.user
: der Oracle-Nutzer.password-secret
: das Passwort für den Oracle-Nutzer. In Produktion sollten Sie das Passwort in einer der Secret Manager:database
: die Oracle-Zieldatenbank.bucket
: Der Cloud Storage-Bucket, in dem die Metadatenimportdatei gespeichert ist.folder
: ein Ordner im Cloud Storage-Bucket, der für Folgendes verwendet wird: separate Metadaten-Importdateien. Bei jeder Workflowausführung wird ein neues Ordner.
Erstellen Sie eine Datei mit dem Namen
src/constants.py
. Fügen Sie folgenden Code hinzu, um ein Konstanten.SOURCE_TYPE = 'oracle' class EntryType(enum.Enum): """Types of Oracle entries.""" INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance" DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database" DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema" TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table" VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
Erstellen Sie eine Datei mit dem Namen
src/name_builder.py
. Schreiben Sie Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector für Ihre Oracle-Ressourcen erstellen soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden.Da die
name_builder.py
-Datei sowohl für den Python-Kerncode als auch für den PySpark-Kerncode verwendet wird, empfehlen wir, die Methoden als reine Funktionen und nicht als Mitglieder einer Klasse zu schreiben.Erstellen Sie eine Datei mit dem Namen
src/top_level_builder.py
. Fügen Sie folgenden Code die Einträge auf oberster Ebene mit Daten zu füllen.from src.common import EntryType from src import name_builder as nb @dataclasses.dataclass(slots=True) class ImportItem: entry: dataplex_v1.Entry = dataclasses.field( default_factory=dataplex_v1.Entry) aspect_keys: List[str] = dataclasses.field(default_factory=list) update_mask: List[str] = dataclasses.field(default_factory=list) def dict_factory(data: object): """Factory function required for converting Entry dataclass to dict.""" def convert(obj: object): if isinstance(obj, proto.Message): return proto.Message.to_dict(obj) return obj return dict((k, convert(v)) for k, v in data) class TopEntryBuilder: def __init__(self, config): self._config = config self._project = config["project"] self._location = config["location"] def create_jsonl_item(self, entry_type: EntryType): item = self.entry_to_import_item(self.create_entry(entry_type)) return self.to_json(item) def create_entry(self, entry_type: EntryType): entry = dataplex_v1.Entry() entry.name = nb.create_name(self._config, entry_type) entry.entry_type = entry_type.value.format(project=self._project, location=self._location) entry.fully_qualified_name = nb.create_fqn(self._config, entry_type) entry.parent_entry = nb.create_parent_name(self._config, entry_type) aspect_key = nb.create_entry_aspect_name(self._config, entry_type) entry_aspect = dataplex_v1.Aspect() entry_aspect.aspect_type = aspect_key entry_aspect.data = {} entry.aspects[aspect_key] = entry_aspect return entry def entry_to_import_item(self, entry: dataplex_v1.Entry): import_item = ImportItem() import_item.entry = entry import_item.aspect_keys = list(entry.aspects.keys()) import_item.update_mask = "aspects" return import_item def to_json(self, import_item: ImportItem): return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory)) import_item.aspect_keys = list(entry.aspects.keys()) import_item.update_mask = "aspects" return import_item def to_json(self, import_item: ImportItem): return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
Erstellen Sie eine Datei mit dem Namen
src/bootstrap.py
. Fügen Sie den folgenden Code hinzu, generiert die Metadatenimportdatei und führt den Connector aus.FILENAME = "output.jsonl" def write_jsonl(output_file, json_strings): """Writes a list of strings to the file in JSONL format.""" for string in json_strings: output_file.write(string + "\n") def run(): """Runs a pipeline.""" config = cmd_reader.read_args() with open(FILENAME, "w", encoding="utf-8") as file: # Write the top entries file.write(top_entry_builder.create(EntryType.INSTANCE)) file.write(top_entry_builder.create(EntryType.DATABASE))
Führen Sie den Code lokal aus.
Es wird eine Metadatenimportdatei mit dem Namen
output.jsonl
zurückgegeben. Die Datei enthält zwei , die jeweils ein Importelement darstellen. Die Pipeline für verwaltete Verbindungen liest diese Datei beim Ausführen des Metadatenimportjobs.Optional: Erweitern Sie das vorherige Beispiel, um Dataplex zu verwenden. Clientbibliotheksklassen verwenden, um Importelemente für Tabellen, Schemas und Ansichten zu erstellen. Sie können das Python-Beispiel auch auf Dataproc Serverless ausführen.
Wir empfehlen, einen Connector zu erstellen, der Spark verwendet und auf Dataproc Serverless), da es die die Leistung des Connectors.
PySpark-Connector erstellen
Dieses Beispiel basiert auf der PySpark DataFrame API. Sie können PySpark SQL installieren und lokal ausführen, auf Dataproc Serverless ausgeführt. Wenn Sie PySpark lokal installieren und ausführen, installieren Sie die PySpark-Bibliothek mit pip. Sie müssen jedoch keinen lokalen Spark-Cluster installieren.
Aus Leistungsgründen werden in diesem Beispiel keine vordefinierten Klassen aus dem PySpark-Bibliothek. Stattdessen werden im Beispiel DataFrames erstellt, in JSON-Einträge konvertiert und dann in eine Metadatenimportdatei im JSON-Zeichenfolgenformat geschrieben, die in Dataplex importiert werden kann.
So erstellen Sie einen Connector mit PySpark:
Fügen Sie den folgenden Code hinzu, der Daten aus einer Oracle-Datenquelle liest und DataFrames zurückgibt.
SPARK_JAR_PATH="/opt/spark/jars/ojdbc11.jar" class OracleConnector: """Reads data from Oracle and returns Spark Dataframes.""" def __init__(self, config: Dict[str, str]): # Spark entrypoint self._spark = SparkSession.builder.appName("OracleIngestor").config( "spark.jars", SPARK_JAR_PATH).getOrCreate() self._config = config self._url = (f"jdbc:oracle:thin:@{config['host_port']}" f":{config['database']}") def _execute(self, query: str) -> DataFrame: return self._spark.read.format('jdbc') \ .option("driver", "oracle.jdbc.OracleDriver") \ .option("url", self._url) \ .option('query', query) \ .option('user', self._config["user"]) \ .option('password', self._config["password"]) \ .load() def get_db_schemas(self) -> DataFrame: """In Oracle, schemas are usernames.""" query = "SELECT username FROM dba_users" return self._execute(query) def _get_columns(self, schema_name: str, object_type: str) -> str: """Every line here is a column.""" return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, " f"col.DATA_TYPE, col.NULLABLE " f"FROM all_tab_columns col " f"INNER JOIN DBA_OBJECTS tab " f"ON tab.OBJECT_NAME = col.TABLE_NAME " f"WHERE tab.OWNER = '{schema_name}' " f"AND tab.OBJECT_TYPE = '{object_type}'") def get_dataset(self, schema_name: str, entry_type: EntryType): """Get table or view.""" short_type = entry_type.name # the title of enum value query = self._get_columns(schema_name, short_type) return self._execute(query)
Fügen Sie SQL-Abfragen hinzu, um die zu importierenden Metadaten zurückzugeben. Die Abfragen müssen die folgenden Informationen zurückgeben:
- Datenbankschemata
- Tabellen, die zu diesen Schemas gehören
- Spalten, die zu diesen Tabellen gehören, einschließlich Spaltenname, Spaltendatentyp und Angabe, ob die Spalte zulässig ist oder erforderlich ist
Alle Spalten aller Tabellen und Ansichten werden in derselben Systemtabelle gespeichert. Sie können Spalten mit der Methode
_get_columns
auswählen. Abhängig von den von Ihnen angegebenen Parametern können Sie Spalten für die Tabellen oder für die Ansichten separat.Wichtige Hinweise:
- In Oracle gehört ein Datenbankschema einem Datenbanknutzer und hat das gleiche als dieser Nutzer.
- Schemaobjekte sind logische Strukturen, die von Nutzern erstellt werden. Objekte wie Tabellen oder Indexe können Daten enthalten, während Objekte wie Ansichten oder Synonyme nur aus einer Definition bestehen.
- Die Datei
ojdbc11.jar
enthält den Oracle JDBC-Treiber.
Erstellen Sie eine Datei mit dem Namen
src/entry_builder.py
. Fügen Sie die folgenden freigegebenen Methoden zum Anwenden von Spark-Transformationen hinzu.import pyspark.sql.functions as F from src import name_builder as nb @F.udf(returnType=StringType()) def choose_metadata_type_udf(data_type: str): """Parse Oracle column type to Dataplex type.""" USER_DEFINED_FUNCTION def create_entry_source(column): """Create Entry Source segment.""" return F.named_struct(F.lit("display_name"), column, F.lit("system"), F.lit(SOURCE_TYPE)) def create_entry_aspect(entry_aspect_name): """Create aspect with general information (usually it is empty).""" return F.create_map(F.lit(entry_aspect_name), F.named_struct(F.lit("aspect_type"), F.lit(entry_aspect_name), F.lit("data"), F.create_map() ) ) def convert_to_import_items(df, aspect_keys): """Convert entries to import items.""" entry_columns = ["name", "fully_qualified_name", "parent_entry", "entry_source", "aspects", "entry_type"] return df.withColumn("entry", F.struct(entry_columns)) \ .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys]))\ .withColumn("update_mask", F.array(F.lit("aspects"))) \ .drop(*entry_columns)
Gehen Sie dazu so vor:
- Die Methode
choose_metadata_type_udf
ist ein benutzerdefiniertes PySpark-Tool die Sie zum Indexieren von Spalten erstellen. Für USER_DEFINED_FUNCTION, schreiben Sie Methoden zum Erstellen der Dataplex Catalog-Ressourcen, die der Connector haben soll das für Ihre Oracle-Ressourcen erstellt werden soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Dataplex Catalog-Ressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden. - Die Methode
convert_to_import_items
gilt für Schemas, Tabellen und Ansichten. Die Ausgabe des Connectors muss aus einem oder mehreren Importelementen bestehen, die mit der MethodemetadataJobs.create
verarbeitet werden können. Einzeleinträge sind nicht zulässig.
- Die Methode
Fügen Sie in
src/entry_builder.py
den folgenden Code hinzu, um Spark anzuwenden Transformationen zu den Schemas.def build_schemas(config, df_raw_schemas): """Create a dataframe with database schemas.""" entry_type = EntryType.DB_SCHEMA entry_aspect_name = nb.create_entry_aspect_name(config, entry_type) parent_name = nb.create_parent_name(config, entry_type) create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x), StringType()) create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x), StringType()) # Remember to fill the missed project and location full_entry_type = entry_type.value.format(project=config["PROJECT"], location=config["LOCATION"]) # To list of entries column = F.col("USERNAME") df = df_raw_schemas.withColumn("name", create_name_udf(column)) \ .withColumn("fully_qualified_name", create_fqn_udf(column)) \ .withColumn("parent_entry", F.lit(parent_name)) \ .withColumn("entry_type", F.lit(full_entry_type)) \ .withColumn("entry_source", create_entry_source(column)) \ .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \ .drop(column) df = convert_to_import_items(df, [entry_aspect_name]) return df
Fügen Sie in
src/entry_builder.py
den folgenden Code hinzu, der Spark anwendet. Transformationen an die Tabellen und Ansichten. Der Code erstellt eine Liste von Tabellen mit Spaltendefinitionen, die von Dataplex gelesen werden können.def build_dataset(config, df_raw, db_schema, entry_type): """Build table entries from a flat list of columns.""" schema_key = "dataplex-types.global.schema" # Format column names and details df = df_raw \ .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \ .drop("NULLABLE") \ .withColumnRenamed("DATA_TYPE", "dataType") \ .withColumn("metadataType", choose_metadata_type_udf("dataType")) \ .withColumnRenamed("COLUMN_NAME", "name") # Aggregate fields aspect_columns = ["name", "mode", "dataType", "metadataType"] df = df.withColumn("columns", F.struct(aspect_columns))\ .groupby('TABLE_NAME') \ .agg(F.collect_list("columns").alias("fields")) # Create aspects entry_aspect_name = nb.create_entry_aspect_name(config, entry_type) df = df.withColumn("schema", F.create_map(F.lit(schema_key), F.named_struct( F.lit("aspect_type"), F.lit(schema_key), F.lit("data"), F.create_map(F.lit("fields"), F.col("fields"))) ) )\ .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \ .drop("fields") # Merge separate aspect columns into the one map df = df.select(F.col("TABLE_NAME"), F.map_concat("schema", "entry_aspect").alias("aspects")) # Fill general information and hierarchy names create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, db_schema, x), StringType()) create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, db_schema, x), StringType()) parent_name = nb.create_parent_name(entry_type, db_schema) full_entry_type = entry_type.value.format(project=config["project"], location=config["location"]) column = F.col("TABLE_NAME") df = df.withColumn("name", create_name_udf(column)) \ .withColumn("fully_qualified_name", create_fqn_udf(column)) \ .withColumn("entry_type", F.lit(full_entry_type)) \ .withColumn("parent_entry", F.lit(parent_name)) \ .withColumn("entry_source", create_entry_source(column)) \ .drop(column) df = convert_to_import_items(df, [schema_key, entry_aspect_name]) return df
Beachten Sie, dass die Spalte auch in einer Ansicht
TABLE_NAME
heißt.Fügen Sie den folgenden Code hinzu, der die Metadatenimportdatei generiert und den Connector ausführt.
def process_raw_dataset(df: pyspark.sql.dataframe.DataFrame, config: Dict[str, str], schema_name: str, entry_type: EntryType): """Builds dataset and converts it to jsonl.""" df = entry_builder.build_dataset(config, df, schema_name, entry_type) return df.toJSON().collect() def run(): """Runs a pipeline.""" config = cmd_reader.read_args() connector = OracleConnector(config) top_entry_builder = TopEntryBuilder(config) with open(FILENAME, "w", encoding="utf-8") as file: # Write top entries # ... # Get schemas df_raw_schemas = connector.get_db_schemas() schemas = [schema.USERNAME for schema in df_raw_schemas.select('USERNAME').collect()] schemas_json_strings = entry_builder.build_schemas(config, df_raw_schemas) \ .toJSON() \ .collect() write_jsonl(file, schemas_json_strings) for schema_name in schemas: print(f"Processing tables for {schema_name}") df_raw_tables = connector.get_table_columns(schema_name) tables_json_strings = process_raw_dataset(df_raw_tables, config, schema_name, EntryType.TABLE) write_jsonl(file, tables_json_strings) print(f"Processing views for {schema_name}") df_raw_views = connector.get_view_columns(schema_name) views_json_strings = process_raw_dataset(df_raw_views, config, schema_name, EntryType.VIEW) write_jsonl(file, views_json_strings) gcs_uploader.upload(config, FILENAME)
In diesem Beispiel wird die Metadatenimportdatei als einzelne JSON Lines-Datei gespeichert. Ich können Sie mit PySpark-Tools wie der Klasse
DataFrameWriter
Batches von JSON parallel.Der Connector kann Einträge in beliebiger Reihenfolge in die Metadatenimportdatei schreiben.
Fügen Sie den folgenden Code hinzu, mit dem die Metadaten-Importdatei in ein Cloud Storage-Bucket.
def upload(config: Dict[str, str], filename: str): """Uploads a file to a bucket.""" client = storage.Client() bucket = client.get_bucket(config["bucket"]) folder = config["folder"] blob = bucket.blob(f"{folder}/{filename}") blob.upload_from_filename(filename)
Erstellen Sie das Connector-Image.
Wenn Ihr Connector mehrere Dateien enthält oder Sie Bibliotheken verwenden möchten, die nicht im Standard-Docker-Image enthalten sind, müssen Sie einen benutzerdefinierten Container verwenden. Mit Dataproc Serverless for Spark werden Arbeitslasten in Docker-Containern ausgeführt. Erstellen Sie ein benutzerdefiniertes Docker-Image des Connectors und speichern Sie das Image in Artifact Registry. Dataproc Serverless liest das Image aus Artifact Registry.
Erstellen Sie ein Dockerfile:
FROM debian:11-slim ENV DEBIAN_FRONTEND=noninteractive RUN apt update && apt install -y procps tini RUN apt install -y wget ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/ RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}" ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \ && ${CONDA_HOME}/bin/mamba install \ conda \ google-cloud-dataproc \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-storage RUN apt update && apt install -y git COPY requirements.txt . RUN python -m pip install -r requirements.txt ENV PYTHONPATH=/opt/python/packages RUN mkdir -p "${PYTHONPATH}/src/" COPY src/ "${PYTHONPATH}/src/" COPY main.py . RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER spark
Verwenden Sie Conda als Paketmanager. Serverloses Dataproc für Spark stellt
pyspark
zur Laufzeit im Container bereit, sodass Sie Sie müssen keine PySpark-Abhängigkeiten in Ihrem benutzerdefinierten Container-Image installieren.Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push in Artifact Registry.
#!/bin/bash IMAGE=oracle-pyspark:0.0.1 PROJECT=example-project REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark docker build -t "${IMAGE}" . # Tag and push to Artifact Registry gcloud config set project ${PROJECT} gcloud auth configure-docker us-central1-docker.pkg.dev docker tag "${IMAGE}" "${REPO_IMAGE}" docker push "${REPO_IMAGE}"
Da ein Image mehrere Namen haben kann, können Sie dem Image mit dem Docker-Tag einen Alias zuweisen.
Führen Sie den Connector in Dataproc Serverless aus. Führen Sie zum Senden eines PySpark-Batchjobs mit dem benutzerdefinierten Container-Image den Befehl
gcloud dataproc batches submit pyspark
-Befehlgcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Wichtige Hinweise:
- Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder PostgreSQL lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket
können sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus:
file:///path/to/file/driver.jar
. In diesem Beispiel ist der Pfad zum Die JAR-Datei ist/opt/spark/jars/
. - PIPELINE_ARGUMENTS sind die Befehlszeilenargumente. für den Connector.
Der Connector extrahiert Metadaten aus der Oracle-Datenbank, generiert ein Metadatenimportdatei und speichert die Metadatenimportdatei in einem Cloud Storage-Bucket.
- Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder PostgreSQL lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket
können sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad in etwa so aus:
Um die Metadaten aus der Metadaten-Importdatei manuell in Dataplex, führen Sie einen Metadatenjob aus. Verwenden Sie die Methode
metadataJobs.create
:Fügen Sie in der Befehlszeile Umgebungsvariablen hinzu und erstellen Sie einen Alias für den curl-Befehl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Rufen Sie die API-Methode auf und übergeben Sie die Datensatz- und Aspekttypen, die Sie importieren möchten.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
Der Aspekttyp
schema
ist ein globaler Aspekttyp, der von Dataplex definiert wird.Beachten Sie, dass das Format, das Sie für Namen der Aspekttypen beim Aufrufen der Die API-Methode unterscheidet sich vom Format, das Sie im Connector verwenden. Code.
Optional: Verwenden Sie Cloud Logging, um Logs für den Metadaten-Job aufzurufen. Für finden Sie unter Dataplex-Logs überwachen
Pipelineorchestrierung einrichten
In den vorherigen Abschnitten wurde gezeigt, wie Sie einen Beispiel-Connector erstellen und den manuell zu verbinden.
In einer Produktionsumgebung führen Sie den Connector als Teil einer verwalteten Konnektivitätspipeline mit einer Orchestrierungsplattform wie Workflows aus.
Um eine verwaltete Verbindungspipeline mit dem Beispiel-Connector auszuführen, Schritte zum Importieren von Metadaten mit Workflows. Gehen Sie so vor:
- Erstellen Sie den Workflow am selben Google Cloud-Standort wie den Connector.
Aktualisieren Sie in der Workflowdefinitionsdatei die
submit_pyspark_extract_job
. mit dem folgenden Code, um Daten aus der Oracle-Datenbank zu extrahieren mit dem von Ihnen erstellten Connector.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
Aktualisieren Sie in der Workflowdefinitionsdatei die Funktion
submit_import_job
. durch den folgenden Code, um die Einträge zu importieren. Die Funktion ruft diemetadataJobs.create
-API-Methode zum Ausführen eines Metadatenimportjobs.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Geben Sie dieselben Eintragstypen und Aspekttypen an, die Sie bei der Erstellung die API-Methode manuell aufgerufen. Beachten Sie, dass am Ende jedes Strings kein Komma steht.
Geben Sie beim Ausführen des Workflows die folgenden Laufzeitargumente an:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Optional: Verwenden Sie Cloud Logging, um Logs für die Pipeline für die verwaltete Konnektivität aufzurufen. Die Protokollnutzlast enthält einen Link zu den Protokollen für den Dataproc Serverless-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.
Optional: Zur Verbesserung der Sicherheit, Leistung und Funktionalität Ihres verwaltete Verbindungspipeline haben, sollten Sie Folgendes tun:
- Secret Manager verwenden zum Speichern der Anmeldedaten für die Datenquelle eines Drittanbieters.
- Mit PySpark die JSON-Zeilenausgabe in mehrere Metadatenimporte schreiben .
- Verwenden Sie ein Präfix, um große Dateien (mehr als 100 MB) in kleinere Dateien aufzuteilen.
- Fügen Sie weitere benutzerdefinierte Aspekte hinzu, mit denen zusätzliche geschäftliche und technische Metadaten aus Ihrer Quelle erfasst werden.
Dataplex Catalog-Beispielressourcen für eine Oracle-Quelle
Der Beispiel-Connector extrahiert Metadaten aus einer Oracle-Datenbank und ordnet sie den entsprechenden Dataplex Catalog-Ressourcen zu.
Überlegungen zur Hierarchie
Jedes System in Dataplex hat einen Stammeintrag, der der übergeordnete Eintrag für das System ist. Normalerweise hat der Stammeintrag den Eintragstyp instance
.
Die folgende Tabelle zeigt die Beispielhierarchie der Eintrags- und Aspekttypen für ein Oracle-System.
Eintragstyp-ID | Beschreibung | ID des verknüpften Aspekttyps |
---|---|---|
oracle-instance |
Der Stamm des importierten Systems. | oracle-instance |
oracle-database |
Die Oracle-Datenbank. | oracle-database |
oracle-schema |
Das Datenbankschema. | oracle-schema |
oracle-table |
Eine Tabelle. |
|
oracle-view |
Eine Ansicht. |
|
Der Aspekttyp schema
ist ein globaler Aspekttyp, der von Dataplex definiert wird. Es enthält eine Beschreibung der Felder in einer Tabelle,
oder eine andere Entität mit Spalten. Der benutzerdefinierte Aspekttyp oracle-schema
enthält den Namen des Oracle-Datenbankschemas.
Beispiele für Importelementfelder
Der Connector sollte die folgenden Konventionen für Oracle-Ressourcen verwenden.
-
Voll qualifizierte Namen: voll qualifizierte Namen für Oracle Ressourcen verwenden die folgende Namensvorlage. Unzulässige Zeichen werden mit Backticks maskiert.
Ressource Vorlage Beispiel Instanz SOURCE
:ADDRESS
Verwenden Sie die Host- und Portnummer oder den Domainnamen des Systems.
oracle:`localhost:1521`
oderoracle:`myinstance.com`
Datenbank SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Schema SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
Tabelle SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Ansehen SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Eintragsnamen oder -IDs: Für Einträge für Oracle-Ressourcen wird die folgende Benennungsvorlage verwendet. Unzulässige Zeichen werden durch ein zulässige Zeichen. Ressourcen verwenden das Präfix
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Ressource Vorlage Beispiel Instanz PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Datenbank PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Schema PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Tabelle PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Ansehen PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Übergeordnete Einträge: Wenn ein Eintrag kein Stammeintrag für das System ist, kann er ein Feld für übergeordnete Einträge haben, das seine Position in der Hierarchie beschreibt. Das Feld sollte den Namen des übergeordneten Eintrags enthalten. Mi. empfehlen, diesen Wert zu generieren.
Die folgende Tabelle zeigt die übergeordneten Einträge für Oracle-Ressourcen.
Eintrag Übergeordneter Eintrag Instanz ""
(leerer String)Datenbank Instanzname Schema Datenbankname Tabelle Schemaname Ansehen Schemaname Aspektkarte: Die Aspektkarte muss mindestens einen Aspekt enthalten, der die zu importierende Entität beschreibt. Hier ist eine Beispiel-Aspektkarte für eine Oracle-Tabelle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Es gibt vordefinierte Aspekttypen (z. B.
schema
), die die Tabellen- oder Ansichtsstruktur imdataplex-types
Projekt an dem Standortglobal
.-
Aspektschlüssel: Bei Aspektschlüsseln wird das Namensformat verwendet. PROJECT.LOCATION.ASPECT_TYPE. Die folgende Tabelle enthält Beispiele für Aspektschlüssel für Oracle-Ressourcen.
Eintrag Beispiel für einen Aspektschlüssel Instanz example-project.us-central1.oracle-instance
Datenbank example-project.us-central1.oracle-database
Schema example-project.us-central1.oracle-schema
Tabelle example-project.us-central1.oracle-table
Ansehen example-project.us-central1.oracle-view