Desenvolva um conetor personalizado para a importação de metadados

Este documento fornece um modelo de referência para criar um conector personalizado que extrai metadados de uma origem externa. Usa o conetor quando executa um pipeline de conetividade gerido que importa metadados para o catálogo universal do Dataplex.

Pode criar conetores para extrair metadados de origens externas. Por exemplo, pode criar um conetor para extrair dados de origens como o MySQL, o SQL Server, o Oracle, o Snowflake, o Databricks e outras.

Use o conector de exemplo neste documento como ponto de partida para criar os seus próprios conectores. O conetor de exemplo liga-se a uma base de dados Oracle Database Express Edition (XE). O conector é criado em Python, embora também possa usar Java, Scala ou R.

Como funcionam os conetores

Um conetor extrai metadados de uma origem de dados de terceiros, transforma os metadados no formato do Dataplex Universal Catalog ImportItem e gera ficheiros de importação de metadados que podem ser importados pelo Dataplex Universal Catalog.

O conector faz parte de um pipeline de conetividade gerido. Um pipeline de conetividade gerido é um fluxo de trabalho organizado que usa para importar metadados do catálogo universal do Dataplex. O pipeline de conetividade gerido executa o conetor e realiza outras tarefas no fluxo de trabalho de importação, como executar uma tarefa de importação de metadados e capturar registos.

O pipeline de conetividade gerido executa o conetor através de uma tarefa em lote do Google Cloud Serverless para Apache Spark. O Serverless para Apache Spark oferece um ambiente de execução do Spark sem servidor. Embora possa criar um conetor que não use o Spark, recomendamos que use o Spark porque pode melhorar o desempenho do conetor.

Requisitos do conetor

O conector tem os seguintes requisitos:

  • O conector tem de ser uma imagem do Artifact Registry que possa ser executada no Serverless para Apache Spark.
  • O conetor tem de gerar ficheiros de metadados num formato que possa ser importado por uma tarefa de importação de metadados do catálogo universal do Dataplex (o método da API metadataJobs.create). Para ver os requisitos detalhados, consulte o artigo Ficheiro de importação de metadados.
  • O conector tem de aceitar os seguintes argumentos da linha de comandos para receber informações do pipeline:

    Argumento da linha de comandos Valor que o pipeline fornece
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    O conetor usa estes argumentos para gerar metadados num grupo de entradas de destino projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, e para escrever num contentor do Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada execução do pipeline cria uma nova pasta FOLDER_ID no contentor CLOUD_STORAGE_BUCKET_ID. O conetor deve escrever ficheiros de importação de metadados nesta pasta.

Os modelos de pipelines suportam conetores PySpark. Os modelos partem do princípio de que o controlador (mainPythonFileUri) é um ficheiro local na imagem do conector com o nome main.py. Pode modificar os modelos de pipeline para outros cenários, como um conector do Spark, um URI do controlador diferente ou outras opções.

Veja como usar o PySpark para criar um item de importação no ficheiro de importação de metadados.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de começar

Este guia pressupõe que tem conhecimentos de Python e PySpark.

Reveja as seguintes informações:

