Desarrolla un conector personalizado para la importación de metadatos

En este documento, se proporciona una plantilla de referencia para que crees un conector personalizado que extraiga metadatos de una fuente de terceros. Usas el conector cuando ejecutas una canalización de conectividad administrada que importa metadatos a Dataplex.

Puedes crear conectores para extraer metadatos de fuentes de terceros. Por ejemplo, puedes compilar un conector para extraer datos de fuentes como MySQL, SQL Server, Oracle, Snowflake, Databricks y otras.

Usa el conector de ejemplo de este documento como punto de partida para compilar tus propios conectores. El conector de ejemplo se conecta a una base de datos de Oracle Express Edition (XE). El conector está compilado en Python, aunque también puedes usar Java, Scala o R.

Cómo funcionan los conectores

Un conector extrae metadatos de una fuente de datos de terceros, transforma metadatos a formato ImportItem de Dataplex y genera archivos de importación de metadatos que se pueden importar con Dataplex.

El conector forma parte de una canalización de conectividad administrada. Una cuenta administrada La canalización de conectividad es un flujo de trabajo organizado que se usa para importar Metadatos de Dataplex Catalog. La canalización de conectividad administrada ejecuta el conector y realiza otras tareas en el flujo de trabajo de importación, como las siguientes: ejecutar un trabajo de importación de metadatos y capturar registros.

La canalización de conectividad administrada ejecuta el conector con un trabajo por lotes de Dataproc Serverless. Dataproc Serverless ofrece un entorno de ejecución de Spark sin servidores. Si bien puedes compilar un conector que no use Spark, te recomendamos que lo hagas, ya que puede mejorar el rendimiento de tu conector.

Requisitos del conector

El conector tiene los siguientes requisitos:

  • El conector debe ser una imagen de Artifact Registry que se pueda ejecutar en Dataproc Serverless.
  • El conector debe generar archivos de metadatos en un formato que se pueda importar por un trabajo de importación de metadatos de Dataplex (el metadataJobs.create método de API). Para ver los requisitos detallados, consulta Archivo de importación de metadatos.
  • El conector debe aceptar los siguientes argumentos de línea de comandos para recibir de la canalización:

    Argumento de la línea de comandos Valor que proporciona la canalización
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    El conector usa estos argumentos para generar metadatos en un grupo de entrada de destino. projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, y a escribir en un bucket de Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID Cada ejecución de la canalización crea una carpeta nueva FOLDER_ID en el bucket CLOUD_STORAGE_BUCKET_ID El conector debe escribir los archivos de importación de metadatos en esta carpeta.

Las plantillas de canalización admiten conectores de PySpark. Las plantillas suponen que el controlador (mainPythonFileUri) es un archivo local en la imagen del conector llamado main.py. Puedes modificar las plantillas de canalización para otras situaciones, como un conector de Spark, un URI de controlador diferente o alguna otra opción.

A continuación, se muestra cómo usar PySpark para crear un elemento de importación en el archivo de importación de metadatos.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de comenzar

En esta guía, se supone que estás familiarizado con Python y PySpark.

Revisa la siguiente información:

Haz lo siguiente: Crea todos los recursos en la misma ubicación de Google Cloud.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Crea un bucket de Cloud Storage para almacenar los archivos de importación de metadatos.

  9. Crea los siguientes recursos de Dataplex Catalog en el mismo proyecto.

    Para ver valores de ejemplo, consulta la sección Recursos de ejemplo de Dataplex Catalog para una fuente de Oracle de este documento.

    1. Crea un grupo de entradas.
    2. Crea tipos de aspectos personalizados para las entradas que deseas importar. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT

      De manera opcional, puedes crear tipos de aspectos adicionales para almacenar otra información.

    3. Crea tipos de entrada personalizados para los recursos que deseas importar y asígnales los tipos de aspectos relevantes. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT

      Por ejemplo, para una base de datos de Oracle, crea un tipo de entrada llamado oracle-database Vincúlalo al tipo de aspecto que se llama oracle-database.

  10. Asegúrese de que se pueda acceder a su fuente de terceros desde su proyecto de Google Cloud. Para obtener más información, consulta Dataproc sin servidores para la configuración de redes Spark.

