Desenvolver um conector personalizado para a importação de metadados

Este documento fornece um modelo de referência para você criar um conector personalizado que extrai metadados de uma fonte de terceiros. Use o conector ao executar um pipeline de conectividade gerenciada que importe metadados para o Dataplex.

Você pode criar conectores para extrair metadados de fontes externas. Por exemplo, é possível criar um conector para extrair dados de fontes como MySQL, SQL Server, Oracle, Snowflake, Databricks e outras.

Use o exemplo de conector neste documento como ponto de partida para criar seus próprios conectores. O conector de exemplo se conecta a um banco de dados Oracle Express Edition (XE). O conector é criado em Python, mas você também pode usar Java, Scala ou R.

Como funcionam os conectores

Um conector extrai metadados de uma origem de dados de terceiros, transforma os metadados no formato ImportItem do Dataplex e gera arquivos de importação de metadados que podem ser importados pelo Dataplex.

O conector faz parte de um pipeline de conectividade gerenciado. Um gerenciador o pipeline de conectividade é um fluxo de trabalho orquestrado que você usa para importar Metadados do catálogo do Dataplex. O pipeline de conectividade gerenciada executa o conector e realiza outras tarefas no fluxo de trabalho de importação, como executar um job de importação de metadados e capturar registros.

O pipeline de conectividade gerenciada executa o conector usando um Dataproc sem servidor em lote. O Dataproc sem servidor oferece sem servidor do Spark. É possível criar um conector que não use o Spark, recomendamos que use o Spark, porque ele pode melhorar a o desempenho do seu conector.

Requisitos do conector

O conector tem os seguintes requisitos:

  • O conector precisa ser uma imagem do Artifact Registry que possa ser executada Dataproc sem servidor
  • O conector precisa gerar arquivos de metadados em um formato que possa ser importado por um job de importação de metadados do Dataplex (o metadataJobs.create método da API). Para conferir os requisitos detalhados, consulte Arquivo de importação de metadados.
  • O conector precisa aceitar os seguintes argumentos da linha de comando para receber informações do pipeline:

    Argumento de linha de comando Valor que o pipeline fornece
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    O conector usa esses argumentos para gerar metadados em um grupo de entrada de destino projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID e gravar em um bucket do Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada execução do pipeline cria uma nova pasta FOLDER_ID no bucket CLOUD_STORAGE_BUCKET_ID. O conector precisa gravar arquivos de importação de metadados nessa pasta.

Os modelos de pipeline são compatíveis com os conectores PySpark. Os modelos presumem que o driver (mainPythonFileUri) é um arquivo local na imagem do conector chamado main.py. É possível modificar modelos de pipeline para outros cenários, como um conector Spark, um modelo URI do driver ou outras opções.

Veja como usar o PySpark para criar um item de importação na importação de metadados .

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de começar

Para acompanhar este guia, é necessário ter familiaridade com Python e PySpark.

Confira as seguintes informações:

Faça o seguinte. Criar todos os recursos no mesmo Google Cloud o local.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Crie um bucket do Cloud Storage para armazenar os arquivos de importação de metadados.

  9. Crie os seguintes recursos do catálogo do Dataplex no mesmo projeto.

    Para conferir exemplos de valores, consulte a seção Exemplo de recursos do Dataplex Catalog para uma origem do Oracle deste documento.

    1. Crie um grupo de entrada.
    2. Crie tipos de aspecto personalizados para as entradas que você quer importar. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Opcionalmente, é possível criar tipos de aspecto adicionais para armazenar outras informações.

    3. Crie tipos de entrada personalizados para os recursos que você quer importar e atribua os tipos de aspecto relevantes a eles. Use a convenção de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por exemplo, para um banco de dados Oracle, crie um tipo de entrada chamado oracle-database: Vincule-o ao tipo de aspecto chamado oracle-database.

  10. Verifique se a fonte de terceiros pode ser acessada no seu projeto do Google Cloud. Para mais informações, consulte Dataproc sem servidor para configuração de rede do Spark.

Criar um conector Python básico

O exemplo de conector básico do Python cria entradas de nível superior para uma fonte de dados do Oracle usando as classes da biblioteca de cliente do Dataplex. Em seguida, forneça os valores dos campos de entrada.

O conector cria um arquivo de importação de metadados com as seguintes entradas:

  • Uma entrada instance, com o tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Essa entrada representa um sistema do Oracle Database XE.
  • Uma entrada database, que representa um banco de dados no sistema Oracle Database XE.

