Développer un connecteur personnalisé pour l'importation de métadonnées

Ce document fournit un modèle de référence pour créer un connecteur personnalisé qui extrait des métadonnées à partir d'une source tierce. Vous utilisez le connecteur lorsque vous exécutez un pipeline de connectivité géré qui importe des métadonnées dans Dataplex.

Vous pouvez créer des connecteurs pour extraire des métadonnées à partir de sources tierces. Par exemple, vous pouvez créer un connecteur pour extraire des données à partir de sources telles que MySQL, SQL Server, Oracle, Snowflake, Databricks, etc.

Utilisez l'exemple de connecteur dans ce document comme point de départ pour créer vos propres connecteurs. L'exemple de connecteur se connecte à une base de données Oracle Database Express Edition (XE). Le connecteur est créé en Python, mais vous pouvez également utiliser Java, Scala ou R.

Fonctionnement des connecteurs

Un connecteur extrait les métadonnées d'une source de données tierce, les transforme au format ImportItem de Dataplex et génère des fichiers d'importation de métadonnées pouvant être importés par Dataplex.

Le connecteur fait partie d'un pipeline de connectivité géré. Un pipeline de connectivité géré est un workflow orchestré que vous utilisez pour importer les métadonnées du catalogue Dataplex. Le pipeline de connectivité géré exécute le connecteur et effectue d'autres tâches dans le workflow d'importation, telles que l'exécution d'un job d'importation de métadonnées et la capture de journaux.

Le pipeline de connectivité géré exécute le connecteur à l'aide d'un job par lot Dataproc sans serveur. Dataproc sans serveur fournit un environnement d'exécution Spark sans serveur. Bien que vous puissiez créer un connecteur qui n'utilise pas Spark, nous vous recommandons de l'utiliser, car il peut améliorer les performances de votre connecteur.

Exigences concernant les connecteurs

Le connecteur présente les exigences suivantes:

  • Le connecteur doit être une image Artifact Registry pouvant être exécutée sur Dataproc Serverless.
  • Le connecteur doit générer des fichiers de métadonnées dans un format pouvant être importé par une tâche d'importation de métadonnées Dataplex (méthode de l'API metadataJobs.create). Pour en savoir plus, consultez la section Fichier d'importation de métadonnées.
  • Le connecteur doit accepter les arguments de ligne de commande suivants pour recevoir des informations du pipeline:

    Argument de ligne de commande Valeur fournie par le pipeline
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Le connecteur utilise ces arguments pour générer des métadonnées dans un groupe d'entrées cible projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID et pour écrire dans un bucket Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Chaque exécution du pipeline crée un dossier FOLDER_ID dans le bucket CLOUD_STORAGE_BUCKET_ID. Le connecteur doit écrire les fichiers d'importation de métadonnées dans ce dossier.

Les modèles de pipeline sont compatibles avec les connecteurs PySpark. Les modèles supposent que le pilote (mainPythonFileUri) est un fichier local sur l'image du connecteur nommé main.py. Vous pouvez modifier les modèles de pipeline pour d'autres scénarios, tels qu'un connecteur Spark, un autre URI de pilote ou d'autres options.

Voici comment utiliser PySpark pour créer un élément d'importation dans le fichier d'importation de métadonnées.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Avant de commencer

Dans ce guide, nous partons du principe que vous connaissez bien Python et PySpark.

Consultez les informations suivantes:

