Desenvolver um conector personalizado para importação de metadados

Este documento fornece um modelo de referência para você criar um conector personalizado que extrai metadados de uma fonte externa. Use o conector ao executar um pipeline de conectividade gerenciada que importe metadados para o Dataplex.

Você pode criar conectores para extrair metadados de fontes de terceiros. Por exemplo, é possível criar um conector para extrair dados de fontes como MySQL, SQL Server, Oracle, Snowflake, Databricks e outras.

Use o exemplo de conector neste documento como ponto de partida para criar seus próprios conectores. O exemplo de conector se conecta a um banco de dados do Oracle Database Express Edition (XE). O conector é criado em Python, mas você também pode usar Java, Scala ou R.

Como funcionam os conectores

Um conector extrai metadados de uma origem de dados de terceiros, transforma os metadados no formato ImportItem do Dataplex e gera arquivos de importação de metadados que podem ser importados pelo Dataplex.

O conector faz parte de um pipeline de conectividade gerenciada. Um pipeline de conectividade gerenciado é um fluxo de trabalho orquestrado que você usa para importar metadados do Dataplex Catalog. O pipeline de conectividade gerenciada executa o conector e realiza outras tarefas no fluxo de trabalho de importação, como executar um job de importação de metadados e capturar registros.

O pipeline de conectividade gerenciado executa o conector usando um job em lote do Dataproc Serverless. O Dataproc Serverless oferece um ambiente de execução do Spark sem servidor. Embora seja possível criar um conector que não use o Spark, recomendamos que você use o Spark porque ele pode melhorar o desempenho do conector.

Requisitos do conector

O conector tem os seguintes requisitos:

  • O conector precisa ser uma imagem do Artifact Registry que possa ser executada no Dataproc sem servidor.
  • O conector precisa gerar arquivos de metadados em um formato que possa ser importado por um job de importação de metadados do Dataplex (o método da API metadataJobs.create). Para requisitos detalhados, consulte Arquivo de importação de metadados.
  • O conector precisa aceitar os seguintes argumentos de linha de comando para receber informações do pipeline:

    Argumento de linha de comando Valor que o pipeline oferece
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    O conector usa esses argumentos para gerar metadados em um grupo de entrada de destino projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID e gravar em um bucket do Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada execução do pipeline cria uma nova pasta FOLDER_ID no bucket CLOUD_STORAGE_BUCKET_ID. O conector precisa gravar arquivos de importação de metadados nessa pasta.

Os modelos de pipeline são compatíveis com os conectores PySpark. Os modelos presumem que o driver (mainPythonFileUri) seja um arquivo local na imagem do conector chamada main.py. É possível modificar os modelos de pipeline para outros cenários, como um conector do Spark, um URI de driver diferente ou outras opções.

Confira como usar o PySpark para criar um item de importação no arquivo de importação de metadados.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de começar

Neste guia, pressupomos que você já conhece o Python e o PySpark.

Confira as seguintes informações:

Faça o seguinte. Crie todos os recursos no mesmo local Google Cloud.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Crie um bucket do Cloud Storage para armazenar os arquivos de importação de metadados.

  9. Crie os seguintes recursos do Dataplex Catalog no mesmo projeto.

    Para conferir valores de exemplo, consulte a seção Exemplo de recursos do Dataplex Catalog para uma origem do Oracle deste documento.

    1. Crie um grupo de entrada.
    2. Crie tipos de aspecto personalizados para as entradas que você quer importar. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Opcionalmente, você pode criar outros tipos de aspecto para armazenar outras informações.

    3. Crie tipos de entrada personalizados para os recursos que você quer importar e atribua os tipos de aspecto relevantes a eles. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por exemplo, para um banco de dados Oracle, crie um tipo de entrada chamado oracle-database. Vincule-o ao tipo de aspecto chamado oracle-database.

  10. Verifique se a origem de terceiros pode ser acessada no projeto Google Cloud . Para mais informações, consulte Configuração de rede do Dataproc Serverless para Spark.

Criar um conector básico do Python

O exemplo de conector básico do Python cria entradas de nível superior para uma fonte de dados do Oracle usando as classes da biblioteca de cliente do Dataplex. Em seguida, forneça os valores para os campos de entrada.

O conector cria um arquivo de importação de metadados com as seguintes entradas:

  • Uma entrada instance, com o tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Essa entrada representa um sistema do Oracle Database XE.
  • Uma entrada database, que representa um banco de dados no sistema Oracle Database XE.