Para criar um conector Python básico, faça o seguinte:

  1. Clone o repositório cloud-dataplex.

  2. Configurar um ambiente local. Recomendamos que você use um ambiente virtual.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Use o ativa ou manutenção mais recentes do Python. As versões 3.7 e posteriores do Python são compatíveis.

  3. Crie um projeto em Python.

  4. Requisitos de instalação:

    pip install -r requirements.txt
    

    Os seguintes requisitos estão instalados:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Adicione um arquivo de pipeline main.py na raiz do projeto.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Ao implantar seu código no Dataproc sem servidor, o main.py serve como ponto de entrada para a execução. Recomendamos que você minimize a quantidade de informações armazenadas no arquivo main.py. Use esse arquivo para chamar funções e classes definidas no conector, como a classe src/bootstap.py.

  6. Crie uma pasta src para armazenar a maior parte da lógica do conector.

  7. Atualize o arquivo src/cmd_reader.py com uma classe Python para aceitar a linha de comando . Você pode usar o argeparse para fazer isso.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    Em ambientes de produção, recomendamos armazenar a senha no Secret Manager.

  8. Atualize o arquivo src/constants.py com o código para criar constantes.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Atualize o arquivo src/name_builder.py com métodos para criar os recursos do Dataplex Catalog que você quer que o conector crie para seus recursos do Oracle. Use as convenções descritas na seção Exemplo de recursos do Dataplex Catalog para uma origem do Oracle deste documento.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Como o arquivo name_builder.py é usado para o código principal do Python e código principal do PySpark, recomendamos que você escreva os métodos puros em vez de membros de uma classe.

  10. Atualize o arquivo src/top_entry_builder.py com código para preencher as entradas de nível superior com dados.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Atualize o arquivo src/bootstrap.py com o código para gerar o arquivo de importação de metadados e executar o conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Execute o código localmente.

    Um arquivo de importação de metadados chamado output.jsonl é retornado. O arquivo tem duas linhas, cada uma representando um item de importação. O pipeline de conectividade gerenciada lê esse arquivo ao executar o job de importação de metadados.

  13. Opcional: estenda o exemplo anterior para usar as classes da biblioteca de cliente do Dataplex para criar itens de importação de tabelas, esquemas e visualizações. Também é possível executar o exemplo do Python no Dataproc sem servidor.

    Recomendamos que você crie um conector que use o Spark (e seja executado no Dataproc Serverless), porque ele pode melhorar a performance do conector.

Criar um conector do PySpark

Este exemplo é baseado na API DataFrame do PySpark É possível instalar o PySpark SQL e executá-lo localmente antes em execução no Dataproc sem servidor. Se você instalar e executar o PySpark localmente, instale a biblioteca PySpark usando o pip, mas não será necessário instalar um cluster local do Spark.

Por motivos de desempenho, este exemplo não usa classes predefinidas do biblioteca PySpark. Em vez disso, o exemplo cria DataFrames, os converte em entradas JSON e grava a saída em um arquivo de importação de metadados no formato JSON Lines que pode ser importado para o Dataplex.