Procédez comme suit : Créez toutes les ressources au même Google Cloud emplacement.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Créez un bucket Cloud Storage pour stocker les fichiers d'importation des métadonnées.

  9. Créez les ressources du catalogue Dataplex suivantes dans le même projet.

    Pour obtenir des exemples de valeurs, consultez la section Exemples de ressources Dataplex Catalog pour une source Oracle de ce document.

    1. Créez un groupe d'entrées.
    2. Créez des types d'aspects personnalisés pour les entrées que vous souhaitez importer. Utilisez la convention d'attribution de noms SOURCE-ENTITY_TO_IMPORT.

      Vous pouvez également créer d'autres types d'aspects pour stocker d'autres informations.

    3. Créez des types d'entrées personnalisés pour les ressources que vous souhaitez importer, puis attribuez-leur les types d'aspects appropriés. Utilisez la convention d'attribution de noms SOURCE-ENTITY_TO_IMPORT.

      Par exemple, pour une base de données Oracle, créez un type d'entrée nommé oracle-database. Associez-le au type d'aspect nommé oracle-database.

  10. Assurez-vous que votre source tierce est accessible depuis votre projet Google Cloud . Pour en savoir plus, consultez la section Configuration du réseau Dataproc sans serveur pour Spark.

Créer un connecteur Python de base

L'exemple de connecteur Python de base crée des entrées de niveau supérieur pour une source de données Oracle à l'aide des classes de la bibliothèque cliente Dataplex. Vous fournissez ensuite les valeurs pour les champs de saisie.

Le connecteur crée un fichier d'importation de métadonnées avec les entrées suivantes:

  • Entrée instance, avec le type d'entrée projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Cette entrée représente un système Oracle Database XE.
  • Entrée database, qui représente une base de données dans le système Oracle Database XE.