Para criar um conector básico do Python, faça o seguinte:

  1. Clone o repositório cloud-dataplex.

  2. Configure um ambiente local. Recomendamos que você use um ambiente virtual.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Use as versões ativas ou de manutenção do Python. As versões 3.7 e posteriores do Python são compatíveis.

  3. Crie um projeto Python.

  4. Requisitos de instalação:

    pip install -r requirements.txt
    

    Os requisitos a seguir são instalados:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Adicione um arquivo de pipeline main.py na raiz do projeto.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Ao implantar seu código no Dataproc Serverless, o arquivo main.py serve como ponto de entrada para a execução. Recomendamos que você minimize a quantidade de informações armazenadas no arquivo main.py. Use esse arquivo para chamar funções e classes definidas no conector, como a classe src/bootstap.py.

  6. Crie uma pasta src para armazenar a maior parte da lógica do conector.

  7. Atualize o arquivo src/cmd_reader.py com uma classe Python para aceitar argumentos de linha de comando. Para isso, use o módulo argeparse.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    Em ambientes de produção, recomendamos armazenar a senha no Secret Manager.

  8. Atualize o arquivo src/constants.py com o código para criar constantes.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Atualize o arquivo src/name_builder.py com métodos para criar os recursos do Dataplex Catalog que você quer que o conector crie para seus recursos do Oracle. Use as convenções descritas na seção Exemplo de recursos do Dataplex Catalog para uma origem do Oracle deste documento.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Como o arquivo name_builder.py é usado para o código principal do Python e do PySpark, recomendamos que você escreva os métodos como funções puras, em vez de membros de uma classe.

  10. Atualize o arquivo src/top_entry_builder.py com o código para preencher as entradas de nível superior com dados.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Atualize o arquivo src/bootstrap.py com o código para gerar o arquivo de importação de metadados e executar o conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Execute o código localmente.

    Um arquivo de importação de metadados chamado output.jsonl é retornado. O arquivo tem duas linhas, cada uma representando um item de importação. O pipeline de conectividade gerenciada lê esse arquivo ao executar o job de importação de metadados.

  13. Opcional: estenda o exemplo anterior para usar as classes da biblioteca de cliente do Dataplex para criar itens de importação de tabelas, esquemas e visualizações. Também é possível executar o exemplo de Python no Dataproc sem servidor.

    Recomendamos que você crie um conector que use o Spark (e seja executado no Dataproc Serverless), porque ele pode melhorar a performance do conector.

Criar um conector do PySpark

Este exemplo é baseado na API DataFrame do PySpark. É possível instalar e executar o PySpark SQL localmente antes de executar no Dataproc sem servidor. Se você instalar e executar o PySpark localmente, instale a biblioteca PySpark usando o pip, mas não será necessário instalar um cluster local do Spark.

Por motivos de desempenho, este exemplo não usa classes predefinidas da biblioteca PySpark. Em vez disso, o exemplo cria DataFrames, os converte em entradas JSON e grava a saída em um arquivo de importação de metadados no formato JSON Lines que pode ser importado para o Dataplex.