Crea un conector básico de Python

El ejemplo de conector básico de Python crea entradas de nivel superior para una fuente de datos de Oracle con las clases de la biblioteca cliente de Dataplex. Luego, debes proporcionar los valores para los campos de entrada.

El conector crea un archivo de importación de metadatos con las siguientes entradas:

  • Una entrada instance, con el tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Esta entrada representa un sistema XE de la base de datos de Oracle.
  • Una entrada database, que representa una base de datos dentro del sistema Oracle Database XE.

Para compilar un conector básico de Python, haz lo siguiente:

  1. Configurar un entorno local Te recomendamos que uses un entorno virtual.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Usa el activo o mantenimiento más recientes de Python. Se admiten las versiones 3.7 y posteriores de Python.

  2. Crea un proyecto de Python.

  3. Agrega un archivo requirements.txt con lo siguiente:

    • google-cloud-dataplex
    • google-cloud-storage

    Requisitos de instalación:

    pip install -r requirements.txt
    
  4. Agrega un archivo de canalización main.py en la raíz del proyecto.

    Cuando implementas tu código en Dataproc Serverless, el main.py archivo funciona como punto de entrada para la ejecución. Te recomendamos que minimices la cantidad de información que se almacena en el archivo main.py usar esto para llamar a funciones y clases definidas dentro de tu conector, como la clase src/bootstap.py.

  5. Crea una carpeta src para almacenar la mayor parte de la lógica del conector.

  6. Crea un archivo llamado src/cmd_reader.py con una clase de Python para aceptar argumentos de línea de comandos. Puedes usar la argeparse módulo para hacerlo.

    import argparse
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-p", "--project", type=str, required=True,
                            help="Google Cloud project")
        parser.add_argument("-l", "--location", type=str, required=True,
                            help="Google Cloud location")
        parser.add_argument("-g", "--entry_group", type=str, required=True,
                            help="Dataplex entry group")
        parser.add_argument("--host_port", type=str, required=True,
                            help="Oracle host and port")
        parser.add_argument("--user", type=str, required=True, help="Oracle user")
        parser.add_argument("--password-secret", type=str, required=True,
                            help="An ID in Secret Manager for the Oracle password.")
        parser.add_argument("-d", "--database", type=str, required=True,
                            help="Oracle database")
        parser.add_argument("--bucket", type=str, required=True,
                            help="Cloud Storage bucket")
        parser.add_argument("--folder", type=str, required=True,
                            help="Folder in the bucket")
    
        return vars(parser.parse_known_args()[0])
    

    Los siguientes argumentos son los mínimos necesarios para leer una base de datos de Oracle:

    • project: Es el proyecto de Google Cloud que contiene el grupo de entradas, los tipos de entradas y los tipos de aspectos.
    • location: Es la ubicación de Google Cloud del grupo de entradas, los tipos de entradas y los tipos de aspectos.
    • entry_group: Es un grupo de entradas existente en Dataplex. El los metadatos que importas son para las entradas que pertenecen a esta entrada grupo.
    • host_port: Es el host y el número de puerto de la instancia de Oracle.
    • user: El usuario de Oracle
    • password-secret: Es la contraseña para el usuario de Oracle. En los entornos de producción, te recomendamos que almacenes la contraseña en Secret Manager.
    • database: Es la base de datos de Oracle de destino.
    • bucket: El bucket de Cloud Storage que almacena los metadatos importar un archivo.
    • folder: Es una carpeta en el bucket de Cloud Storage que se usa para separar los archivos de importación de metadatos. Cada ejecución del flujo de trabajo crea un nuevo carpeta.
  7. Crea un archivo llamado src/constants.py. Agrega el siguiente código para crear constantes.

    SOURCE_TYPE = 'oracle'
    
    class EntryType(enum.Enum):
       """Types of Oracle entries."""
       INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
       DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
       DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
       TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
       VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  8. Crea un archivo llamado src/name_builder.py. Escribe métodos para compilar los recursos de Dataplex Catalog que deseas que el conector cree para tus recursos de Oracle. Usa las convenciones que se describen en el Ejemplo de recursos de Dataplex Catalog para una fuente de Oracle de este documento.

    Debido a que el archivo name_builder.py se usa para el código principal de Python y el código principal de PySpark, te recomendamos que escribas los métodos como valores en lugar de como miembros de una clase.

  9. Crea un archivo llamado src/top_level_builder.py. Agrega el siguiente código a llenar las entradas de nivel superior con datos.

    from src.common import EntryType
    from src import name_builder as nb
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        entry: dataplex_v1.Entry = dataclasses.field(
            default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    def dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
        def convert(obj: object):
          if isinstance(obj, proto.Message):
            return proto.Message.to_dict(obj)
          return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    class TopEntryBuilder:
        def __init__(self, config):
            self._config = config
            self._project = config["project"]
            self._location = config["location"]
    
        def create_jsonl_item(self, entry_type: EntryType):
            item = self.entry_to_import_item(self.create_entry(entry_type))
            return self.to_json(item)
    
        def create_entry(self, entry_type: EntryType):
            entry = dataplex_v1.Entry()
            entry.name = nb.create_name(self._config, entry_type)
            entry.entry_type = entry_type.value.format(project=self._project, location=self._location)
            entry.fully_qualified_name = nb.create_fqn(self._config, entry_type)
            entry.parent_entry = nb.create_parent_name(self._config, entry_type)
    
            aspect_key = nb.create_entry_aspect_name(self._config, entry_type)
    
            entry_aspect = dataplex_v1.Aspect()
            entry_aspect.aspect_type = aspect_key
            entry_aspect.data = {}
            entry.aspects[aspect_key] = entry_aspect
    
            return entry
    
        def entry_to_import_item(self, entry: dataplex_v1.Entry):
            import_item = ImportItem()
            import_item.entry = entry
            import_item.aspect_keys = list(entry.aspects.keys())
            import_item.update_mask = "aspects"
    
            return import_item
    
        def to_json(self, import_item: ImportItem):
            return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
      def to_json(self, import_item: ImportItem):
        return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
  10. Crea un archivo llamado src/bootstrap.py. Agrega el siguiente código que genera el archivo de importación de metadatos y ejecuta el conector.

    FILENAME = "output.jsonl"
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of strings to the file in JSONL format."""
        for string in json_strings:
            output_file.write(string + "\n")
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write the top entries
            file.write(top_entry_builder.create(EntryType.INSTANCE))
            file.write(top_entry_builder.create(EntryType.DATABASE))
    
  11. Ejecuta el código de manera local.

    Se muestra un archivo de importación de metadatos llamado output.jsonl. El archivo tiene dos líneas, cada una de las cuales representa un elemento de importación. La canalización de conectividad administrada lee este archivo cuando ejecuta el trabajo de importación de metadatos.

  12. Opcional: Amplía el ejemplo anterior para usar Dataplex las clases de biblioteca cliente para crear elementos de importación de tablas, esquemas y vistas. También puedes ejecutar el ejemplo de Python en Dataproc Serverless.

    Recomendamos que crees un conector que use Spark (y ejecute en Dataproc Serverless), ya que puede mejorar el el rendimiento del conector.

Crear un conector de PySpark

Este ejemplo se basa en el API de PySpark DataFrame Puedes instalar PySpark SQL y ejecutarlo de forma local antes de ejecucarlo en Dataproc Serverless. Si instalas y ejecutas PySpark de forma local, instala la biblioteca de PySpark con pip, pero no es necesario un clúster local de Spark.

Por motivos de rendimiento, este ejemplo no usa clases predefinidas de la biblioteca de PySpark. En su lugar, el ejemplo crea DataFrames, los convierte en entradas JSON y, luego, escribe el resultado en un archivo de importación de metadatos en formato de líneas JSON que se puede importar a Dataplex.

Para compilar un conector con PySpark, haz lo siguiente:

  1. Agrega el siguiente código que lee los datos de una fuente de datos de Oracle y devuelve DataFrames.

    SPARK_JAR_PATH="/opt/spark/jars/ojdbc11.jar"
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # Spark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor").config(
                "spark.jars", SPARK_JAR_PATH).getOrCreate()
            self._config = config
            self._url = (f"jdbc:oracle:thin:@{config['host_port']}"
                        f":{config['database']}")
    
        def _execute(self, query: str) -> DataFrame:
            return self._spark.read.format('jdbc') \
              .option("driver", "oracle.jdbc.OracleDriver") \
              .option("url",  self._url) \
              .option('query', query) \
              .option('user', self._config["user"]) \
              .option('password', self._config["password"]) \
              .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Every line here is a column."""
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Get table or view."""
            short_type = entry_type.name # the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Agrega consultas de SQL para mostrar los metadatos que deseas importar. Las consultas es necesario que devuelva la siguiente información:

    • Esquemas de bases de datos
    • Tablas que pertenecen a estos esquemas
    • Las columnas que pertenecen a estas tablas, incluido el nombre de la columna, el tipo de datos de la columna y si la columna es nula o obligatoria

    Todas las columnas de todas las tablas y vistas se almacenan en la misma de la tabla del sistema. Puedes seleccionar columnas con el método _get_columns. Según los parámetros que proporciones, puedes seleccionar columnas para las tablas o para las vistas por separado.

    Ten en cuenta lo siguiente:

    • En Oracle, un esquema de base de datos es propiedad de un usuario de base de datos y tiene la misma como ese usuario.
    • Los objetos de esquema son estructuras lógicas creadas por los usuarios. Los objetos como las tablas o los índices pueden contener datos, y los objetos como las vistas o los sinónimos solo consisten en una definición.
    • El archivo ojdbc11.jar contiene las Controlador JDBC de Oracle.
  2. Crea un archivo llamado src/entry_builder.py. Agregar los siguientes compartidos para aplicar transformaciones de Spark.

    import pyspark.sql.functions as F
    from src import name_builder as nb
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Parse Oracle column type to Dataplex type."""
        USER_DEFINED_FUNCTION
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(F.lit(entry_aspect_name),
                            F.named_struct(F.lit("aspect_type"),
                                          F.lit(entry_aspect_name),
                                          F.lit("data"),
                                          F.create_map()
                                          )
                            )
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                        "entry_source", "aspects", "entry_type"]
    
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys]))\
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    

    Puedes hacer lo siguiente:

    • El método choose_metadata_type_udf es una herramienta de PySpark definida por el usuario función que creas para indexar columnas. Para USER_DEFINED_FUNCTION, escribe métodos para compilar la Recursos de Dataplex Catalog que quieres que sea el conector para tus recursos de Oracle. Usa las convenciones que se describen en el Ejemplo de recursos de Dataplex Catalog para una fuente de Oracle de este documento.
    • El método convert_to_import_items se aplica a esquemas, tablas y vistas. Asegúrate de que el resultado del conector sea uno o más elementos de importación que el método metadataJobs.create pueda procesar, no entradas individuales.
  3. En src/entry_builder.py, agrega el siguiente código para aplicar transformaciones de Spark a los esquemas.

    def build_schemas(config, df_raw_schemas):
      """Create a dataframe with database schemas."""
      entry_type = EntryType.DB_SCHEMA
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      parent_name =  nb.create_parent_name(config, entry_type)
    
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                              StringType())
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                            StringType())
    
      # Remember to fill the missed project and location
      full_entry_type = entry_type.value.format(project=config["PROJECT"],
                                                location=config["LOCATION"])
    
      # To list of entries
      column = F.col("USERNAME")
      df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
        .withColumn("fully_qualified_name", create_fqn_udf(column)) \
        .withColumn("parent_entry", F.lit(parent_name)) \
        .withColumn("entry_type", F.lit(full_entry_type)) \
        .withColumn("entry_source", create_entry_source(column)) \
        .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
      .drop(column)
    
      df = convert_to_import_items(df, [entry_aspect_name])
      return df
    
  4. En src/entry_builder.py, agrega el siguiente código que aplica Spark. transformaciones de datos a las tablas y vistas. El código crea una lista de tablas con definiciones de columnas que Dataplex puede leer.

    def build_dataset(config, df_raw, db_schema, entry_type):
      """Build table entries from a flat list of columns."""
      schema_key = "dataplex-types.global.schema"
    
      # Format column names and details
      df = df_raw \
        .withColumn("mode", F.when(F.col("NULLABLE") == 'Y',
                                    "NULLABLE").otherwise("REQUIRED")) \
        .drop("NULLABLE") \
        .withColumnRenamed("DATA_TYPE", "dataType") \
        .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
        .withColumnRenamed("COLUMN_NAME", "name")
    
      # Aggregate fields
      aspect_columns = ["name", "mode", "dataType", "metadataType"]
      df = df.withColumn("columns", F.struct(aspect_columns))\
        .groupby('TABLE_NAME') \
        .agg(F.collect_list("columns").alias("fields"))
    
      # Create aspects
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      df = df.withColumn("schema",
                        F.create_map(F.lit(schema_key),
                                      F.named_struct(
                                          F.lit("aspect_type"),
                                          F.lit(schema_key),
                                          F.lit("data"),
                                          F.create_map(F.lit("fields"),
                                                      F.col("fields")))
                                      )
                        )\
        .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
      .drop("fields")
    
      # Merge separate aspect columns into the one map
      df = df.select(F.col("TABLE_NAME"),
                    F.map_concat("schema", "entry_aspect").alias("aspects"))
    
      # Fill general information and hierarchy names
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                      db_schema, x),
                              StringType())
    
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                    db_schema, x), StringType())
    
      parent_name = nb.create_parent_name(entry_type, db_schema)
      full_entry_type = entry_type.value.format(project=config["project"],
                                                location=config["location"])
    
      column = F.col("TABLE_NAME")
      df = df.withColumn("name", create_name_udf(column)) \
         .withColumn("fully_qualified_name", create_fqn_udf(column)) \
         .withColumn("entry_type", F.lit(full_entry_type)) \
         .withColumn("parent_entry", F.lit(parent_name)) \
         .withColumn("entry_source", create_entry_source(column)) \
      .drop(column)
    
      df = convert_to_import_items(df, [schema_key, entry_aspect_name])
      return df
    

    Ten en cuenta que, incluso en una vista, la columna se llama TABLE_NAME.

  5. Agrega el siguiente código que genera el archivo de importación de metadatos y ejecuta el conector.

    def process_raw_dataset(df: pyspark.sql.dataframe.DataFrame,
                            config: Dict[str, str],
                            schema_name: str,
                            entry_type: EntryType):
        """Builds dataset and converts it to jsonl."""
        df = entry_builder.build_dataset(config, df, schema_name, entry_type)
        return df.toJSON().collect()
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        connector = OracleConnector(config)
    
        top_entry_builder = TopEntryBuilder(config)
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries
            # ...
    
            # Get schemas
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in
                      df_raw_schemas.select('USERNAME').collect()]
            schemas_json_strings = entry_builder.build_schemas(config,
                                                              df_raw_schemas) \
                .toJSON() \
                .collect()
            write_jsonl(file, schemas_json_strings)
    
            for schema_name in schemas:
                print(f"Processing tables for {schema_name}")
                df_raw_tables = connector.get_table_columns(schema_name)
                tables_json_strings = process_raw_dataset(df_raw_tables,
                                                          config,
                                                          schema_name,
                                                          EntryType.TABLE)
                write_jsonl(file, tables_json_strings)
                print(f"Processing views for {schema_name}")
                df_raw_views = connector.get_view_columns(schema_name)
                views_json_strings = process_raw_dataset(df_raw_views,
                                                        config,
                                                        schema_name,
                                                        EntryType.VIEW)
                write_jsonl(file, views_json_strings)
    
        gcs_uploader.upload(config, FILENAME)
    

    En este ejemplo, se guarda el archivo de importación de metadatos como un solo archivo de líneas JSON. Tú puedes usar herramientas de PySpark como la clase DataFrameWriter para generar lotes de JSON en paralelo

    El conector puede escribir entradas en el archivo de importación de metadatos en cualquier orden.

  6. Agrega el siguiente código que sube el archivo de importación de metadatos a un bucket de Cloud Storage.

    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to a bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["bucket"])
        folder = config["folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  7. Compila la imagen del conector.

    Si tu conector contiene varios archivos o si deseas no incluidas en la imagen predeterminada de Docker, debes usar un contenedor personalizado. Dataproc Serverless para Spark ejecuta cargas de trabajo dentro de contenedores de Docker. Crea una imagen de Docker personalizada del conector y almacénala en Artifact Registry. Dataproc Serverless lee la imagen de Artifact Registry.

    1. Crea un Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
      

      Usa Conda como tu administrador de paquetes. Dataproc sin servidores para Spark activa pyspark en el contenedor en el entorno de ejecución, por lo que no necesita instalar dependencias de PySpark en su imagen de contenedor personalizada.

    2. Compila la imagen del contenedor personalizado y envíala a Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=example-project
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to Artifact Registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Debido a que una imagen puede tener varios nombres, puedes usar la etiqueta de Docker para un alias a la imagen.

  8. Ejecuta el conector en Dataproc Serverless. Para enviar un trabajo por lotes de PySpark con la imagen del contenedor personalizada, ejecuta el Comando gcloud dataproc batches submit pyspark

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Ten en cuenta lo siguiente:

    • Los archivos JAR son controladores de Spark. Para leer desde Oracle, MySQL o Postgres, debes proporcionarle a Apache Spark un paquete específico. El paquete pueden ubicarse en Cloud Storage o dentro del contenedor. Si el archivo JAR está dentro del contenedor, la ruta de acceso es similar a file:///path/to/file/driver.jar. En este ejemplo, la ruta a la El archivo JAR es /opt/spark/jars/.
    • PIPELINE_ARGUMENTS son los argumentos de la línea de comandos. para el conector.

    El conector extrae metadatos de la base de datos de Oracle, genera un archivo de importación de metadatos y lo guarda en un bucket de Cloud Storage.

  9. Para importar manualmente los metadatos en el archivo de importación de metadatos Dataplex, ejecutarás un trabajo de metadatos. Usa el método metadataJobs.create.

    1. En la línea de comandos, agrega variables de entorno y crea un alias para el comando curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Llama al método de la API y pasa los tipos de entrada y los tipos de aspecto que que quieres importar.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      El tipo de aspecto schema es un tipo de aspecto global definido por Dataplex

      Ten en cuenta que el formato que usas para los nombres de los tipos de aspectos cuando llamas al método de la API es diferente del formato que usas en el código del conector.

    3. Usa Cloud Logging para ver los registros del trabajo de metadatos (opcional). Para más información, consulta Supervisa los registros de Dataplex.

Configura la organización de canalizaciones

En las secciones anteriores, se mostró cómo compilar un conector de ejemplo y ejecutar la de forma manual.

En un entorno de producción, debes ejecutar el conector como parte de un de conectividad privada a través de una plataforma de organización, como Workflows

  1. Para ejecutar una canalización de conectividad administrada con el conector de ejemplo, sigue las pasos para importar metadatos con Workflows. Haz lo siguiente:

    • Crea el flujo de trabajo en la misma ubicación de Google Cloud que el conector.
    • En el archivo de definición del flujo de trabajo, actualiza submit_pyspark_extract_job. función con el siguiente código para extraer datos de la base de datos de Oracle con el conector que creaste.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • En el archivo de definición del flujo de trabajo, actualiza la función submit_import_job. con el siguiente código para importar las entradas. La función llama a Método de API metadataJobs.create para ejecutar un trabajo de importación de metadatos.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Proporciona los mismos tipos de entrada y de aspecto que incluiste cuando llamaste al método de la API de forma manual. Ten en cuenta que no hay una coma al final de cada cadena.

    • Cuando ejecutes el flujo de trabajo, proporciona los siguientes argumentos del entorno de ejecución:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Opcional: Usa Cloud Logging para ver los registros de la conectividad administrada en una canalización de integración continua. La carga útil del registro incluye un vínculo a los registros de El trabajo por lotes Dataproc Serverless y el trabajo de importación de metadatos. según corresponda. Para obtener más información, consulta Ver registros de flujo de trabajo.

  3. Opcional: Para mejorar la seguridad, el rendimiento y la funcionalidad de tu canalización de conectividad administrada, considera hacer lo siguiente:

    1. Usa Secret Manager para almacenar las credenciales de tu fuente de datos de terceros.
    2. Usa PySpark para escribir el resultado de JSON Lines en varios archivos de importación de metadatos en paralelo.
    3. Usa un prefijo para dividir los archivos grandes (de más de 100 MB) en archivos más pequeños archivos.
    4. Agrega más aspectos personalizados que capten más aspectos técnicos y comerciales metadatos de tu fuente.

Ejemplo de recursos de Dataplex Catalog para una fuente de Oracle

El conector de ejemplo extrae metadatos de una base de datos de Oracle y los asigna a los recursos de Dataplex Catalog correspondientes.

Consideraciones de jerarquía

Cada sistema en Dataplex tiene una entrada raíz que es la superior. de entrada para el sistema. Por lo general, la entrada raíz tiene un tipo de entrada instance. En la siguiente tabla, se muestra la jerarquía de ejemplo de los tipos de entrada y tipos de aspecto para un sistema de Oracle.

ID del tipo de entrada Descripción ID del tipo de aspecto vinculado
oracle-instance La raíz del sistema importado. oracle-instance
oracle-database La base de datos de Oracle oracle-database
oracle-schema Esquema de la base de datos oracle-schema
oracle-table Una tabla.

oracle-table

schema

oracle-view Una vista

oracle-view

schema

El tipo de aspecto schema es un tipo de aspecto global que define Dataplex. Contiene una descripción de los campos de una tabla, vista u otra entidad que tenga columnas. El tipo de aspecto personalizado oracle-schema contiene el nombre del esquema de la base de datos de Oracle.

Ejemplo de campos de elementos de importación

El conector debe usar las siguientes convenciones para los recursos de Oracle.

  • Nombres completamente calificados: Nombres completamente calificados para Oracle utiliza la siguiente plantilla de nombres. Los caracteres prohibidos se escapan con acentos graves.

    Recurso Plantilla Ejemplo
    Instancia

    SOURCE:ADDRESS

    Usa el host y el número de puerto o el nombre de dominio del sistema.

    oracle:`localhost:1521` o oracle:`myinstance.com`
    Base de datos SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Tabla SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Nombres o IDs de entrada: entradas para los recursos de Oracle usa la siguiente plantilla de nombres. Los caracteres prohibidos se reemplazan por un el carácter permitido. Los recursos usan el prefijo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Recurso Plantilla Ejemplo
    Instancia PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Base de datos PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Esquema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabla PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entradas superiores: Si una entrada no es una entrada raíz para el sistema, puede tener un campo de entrada superior que describe su posición en la jerarquía. El campo debe contener el nombre de la entrada superior. Te recomendamos que generes este valor.

    En la siguiente tabla, se muestran las entradas principales de los recursos de Oracle.

    Entrada Entrada principal
    Instancia "" (string vacía)
    Base de datos Nombre de la instancia
    Esquema Nombre de la base de datos
    Tabla Nombre del esquema
    Ver Nombre del esquema
  • Mapa de aspectos: El mapa de aspectos debe contener al menos un aspecto que describa la entidad que se importará. Este es un ejemplo de un mapa de aspecto Tabla de Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Puedes encontrar tipos de aspectos predefinidos (como schema) que definen la estructura de tablas o vistas en el archivo dataplex-types en la ubicación global.

  • Claves de aspecto: Las claves de aspecto usan el formato de nombres PROJECT.LOCATION.ASPECT_TYPE. En la siguiente tabla, se muestran ejemplos de claves de aspecto para recursos de Oracle.

    Entrada Ejemplo de clave de aspecto
    Instancia example-project.us-central1.oracle-instance
    Base de datos example-project.us-central1.oracle-database
    Esquema example-project.us-central1.oracle-schema
    Tabla example-project.us-central1.oracle-table
    Ver example-project.us-central1.oracle-view

¿Qué sigue?