Pour créer un connecteur Python de base, procédez comme suit:

  1. Clonez le dépôt cloud-dataplex.

  2. Configurez un environnement local. Nous vous recommandons d'utiliser un environnement virtuel.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Utilisez les versions actives ou de maintenance de Python. Les versions 3.7 et ultérieures de Python sont compatibles.

  3. Créez un projet Python.

  4. Configuration requise:

    pip install -r requirements.txt
    

    Les exigences suivantes sont installées:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Ajoutez un fichier de pipeline main.py à la racine du projet.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Lorsque vous déployez votre code sur Dataproc sans serveur, le fichier main.py sert de point d'entrée pour l'exécution. Nous vous recommandons de réduire au maximum la quantité d'informations stockées dans le fichier main.py. Utilisez ce fichier pour appeler les fonctions et les classes définies dans votre connecteur, comme la classe src/bootstap.py.

  6. Créez un dossier src pour stocker la majeure partie de la logique de votre connecteur.

  7. Mettez à jour le fichier src/cmd_reader.py avec une classe Python pour accepter les arguments de ligne de commande. Pour ce faire, vous pouvez utiliser le module argeparse.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    Dans les environnements de production, nous vous recommandons de stocker le mot de passe dans Secret Manager.

  8. Mettez à jour le fichier src/constants.py avec du code permettant de créer des constantes.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Mettez à jour le fichier src/name_builder.py avec des méthodes permettant de créer les ressources du catalogue Dataplex que vous souhaitez que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources Dataplex Catalog pour une source Oracle de ce document.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Étant donné que le fichier name_builder.py est utilisé à la fois pour le code principal Python et le code principal PySpark, nous vous recommandons d'écrire les méthodes en tant que fonctions pures, plutôt qu'en tant que membres d'une classe.

  10. Mettez à jour le fichier src/top_entry_builder.py avec du code pour remplir les entrées de niveau supérieur avec des données.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Modifiez le fichier src/bootstrap.py avec du code pour générer le fichier d'importation des métadonnées et exécuter le connecteur.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Exécutez le code en local.

    Un fichier d'importation de métadonnées nommé output.jsonl est renvoyé. Le fichier comporte deux lignes, chacune représentant un élément d'importation. Le pipeline de connectivité géré lit ce fichier lors de l'exécution de la tâche d'importation de métadonnées.

  13. Facultatif: étendez l'exemple précédent pour utiliser les classes de la bibliothèque cliente Dataplex afin de créer des éléments d'importation pour les tables, les schémas et les vues. Vous pouvez également exécuter l'exemple Python sur Dataproc sans serveur.

    Nous vous recommandons de créer un connecteur qui utilise Spark (et s'exécute sur Dataproc sans serveur), car il peut améliorer les performances de votre connecteur.

Créer un connecteur PySpark

Cet exemple est basé sur l'API DataFrame PySpark. Vous pouvez installer PySpark SQL et l'exécuter localement avant de l'exécuter sur Dataproc sans serveur. Si vous installez et exécutez PySpark localement, installez la bibliothèque PySpark à l'aide de pip, mais vous n'avez pas besoin d'installer de cluster Spark local.

Pour des raisons de performances, cet exemple n'utilise pas de classes prédéfinies de la bibliothèque PySpark. À la place, l'exemple crée des DataFrames, les convertit en entrées JSON, puis écrit la sortie dans un fichier d'importation de métadonnées au format JSON Lines qui peut être importé dans Dataplex.

Pour créer un connecteur à l'aide de PySpark, procédez comme suit:

  1. Clonez le dépôt cloud-dataplex.

  2. Installez PySpark:

    pip install pyspark
    
  3. Configuration requise:

    pip install -r requirements.txt
    

    Les exigences suivantes sont installées:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Mettez à jour le fichier oracle_connector.py avec du code permettant de lire les données d'une source de données Oracle et de renvoyer des DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Ajoutez des requêtes SQL pour afficher les métadonnées que vous souhaitez importer. Les requêtes doivent renvoyer les informations suivantes:

    • Schémas de base de données
    • Tables appartenant à ces schémas
    • Colonnes appartenant à ces tables, y compris le nom de la colonne, le type de données de la colonne et si la colonne est nullable ou obligatoire

    Toutes les colonnes de toutes les tables et vues sont stockées dans la même table système. Vous pouvez sélectionner des colonnes à l'aide de la méthode _get_columns. En fonction des paramètres que vous fournissez, vous pouvez sélectionner des colonnes pour les tables ou pour les vues séparément.

    Veuillez noter les points suivants :

    • Dans Oracle, un schéma de base de données appartient à un utilisateur de base de données et porte le même nom que cet utilisateur.
    • Les objets de schéma sont des structures logiques créées par les utilisateurs. Les objets tels que les tables ou les index peuvent contenir des données, tandis que les objets tels que les vues ou les synonymes ne consistent qu'en une définition.
    • Le fichier ojdbc11.jar contient le pilote JDBC Oracle.
  5. Mettez à jour le fichier src/entry_builder.py avec des méthodes partagées pour appliquer des transformations Spark.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Veuillez noter les points suivants :

    • Les méthodes génèrent les ressources du catalogue Dataplex que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources Dataplex Catalog pour une source Oracle de ce document.
    • La méthode convert_to_import_items s'applique aux schémas, aux tables et aux vues. Assurez-vous que la sortie du connecteur est un ou plusieurs éléments d'importation pouvant être traités par la méthode metadataJobs.create, et non des entrées individuelles.
    • Même dans une vue, la colonne est appelée TABLE_NAME.
  6. Modifiez le fichier bootstrap.py avec du code pour générer le fichier d'importation des métadonnées et exécuter le connecteur.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    Cet exemple enregistre le fichier d'importation des métadonnées en tant que fichier JSON Lines unique. Vous pouvez utiliser des outils PySpark tels que la classe DataFrameWriter pour générer des lots de fichiers JSON en parallèle.

    Le connecteur peut écrire des entrées dans le fichier d'importation des métadonnées dans n'importe quel ordre.

  7. Mettez à jour le fichier gcs_uploader.py avec du code pour importer le fichier d'importation des métadonnées dans un bucket Cloud Storage.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Créez l'image du connecteur.

    Si votre connecteur contient plusieurs fichiers ou si vous souhaitez utiliser des bibliothèques qui ne sont pas incluses dans l'image Docker par défaut, vous devez utiliser un conteneur personnalisé. Dataproc sans serveur pour Spark exécute des charges de travail dans des conteneurs Docker. Créez une image Docker personnalisée du connecteur et stockez-la dans Artifact Registry. Dataproc Serverless lit l'image depuis Artifact Registry.

    1. Créez un fichier Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Utilisez Conda comme gestionnaire de paquets. Dataproc sans serveur pour Spark associe pyspark au conteneur au moment de l'exécution. Vous n'avez donc pas besoin d'installer les dépendances PySpark dans votre image de conteneur personnalisée.

    2. Créez l'image de conteneur personnalisée et transférez-la vers Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Comme une image peut avoir plusieurs noms, vous pouvez utiliser la balise Docker pour lui attribuer un alias.

  9. Exécutez le connecteur sur Dataproc sans serveur. Pour envoyer un job par lot PySpark à l'aide de l'image de conteneur personnalisée, exécutez la commande gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Veuillez noter les points suivants :

    • Les fichiers JAR sont des pilotes pour Spark. Pour lire à partir d'Oracle, MySQL ou PostgreSQL, vous devez fournir à Apache Spark un package spécifique. Le package peut se trouver dans Cloud Storage ou dans le conteneur. Si le fichier JAR se trouve dans le conteneur, le chemin d'accès est semblable à file:///path/to/file/driver.jar. Dans cet exemple, le chemin d'accès au fichier JAR est /opt/spark/jars/.
    • PIPELINE_ARGUMENTS correspond aux arguments de ligne de commande du connecteur.

    Le connecteur extrait les métadonnées de la base de données Oracle, génère un fichier d'importation de métadonnées et l'enregistre dans un bucket Cloud Storage.

  10. Pour importer manuellement les métadonnées du fichier d'importation de métadonnées dans Dataplex, exécutez un job de métadonnées. Exécutez la méthode metadataJobs.create.

    1. Dans la ligne de commande, ajoutez des variables d'environnement et créez un alias pour la commande curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Appelez la méthode de l'API en transmettant les types d'entrées et les types d'aspects que vous souhaitez importer.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      Le type d'aspect schema est un type d'aspect global défini par Dataplex.

      Notez que le format que vous utilisez pour les noms de type d'aspect lorsque vous appelez la méthode de l'API est différent du format que vous utilisez dans le code du connecteur.

    3. Facultatif: utilisez Cloud Logging pour afficher les journaux de la tâche de métadonnées. Pour en savoir plus, consultez la section Surveiller les journaux Dataplex.

Configurer l'orchestration de pipeline

Les sections précédentes ont expliqué comment créer un exemple de connecteur et l'exécuter manuellement.

Dans un environnement de production, vous exécutez le connecteur dans le cadre d'un pipeline de connectivité géré à l'aide d'une plate-forme d'orchestration telle que Workflows.

  1. Pour exécuter un pipeline de connectivité géré avec l'exemple de connecteur, suivez la procédure d'importation des métadonnées à l'aide de Workflows. Procédez comme suit:

    • Créez le workflow au même Google Cloud emplacement que le connecteur.
    • Dans le fichier de définition du workflow, mettez à jour la fonction submit_pyspark_extract_job avec le code suivant pour extraire des données de la base de données Oracle à l'aide du connecteur que vous avez créé.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • Dans le fichier de définition du workflow, mettez à jour la fonction submit_import_job avec le code suivant pour importer les entrées. La fonction appelle la méthode API metadataJobs.create pour exécuter une tâche d'importation de métadonnées.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Indiquez les mêmes types d'entrée et de type d'aspect que ceux que vous avez inclus lorsque vous avez appelé manuellement la méthode de l'API. Notez qu'aucune virgule ne figure à la fin de chaque chaîne.

    • Lorsque vous exécutez le workflow, fournissez les arguments d'exécution suivants:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Facultatif: Utilisez Cloud Logging pour afficher les journaux du pipeline de connectivité gérée. La charge utile de journalisation inclut un lien vers les journaux de la tâche par lot Dataproc sans serveur et de la tâche d'importation de métadonnées, le cas échéant. Pour en savoir plus, consultez la section Afficher les journaux de workflow.

  3. Facultatif: Pour améliorer la sécurité, les performances et les fonctionnalités de votre pipeline de connectivité géré, envisagez de procéder comme suit:

    1. Utilisez Secret Manager pour stocker les identifiants de votre source de données tierce.
    2. Utilisez PySpark pour écrire la sortie JSON Lines dans plusieurs fichiers d'importation de métadonnées en parallèle.
    3. Utilisez un préfixe pour diviser les fichiers volumineux (plus de 100 Mo) en fichiers plus petits.
    4. Ajoutez d'autres aspects personnalisés qui capturent des métadonnées métier et techniques supplémentaires à partir de votre source.

Exemples de ressources du catalogue Dataplex pour une source Oracle

L'exemple de connecteur extrait les métadonnées d'une base de données Oracle et les met en correspondance avec les ressources du catalogue Dataplex correspondantes.

Considérations concernant la hiérarchie

Chaque système de Dataplex possède une entrée racine qui est l'entrée parente du système. En règle générale, l'entrée racine a un type d'entrée instance. Le tableau suivant présente un exemple de hiérarchie des types d'entrées et des types d'aspects pour un système Oracle.

ID du type d'entrée Description ID du type d'aspect associé
oracle-instance Racine du système importé. oracle-instance
oracle-database La base de données Oracle oracle-database
oracle-schema Schéma de la base de données. oracle-schema
oracle-table Un tableau.

oracle-table

schema

oracle-view Une vue.

oracle-view

schema

Le type d'aspect schema est un type d'aspect global défini par Dataplex. Il contient une description des champs d'une table, d'une vue ou d'une autre entité comportant des colonnes. Le type d'aspect personnalisé oracle-schema contient le nom du schéma de la base de données Oracle.

Exemples de champs d'article d'importation

Le connecteur doit utiliser les conventions suivantes pour les ressources Oracle.

  • Noms complets: les noms complets des ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères interdits sont échappés avec des caractères graves.

    Ressource Modèle Exemple
    Instance

    SOURCE : ADDRESS

    Utilisez le nom d'hôte et le numéro de port ou le nom de domaine du système.

    oracle:`localhost:1521` ou oracle:`myinstance.com`
    Base de données SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Schéma SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Table SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Afficher SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Noms ou ID d'entrée: les entrées pour les ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères interdits sont remplacés par un caractère autorisé. Les ressources utilisent le préfixe projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Ressource Modèle Exemple
    Instance PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Base de données PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schéma PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Table PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Afficher PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entrées parentes: si une entrée n'est pas une entrée racine pour le système, elle peut comporter un champ d'entrée parente qui décrit sa position dans la hiérarchie. Le champ doit contenir le nom de l'entrée parente. Nous vous recommandons de générer cette valeur.

    Le tableau suivant présente les entrées parentes des ressources Oracle.

    Entrée Entrée parent
    Instance "" (chaîne vide)
    Base de données Nom de l'instance
    Schéma Nom de la base de données
    Table Nom du schéma
    Afficher Nom du schéma
  • Carte des aspects: la carte des aspects doit contenir au moins un aspect décrivant l'entité à importer. Voici un exemple de carte des aspects pour une table Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Vous pouvez trouver des types d'aspects prédéfinis (comme schema) qui définissent la structure de la table ou de la vue dans le projet dataplex-types, à l'emplacement global.

  • Clés d'aspect: les clés d'aspect utilisent le format de dénomination PROJECT.LOCATION.ASPECT_TYPE. Le tableau suivant présente des exemples de clés d'aspect pour les ressources Oracle.

    Entrée Exemple de clé d'aspect
    Instance example-project.us-central1.oracle-instance
    Base de données example-project.us-central1.oracle-database
    Schéma example-project.us-central1.oracle-schema
    Table example-project.us-central1.oracle-table
    Afficher example-project.us-central1.oracle-view

Étape suivante