Para criar um conector usando o PySpark, faça o seguinte:

  1. Clone o repositório cloud-dataplex.

  2. Instale o PySpark:

    pip install pyspark
    
  3. Requisitos de instalação:

    pip install -r requirements.txt
    

    Os requisitos a seguir são instalados:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Atualize o arquivo oracle_connector.py com o código para ler dados de uma fonte de dados do Oracle e retornar DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Adicione consultas SQL para retornar os metadados que você quer importar. As consultas precisam retornar as seguintes informações:

    • Esquemas de banco de dados
    • Tabelas que pertencem a esses esquemas
    • Colunas que pertencem a essas tabelas, incluindo o nome da coluna, o tipo de dados da coluna e se a coluna é anulável ou obrigatória

    Todas as colunas de todas as tabelas e visualizações são armazenadas na mesma tabela do sistema. É possível selecionar colunas com o método _get_columns. Dependendo dos parâmetros fornecidos, você pode selecionar colunas para as tabelas ou para as visualizações separadamente.

    Observe o seguinte:

    • No Oracle, um esquema de banco de dados é de propriedade de um usuário do banco de dados e tem o mesmo nome desse usuário.
    • Os objetos de esquema são estruturas lógicas criadas pelos usuários. Objetos como tabelas ou índices podem conter dados, e objetos como visualizações ou sinônimos consistem apenas em uma definição.
    • O arquivo ojdbc11.jar contém o driver JDBC do Oracle.
  5. Atualize o arquivo src/entry_builder.py com métodos compartilhados para aplicar transformações do Spark.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Observe o seguinte:

    • Os métodos criam os recursos do Dataplex Catalog que o conector cria para seus recursos Oracle. Use as convenções descritas na seção Exemplo de recursos do Dataplex Catalog para uma origem do Oracle deste documento.
    • O método convert_to_import_items se aplica a esquemas, tabelas e visualizações. Verifique se a saída do conector é um ou mais itens de importação que podem ser processados pelo método metadataJobs.create, não entradas individuais.
    • Mesmo em uma visualização, a coluna é chamada de TABLE_NAME.
  6. Atualize o arquivo bootstrap.py com o código para gerar o arquivo de importação de metadados e executar o conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    Neste exemplo, o arquivo de importação de metadados é salvo como um único arquivo JSON Lines. É possível usar ferramentas do PySpark, como a classe DataFrameWriter, para gerar lotes de JSON em paralelo.

    O conector pode gravar entradas no arquivo de importação de metadados em qualquer ordem.

  7. Atualize o arquivo gcs_uploader.py com o código para fazer upload do arquivo de importação de metadados para um bucket do Cloud Storage.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Crie a imagem do conector.

    Se o conector tiver vários arquivos ou se você quiser usar bibliotecas que não estão incluídas na imagem padrão do Docker, use um contêiner personalizado. O Dataproc Serverless para Spark executa cargas de trabalho em contêineres do Docker. Crie uma imagem personalizada do Docker do conector e armazene a imagem no Artifact Registry. O Dataproc Serverless lê a imagem do Artifact Registry.

    1. Crie um Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Use o Conda como gerenciador de pacotes. O Dataproc sem servidor para Spark monta pyspark no contêiner no momento da execução. Portanto, não é necessário instalar dependências do PySpark na imagem de contêiner personalizada.

    2. Crie a imagem do contêiner personalizado e envie-a para o Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Como uma imagem pode ter vários nomes, use a tag do Docker para atribuir um alias à imagem.

  9. Execute o conector no Dataproc sem servidor. Para enviar um job em lote do PySpark usando a imagem de contêiner personalizada, execute o comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Observe o seguinte:

    • Os arquivos JAR são drivers do Spark. Para ler do Oracle, MySQL ou Postgres, você precisa fornecer um pacote específico ao Apache Spark. O pacote pode estar localizado no Cloud Storage ou dentro do contêiner. Se o arquivo JAR estiver dentro do contêiner, o caminho será semelhante a file:///path/to/file/driver.jar. Neste exemplo, o caminho para o arquivo JAR é /opt/spark/jars/.
    • PIPELINE_ARGUMENTS são os argumentos de linha de comando do conector.

    O conector extrai metadados do banco de dados Oracle, gera um arquivo de importação de metadados e o salva em um bucket do Cloud Storage.

  10. Para importar manualmente os metadados no arquivo de importação de metadados para o Dataplex, execute um job de metadados. Use o método metadataJobs.create.

    1. Na linha de comando, adicione variáveis de ambiente e crie um alias para o comando curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Chame o método da API, transmitindo os tipos de entrada e de aspecto que você quer importar.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      O tipo de aspecto schema é um tipo de aspecto global definido pelo Dataplex.

      O formato usado para nomes de tipo de aspecto ao chamar o método da API é diferente do formato usado no código do conector.

    3. Opcional: use o Cloud Logging para conferir os registros do job de metadados. Para mais informações, consulte Monitorar os registros do Dataplex.

Configurar a orquestração de pipeline

As seções anteriores mostraram como criar um conector de exemplo e executá-lo manualmente.