Para criar um conector usando o PySpark, faça o seguinte:

  1. Clone o Repositório cloud-dataplex.

  2. Instale o PySpark:

    pip install pyspark
    
  3. Requisitos de instalação:

    pip install -r requirements.txt
    

    Os seguintes requisitos estão instalados:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Atualize o arquivo oracle_connector.py com código para ler dados de um A fonte de dados Oracle e retorna DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Adicione consultas SQL para retornar os metadados que você quer importar. As consultas precisará retornar as seguintes informações:

    • Esquemas de banco de dados
    • Tabelas que pertencem a esses esquemas
    • as colunas que pertencem a essas tabelas, incluindo o nome e a coluna o tipo de dados e se a coluna é anulável ou obrigatória

    Todas as colunas de todas as tabelas e visualizações são armazenadas na mesma tabela do sistema. É possível selecionar colunas com o método _get_columns. Dependendo dos parâmetros que você fornecer, selecione colunas para das tabelas ou das visualizações separadamente.

    Observe o seguinte:

    • No Oracle, um esquema de banco de dados pertence a um usuário de banco de dados e tem os mesmos como esse usuário.
    • Objetos de esquema são estruturas lógicas criadas pelos usuários. Objetos como tabelas ou índices, podem conter dados, e objetos como visualizações ou sinônimos consistem em apenas uma definição.
    • O arquivo ojdbc11.jar contém a Driver Oracle JDBC
  5. Atualize o arquivo src/entry_builder.py com métodos compartilhados para aplicar transformações do Spark.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Observe o seguinte:

    • Os métodos criam os recursos do Dataplex Catalog que o conector cria para seus recursos Oracle. Use as convenções descritas nos Exemplo de recursos do catálogo do Dataplex para uma origem do Oracle deste documento.
    • O método convert_to_import_items se aplica a esquemas, tabelas e visualizações. Verifique se o resultado do conector é um ou mais itens de importação que podem ser processadas pela método metadataJobs.create, e não entradas individuais.
    • Mesmo em uma visualização, a coluna é chamada de TABLE_NAME.
  6. Atualize o arquivo bootstrap.py com o código para gerar o arquivo de importação de metadados e executar o conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    Este exemplo salva o arquivo de importação de metadados como um único arquivo JSON Lines. Você podem usar ferramentas PySpark, como a classe DataFrameWriter, para gerar lotes de JSON em paralelo.

    O conector pode gravar entradas no arquivo de importação de metadados em qualquer ordem.

  7. Atualize o arquivo gcs_uploader.py com o código para fazer upload do arquivo de importação de metadados para um bucket do Cloud Storage.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Crie a imagem do conector.

    Se o conector tiver vários arquivos ou se você quiser usar bibliotecas que não estão incluídas na imagem padrão do Docker, use um contêiner personalizado. O Dataproc Serverless para Spark executa cargas de trabalho no Docker contêineres. Crie uma imagem personalizada do conector do Docker e armazene a imagem no Artifact Registry. O Dataproc Serverless lê a imagem do Artifact Registry.

    1. Crie um Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Use o Conda como gerenciador de pacotes. Dataproc sem servidor do Spark monta pyspark no contêiner durante a execução. Assim, você não precisará instalar dependências do PySpark na sua imagem personalizada de contêiner.

    2. Crie a imagem do contêiner personalizado e envie-a para o Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=qc-cloudsql-connector-devproj
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"

      Como uma imagem pode ter vários nomes, você pode usar a tag Docker para atribuir um alias à imagem.

  9. Executar o conector no Dataproc sem servidor. Para enviar um job em lote do PySpark usando a imagem do contêiner personalizada, execute o comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Observe o seguinte:

    • Os arquivos JAR são drivers do Spark. para realizar leituras em Oracle, MySQL ou Postgres, é preciso fornecer um pacote específico ao Apache Spark. O pacote pode estar localizado no Cloud Storage ou dentro do contêiner. Se o arquivo JAR estiver dentro do contêiner, o caminho será semelhante a file:///path/to/file/driver.jar. Neste exemplo, o caminho para o arquivo JAR é /opt/spark/jars/.
    • PIPELINE_ARGUMENTS são os argumentos de linha de comando. para o conector.

    O conector extrai metadados do banco de dados Oracle, gera um arquivo de importação de metadados e o salva em um bucket do Cloud Storage.

  10. Importar manualmente os metadados no arquivo de importação de metadados para o Dataplex, execute um job de metadados. Use o método metadataJobs.create.

    1. Na linha de comando, adicione variáveis de ambiente e crie um alias para o comando curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Chame o método API, passando os tipos de entrada e de aspecto que você que quiser importar.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      schema é um tipo de aspecto global definido pelo o Dataplex.

      O formato usado para nomes de tipo de aspecto ao chamar o método da API é diferente do formato usado no código do conector.

    3. Opcional: use o Cloud Logging para conferir os registros do job de metadados. Para mais informações, consulte Monitorar os registros do Dataplex.

Configurar a orquestração de pipeline

As seções anteriores mostraram como criar um conector de exemplo e executá-lo manualmente.