Faça o seguinte. Crie todos os recursos na mesma Google Cloud localização.

  1. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  6. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Set up authentication:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator). Learn how to grant roles.
    2. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    3. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. Crie um contentor do Cloud Storage para armazenar os ficheiros de importação de metadados.

  10. Crie os seguintes recursos de metadados no mesmo projeto.

    Para ver valores de exemplo, consulte a secção Exemplos de recursos de metadados para uma origem Oracle deste documento.

    1. Crie um grupo de entradas.
    2. Crie tipos de aspetos personalizados para as entradas que quer importar. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por exemplo, para uma base de dados Oracle, crie um tipo de aspeto denominado oracle-database.

      Opcionalmente, pode criar tipos de aspetos adicionais para armazenar outras informações.

    3. Crie tipos de entradas personalizados para os recursos que quer importar e atribua-lhes os tipos de aspetos relevantes. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por exemplo, para uma base de dados Oracle, crie um tipo de entrada denominado oracle-database. Associe-o ao tipo de aspeto denominado oracle-database.

  11. Certifique-se de que a sua origem de terceiros é acessível a partir do seu Google Cloud projeto. Para mais informações, consulte o artigo Configuração de rede sem servidor para o Apache Spark.
  12. Crie um conetor Python básico

    O conector Python básico de exemplo cria entradas de nível superior para uma origem de dados Oracle através das classes da biblioteca cliente do catálogo universal do Dataplex. Em seguida, fornece os valores para os campos de entrada.

    O conetor cria um ficheiro de importação de metadados com as seguintes entradas:

    • Uma entrada instance, com o tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Esta entrada representa um sistema Oracle Database XE.
    • Uma entrada database, que representa uma base de dados no sistema Oracle Database XE.

    Para criar um conector Python básico, faça o seguinte:

    1. Clone o repositório cloud-dataplex.

    2. Configure um ambiente local. Recomendamos que use um ambiente virtual.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Use as versões ativas ou de manutenção do Python. As versões 3.7 e posteriores do Python são suportadas.

    3. Crie um projeto Python.

    4. Requisitos de instalação:

      pip install -r requirements.txt
      

      Os seguintes requisitos estão instalados:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. Adicione um ficheiro de pipeline main.py na raiz do projeto.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      Quando implementa o seu código no Serverless para Apache Spark, o ficheiro main.py serve como ponto de entrada para a execução. Recomendamos que minimize a quantidade de informações armazenadas no ficheiro main.py. Use este ficheiro para chamar funções e classes definidas no conector, como a classe src/bootstap.py.

    6. Crie uma pasta src para armazenar a maioria da lógica do conector.

    7. Atualize o ficheiro src/cmd_reader.py com uma classe Python para aceitar argumentos da linha de comando. Pode usar o módulo argeparse para o fazer.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      Em ambientes de produção, recomendamos que armazene a palavra-passe no Secret Manager.

    8. Atualize o ficheiro src/constants.py com código para criar constantes.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. Atualize o ficheiro src/name_builder.py com métodos para criar os recursos de metadados que quer que o conetor crie para os seus recursos Oracle. Use as convenções descritas na secção Exemplos de recursos de metadados para uma origem Oracle deste documento.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      Uma vez que o ficheiro name_builder.py é usado para o código principal do Python e o código principal do PySpark, recomendamos que escreva os métodos como funções puras, em vez de como membros de uma classe.

    10. Atualize o ficheiro src/top_entry_builder.py com código para preencher as entradas de nível superior com dados.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. Atualize o ficheiro src/bootstrap.py com código para gerar o ficheiro de importação de metadados e executar o conetor.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. Executar o código localmente.

      É devolvido um ficheiro de importação de metadados denominado output.jsonl. O ficheiro tem duas linhas, cada uma representando um item de importação. O pipeline de conetividade gerido lê este ficheiro quando executa a tarefa de importação de metadados.

    13. Opcional: expanda o exemplo anterior para usar as classes da biblioteca cliente do Dataplex Universal Catalog para criar itens de importação para tabelas, esquemas e vistas. Também pode executar o exemplo de Python no Serverless para Apache Spark.

      Recomendamos que crie um conetor que use o Spark (e seja executado no Serverless for Apache Spark), porque pode melhorar o desempenho do seu conetor.

    Crie um conetor PySpark

    Este exemplo baseia-se na API PySpark DataFrame. Pode instalar o PySpark SQL e executá-lo localmente antes de o executar no Serverless para Apache Spark. Se instalar e executar o PySpark localmente, instale a biblioteca PySpark através do pip, mas não precisa de instalar um cluster Spark local.

    Por motivos de desempenho, este exemplo não usa classes predefinidas da biblioteca PySpark. Em alternativa, o exemplo cria DataFrames, converte os DataFrames em entradas JSON e, em seguida, escreve o resultado num ficheiro de importação de metadados no formato JSON Lines que pode ser importado para o catálogo universal do Dataplex.

    Para criar um conector com o PySpark, faça o seguinte:

    1. Clone o repositório cloud-dataplex.

    2. Instale o PySpark:

      pip install pyspark
      
    3. Requisitos de instalação:

      pip install -r requirements.txt
      

      Os seguintes requisitos estão instalados:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Atualize o ficheiro oracle_connector.py com código para ler dados de uma origem de dados Oracle e devolver DataFrames.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      Adicione consultas SQL para devolver os metadados que quer importar. As consultas têm de devolver as seguintes informações:

      • Esquemas de base de dados
      • Tabelas que pertencem a estes esquemas
      • Colunas pertencentes a estas tabelas, incluindo o nome da coluna, o tipo de dados da coluna e se a coluna é anulável ou obrigatória

      Todas as colunas de todas as tabelas e vistas são armazenadas na mesma tabela de sistema. Pode selecionar colunas com o método _get_columns. Consoante os parâmetros que fornecer, pode selecionar colunas para as tabelas ou para as vistas separadamente.

      Tenha em conta o seguinte:

      • No Oracle, um esquema de base de dados é propriedade de um utilizador da base de dados e tem o mesmo nome que esse utilizador.
      • Os objetos de esquema são estruturas lógicas criadas pelos utilizadores. Os objetos, como tabelas ou índices, podem conter dados, e os objetos, como visualizações ou sinónimos, consistem apenas numa definição.
      • O ficheiro ojdbc11.jar contém o controlador JDBC da Oracle.
    5. Atualize o ficheiro src/entry_builder.py com métodos partilhados para aplicar transformações do Spark.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      Tenha em conta o seguinte:

    6. Atualize o ficheiro bootstrap.py com código para gerar o ficheiro de importação de metadados e executar o conetor.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      Este exemplo guarda o ficheiro de importação de metadados como um único ficheiro JSON Lines. Pode usar ferramentas do PySpark, como a classe DataFrameWriter, para gerar lotes de JSON em paralelo.

      O conetor pode escrever entradas no ficheiro de importação de metadados em qualquer ordem.

    7. Atualize o ficheiro gcs_uploader.py com código para carregar o ficheiro de importação de metadados para um contentor do Cloud Storage.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. Crie a imagem do conetor.

      Se o seu conetor contiver vários ficheiros ou quiser usar bibliotecas que não estão incluídas na imagem Docker predefinida, tem de usar um contentor personalizado. O Serverless para Apache Spark executa cargas de trabalho em contentores do Docker. Crie uma imagem Docker personalizada do conetor e armazene a imagem no Artifact Registry. O Serverless para Apache Spark lê a imagem do Artifact Registry.

      1. Crie um Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        Use o Conda como gestor de pacotes. O Serverless for Apache Spark monta pyspark no contentor no momento da execução, pelo que não precisa de instalar dependências do PySpark na sua imagem de contentor personalizada.

      2. Crie a imagem de contentor personalizada e envie-a para o Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        Uma vez que uma imagem pode ter vários nomes, pode usar a etiqueta Docker para atribuir um alias à imagem.

    9. Execute o conetor no Serverless para Apache Spark. Para enviar uma tarefa em lote do PySpark através da imagem do contentor personalizado, execute o comando gcloud dataproc batches submit pyspark.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      Tenha em conta o seguinte:

      • Os ficheiros JAR são controladores do Spark. Para ler a partir do Oracle, MySQL ou Postgres, tem de fornecer ao Apache Spark um pacote específico. O pacote pode estar localizado no Cloud Storage ou no interior do contentor. Se o ficheiro JAR estiver no contentor, o caminho é semelhante a file:///path/to/file/driver.jar. Neste exemplo, o caminho para o ficheiro JAR é /opt/spark/jars/.
      • PIPELINE_ARGUMENTS são os argumentos da linha de comandos para o conetor.

      O conetor extrai metadados da base de dados Oracle, gera um ficheiro de importação de metadados e guarda o ficheiro de importação de metadados num contentor do Cloud Storage.

    10. Para importar manualmente os metadados no ficheiro de importação de metadados para o Catálogo universal do Dataplex, execute uma tarefa de metadados. Use o metadataJobs.create método.

      1. Na linha de comandos, adicione variáveis de ambiente e crie um alias para o comando curl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. Chame o método API, transmitindo os tipos de entradas e os tipos de aspetos que quer importar.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        O tipo de aspeto schema é um tipo de aspeto global definido pelo Dataplex Universal Catalog.

        Tenha em atenção que o formato que usa para os nomes dos tipos de aspetos quando chama o método da API é diferente do formato que usa no código do conector.

      3. Opcional: use o Cloud Logging para ver os registos da tarefa de metadados. Para mais informações, consulte o artigo Monitorize os registos do Dataplex Universal Catalog.

    Configure a orquestração de pipelines

    As secções anteriores mostraram como criar um conetor de exemplo e executar o conetor manualmente.

    Num ambiente de produção, executa o conector como parte de um pipeline de conetividade gerido, usando uma plataforma de orquestração como o Workflows.

    1. Para executar um pipeline de conetividade gerido com o conetor de exemplo, siga os passos para importar metadados através dos fluxos de trabalho. Faça o seguinte:

      • Crie o fluxo de trabalho na mesma Google Cloud localização que o conector.
      • No ficheiro de definição do fluxo de trabalho, atualize a função submit_pyspark_extract_job com o seguinte código para extrair dados da base de dados Oracle através do conetor que criou.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • No ficheiro de definição do fluxo de trabalho, atualize a função submit_import_job com o seguinte código para importar as entradas. A função chama o método da API metadataJobs.create para executar uma tarefa de importação de metadados.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        Forneça os mesmos tipos de entradas e tipos de aspetos que incluiu quando chamou o método da API manualmente. Tenha em atenção que não existe uma vírgula no final de cada string.

      • Quando executar o fluxo de trabalho, forneça os seguintes argumentos de tempo de execução:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. Opcional: use o Cloud Logging para ver os registos do pipeline de conetividade gerida. A carga útil do registo inclui um link para os registos da tarefa em lote do Serverless for Apache Spark e da tarefa de importação de metadados, conforme relevante. Para mais informações, consulte o artigo Veja os registos do fluxo de trabalho.

    3. Opcional: para melhorar a segurança, o desempenho e a funcionalidade do seu pipeline de conetividade gerido, considere fazer o seguinte:

      1. Use o Secret Manager para armazenar as credenciais da sua origem de dados de terceiros.
      2. Use o PySpark para escrever a saída JSON Lines em vários ficheiros de importação de metadados em paralelo.
      3. Use um prefixo para dividir ficheiros grandes (mais de 100 MB) em ficheiros mais pequenos.
      4. Adicione mais aspetos personalizados que captam metadados técnicos e empresariais adicionais da sua origem.

    Exemplos de recursos de metadados para uma origem Oracle

    O conetor de exemplo extrai metadados de uma base de dados Oracle e mapeia os metadados para os recursos de metadados do Dataplex Universal Catalog correspondentes.

    Considerações sobre a hierarquia

    Todos os sistemas no Dataplex Universal Catalog têm uma entrada raiz que é a entrada principal do sistema. Normalmente, a entrada raiz tem um tipo de entrada instance. A tabela seguinte mostra a hierarquia de exemplo de tipos de entradas e tipos de aspetos para um sistema Oracle. Por exemplo, o tipo de entrada oracle-database está associado a um tipo de aspeto que também se chama oracle-database.

    ID do tipo de entrada Descrição ID do tipo de aspeto associado
    oracle-instance A raiz do sistema importado. oracle-instance
    oracle-database A base de dados Oracle. oracle-database
    oracle-schema O esquema da base de dados. oracle-schema
    oracle-table Uma tabela.

    oracle-table

    schema

    oracle-view Uma vista.

    oracle-view

    schema

    O tipo de aspeto schema é um tipo de aspeto global definido pelo Dataplex Universal Catalog. Contém uma descrição dos campos numa tabela, numa vista ou noutra entidade que tenha colunas. O oracle-schematipo de aspeto personalizado contém o nome do esquema da base de dados Oracle.

    Exemplo de campos de artigos de importação

    O conector deve usar as seguintes convenções para recursos do Oracle.

    • Nomes totalmente qualificados: os nomes totalmente qualificados para recursos Oracle usam o seguinte modelo de nomenclatura. Os carateres proibidos são escapados com acentos graves.

      Recurso Modelo Exemplo
      Instância

      SOURCE:ADDRESS

      Use o número da porta e do anfitrião ou o nome de domínio do sistema.

      oracle:`localhost:1521` ou oracle:`myinstance.com`
      Bases de dados SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      Tabela SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • Nomes de entradas ou IDs de entradas: as entradas para recursos da Oracle usam o seguinte modelo de nomenclatura. Os carateres proibidos são substituídos por um caráter permitido. Os recursos usam o prefixo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      Recurso Modelo Exemplo
      Instância PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      Bases de dados PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      Esquema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      Tabela PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • Entradas principais: se uma entrada não for uma entrada de raiz para o sistema, a entrada pode ter um campo de entrada principal que descreve a respetiva posição na hierarquia. O campo deve conter o nome da entrada principal. Recomendamos que gere este valor.

      A tabela seguinte mostra as entradas principais para recursos da Oracle.

      Entrada Entrada principal
      Instância "" (string vazia)
      Bases de dados Nome da instância
      Esquema Nome da base de dados
      Tabela Nome do esquema
      Ver Nome do esquema
    • Mapa de aspetos: o mapa de aspetos tem de conter, pelo menos, um aspeto que descreva a entidade a importar. Segue-se um exemplo de um mapa de aspetos para uma tabela Oracle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      Pode encontrar tipos de aspetos predefinidos (como schema) que definem a estrutura da tabela ou da visualização no dataplex-types projeto, na localização global.

    • Chaves de aspeto: as chaves de aspeto usam o formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. A tabela seguinte mostra exemplos de chaves de aspeto para recursos Oracle.

      Entrada Exemplo de chave de aspeto
      Instância example-project.us-central1.oracle-instance
      Bases de dados example-project.us-central1.oracle-database
      Esquema example-project.us-central1.oracle-schema
      Tabela example-project.us-central1.oracle-table
      Ver example-project.us-central1.oracle-view

    O que se segue?