Em um ambiente de produção, execute o conector como parte de um pipeline de conectividade gerenciado usando uma plataforma de orquestração, como os fluxos de trabalho.

  1. Para executar um pipeline de conectividade gerenciada com o conector de exemplo, siga as etapas para importar metadados usando fluxos de trabalho. Faça o seguinte:

    • Crie o fluxo de trabalho no mesmo Google Cloud local do conector.
    • No arquivo de definição do fluxo de trabalho, atualize a função submit_pyspark_extract_job com o código abaixo para extrair dados do banco de dados Oracle usando o conector que você criou.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • No arquivo de definição do fluxo de trabalho, atualize a função submit_import_job com o código abaixo para importar as entradas. A função chama o método da API metadataJobs.create para executar um job de importação de metadados.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Informe os mesmos tipos de entrada e de aspecto que você incluiu ao chamar o método da API manualmente. Não há uma vírgula no final de cada string.

    • Ao executar o fluxo de trabalho, forneça os seguintes argumentos de execução:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Opcional: use o Cloud Logging para conferir os registros do pipeline de conectividade gerenciada. O payload de registro inclui um link para os registros do job em lote do Dataproc sem servidor e do job de importação de metadados, conforme for relevante. Para mais informações, consulte Conferir os registros do fluxo de trabalho.

  3. Opcional: para melhorar a segurança, o desempenho e a funcionalidade do pipeline de conectividade gerenciada, considere fazer o seguinte:

    1. Use o Gerenciador de secrets para armazenar as credenciais da sua fonte de dados externa.
    2. Use o PySpark para gravar a saída de linhas JSON em vários arquivos de importação de metadados em paralelo.
    3. Use um prefixo para dividir arquivos grandes (mais de 100 MB) em arquivos menores.
    4. Adicione mais aspectos personalizados que capturem outros metadados comerciais e técnicos da sua fonte.

Exemplo de recursos do Dataplex Catalog para uma origem Oracle

O exemplo de conector extrai metadados de um banco de dados Oracle e os mapeia para os recursos correspondentes do Dataplex Catalog.

Considerações sobre hierarquia

Cada sistema no Dataplex tem uma entrada raiz que é a entrada mãe do sistema. Normalmente, a entrada raiz tem um tipo de entrada instance. A tabela a seguir mostra a hierarquia de exemplo de tipos de entrada e tipos de aspecto para um sistema Oracle.

ID do tipo de entrada Descrição ID do tipo de aspecto vinculado
oracle-instance A raiz do sistema importado. oracle-instance
oracle-database O banco de dados Oracle. oracle-database
oracle-schema O esquema do banco de dados. oracle-schema
oracle-table Uma tabela.

oracle-table

schema

oracle-view Uma visualização.

oracle-view

schema

O tipo de aspecto schema é um tipo de aspecto global definido pelo Dataplex. Ele contém uma descrição dos campos em uma tabela, visualização ou outra entidade que tenha colunas. O tipo de aspecto personalizado oracle-schema contém o nome do esquema do banco de dados Oracle.

Exemplos de campos de itens de importação

O conector precisa usar as seguintes convenções para recursos do Oracle.

  • Nomes totalmente qualificados: os nomes totalmente qualificados para recursos do Oracle usam o modelo de nomenclatura a seguir. Os caracteres proibidos são ignorados com chaves.

    Recurso Modelo Exemplo
    Instância

    SOURCE:ADDRESS

    Use o host e o número da porta ou o nome de domínio do sistema.

    oracle:`localhost:1521` ou oracle:`myinstance.com`
    Banco de dados SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Tabela SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Nomes ou IDs de entrada: as entradas para recursos Oracle usam o seguinte modelo de nomenclatura. Os caracteres proibidos são substituídos por um permitido. Os recursos usam o prefixo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Recurso Modelo Exemplo
    Instância PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Banco de dados PREFIX/HOST_PORT/bancos de dados/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Esquema PREFIX/HOST_PORT/bancos de dados/DATABASE/schema_de_banco_de_dados/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabela PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entradas principais: se uma entrada não for raiz do sistema, ela pode ter um campo de entrada principal que descreve a posição dela na hierarquia. O campo precisa conter o nome da entrada pai. Recomendamos que você gere esse valor.

    A tabela a seguir mostra as entradas mãe dos recursos da Oracle.

    Entrada Entrada de familiar responsável
    Instância "" (string vazia)
    Banco de dados Nome da instância
    Esquema Nome do banco de dados
    Tabela Nome do esquema
    Ver Nome do esquema
  • Mapa de aspectos: o mapa de aspectos precisa conter pelo menos um aspecto que descreva a entidade a ser importada. Confira um exemplo de mapa de aspectos para uma tabela do Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Você pode encontrar tipos de aspecto predefinidos (como schema) que definem a estrutura de tabela ou visualização no projeto dataplex-types, no local global.

  • Chaves de aspecto: as chaves de aspecto usam o formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. Confira na tabela a seguir exemplos de chaves de aspecto para recursos do Oracle.

    Entrada Exemplo de chave de aspecto
    Instância example-project.us-central1.oracle-instance
    Banco de dados example-project.us-central1.oracle-database
    Schema example-project.us-central1.oracle-schema
    Tabela example-project.us-central1.oracle-table
    Ver example-project.us-central1.oracle-view

A seguir