Em um ambiente de produção, execute o conector como parte de um pipeline de conectividade gerenciado usando uma plataforma de orquestração, como o Workflows.

  1. Para executar um pipeline de conectividade gerenciada com o conector de exemplo, siga as etapas para importar metadados usando o Workflows. Faça o seguinte:

    • Crie o fluxo de trabalho no mesmo local do Google Cloud que o conector.
    • No arquivo de definição do fluxo de trabalho, atualize a função submit_pyspark_extract_job com o código abaixo para extrair dados do banco de dados Oracle usando o conector que você criou.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • No arquivo de definição do fluxo de trabalho, atualize a função submit_import_job com o código abaixo para importar as entradas. A função chama o método da API metadataJobs.create para executar um job de importação de metadados.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Forneça os mesmos tipos de entrada e de aspecto que você incluiu ao o método da API manualmente. Não há uma vírgula no final de cada string.

    • Ao executar o fluxo de trabalho, forneça os seguintes argumentos do ambiente de execução:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Opcional: use o Cloud Logging para conferir os registros do pipeline de conectividade gerenciada. A carga útil do registro inclui um link para os registros do job em lote do Dataproc sem servidor e o job de importação de metadados relevantes. Para mais informações, consulte Conferir os registros do fluxo de trabalho.

  3. Opcional: para melhorar a segurança, o desempenho e a funcionalidade do pipeline de conectividade gerenciada, considere fazer o seguinte:

    1. Use o Secret Manager para armazenar as credenciais da sua fonte de dados de terceiros.
    2. Usar o PySpark para gravar a saída do JSON Lines na importação de vários metadados arquivos em paralelo.
    3. Use um prefixo para dividir arquivos grandes (mais de 100 MB) em arquivos menores.
    4. Adicione mais aspectos personalizados que capturem outros metadados comerciais e técnicos da sua fonte.

Exemplo de recursos do Dataplex Catalog para uma origem do Oracle

O exemplo de conector extrai metadados de um banco de dados Oracle e os mapeia para os recursos correspondentes do Dataplex Catalog.

Considerações sobre hierarquia

Cada sistema no Dataplex tem uma entrada raiz que é a entrada mãe do sistema. Normalmente, a entrada raiz tem um tipo de entrada instance. A tabela a seguir mostra um exemplo de hierarquia de tipos de entrada e de aspecto para um sistema Oracle.

ID do tipo de entrada Descrição ID do tipo de aspecto vinculado
oracle-instance A raiz do sistema importado. oracle-instance
oracle-database O banco de dados Oracle. oracle-database
oracle-schema O esquema do banco de dados. oracle-schema
oracle-table Uma tabela.

oracle-table

schema

oracle-view Uma visualização.

oracle-view

schema

schema é um tipo de aspecto global definido pelo o Dataplex. Ele contém uma descrição dos campos em uma tabela, ou outra entidade com colunas. O tipo de aspecto personalizado oracle-schema contém o nome do esquema do banco de dados Oracle.

Exemplo de campos de importação de itens

O conector deve usar as seguintes convenções para recursos do Oracle.

  • Nomes totalmente qualificados: nomes totalmente qualificados para Oracle os recursos usam o modelo de nomenclatura a seguir. Caracteres proibidos são escapou com acentos graves.

    Recurso Modelo Exemplo
    Instância

    SOURCE:ADDRESS

    Use o host e o número da porta ou o nome de domínio do sistema.

    oracle:`localhost:1521` ou oracle:`myinstance.com`
    Banco de dados SOURCE:ADDRESS,DATABASE oracle:`localhost:1521`.xe
    Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Tabela SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Nomes de entrada ou IDs de entrada: entradas para recursos do Oracle. use o modelo de nomenclatura a seguir. Os caracteres proibidos são substituídos por um permitido. Os recursos usam o prefixo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries:

    Recurso Modelo Exemplo
    Instância PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Banco de dados PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Esquema PREFIX/HOST_PORT/bancos de dados/DATABASE/schema_de_banco_de_dados/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabela PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entradas mães: se uma entrada não for raiz do sistema, ela pode ter um campo de entrada mãe que descreve a posição dela na hierarquia. O campo deve conter o nome da entrada pai. Qa recomendamos que você gere esse valor.

    A tabela a seguir mostra as entradas pai dos recursos do Oracle.

    Entrada Entrada pai
    Instância "" (string vazia)
    Banco de dados Nome da instância
    Esquema Nome do banco de dados
    Tabela Nome do esquema
    Ver Nome do esquema
  • Mapa de aspecto: o mapa precisa conter pelo menos um aspecto. que descreve a entidade a ser importada. Confira um exemplo de mapa de aspectos para uma tabela do Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Você pode encontrar tipos de aspecto predefinidos (como schema) que definem a estrutura de tabela ou visualização no projeto dataplex-types, no local global.

  • Chaves de aspecto: as chaves de aspecto usam o formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. Confira na tabela a seguir exemplos de chaves de aspecto para recursos do Oracle.

    Entrada Exemplo de chave de aspecto
    Instância example-project.us-central1.oracle-instance
    Banco de dados example-project.us-central1.oracle-database
    Schema example-project.us-central1.oracle-schema
    Tabela example-project.us-central1.oracle-table
    Ver example-project.us-central1.oracle-view

A seguir