This document provides a reference template for you to build a custom connector that extracts metadata from a third-party source. You use the connector when running a managed connectivity pipeline that imports metadata into Dataplex.
You can build connectors to extract metadata from third-party sources. For example, you can build a connector to extract metadata from sources such as MySQL, SQL Server, Oracle, Snowflake, Databricks, and others.
Use the example connector in this document as a starting point to build your own connectors. The example connector connects to an Oracle Database Express Edition (XE) database. The connector is built in Python, though you can also use Java, Scala, or R.
How connectors work
A connector extracts metadata from a third-party data source, transforms the
metadata to Dataplex ImportItem
format, and generates
metadata import files that can be imported by Dataplex.
The connector is a part of a managed connectivity pipeline. A managed connectivity pipeline is an orchestrated workflow that you use to import Dataplex Catalog metadata. The managed connectivity pipeline runs the connector and performs other tasks in the import workflow, such as running a metadata import job and capturing logs.
The managed connectivity pipeline runs the connector by using a Dataproc Serverless batch job. Dataproc Serverless provides a serverless Spark execution environment. Although you can build a connector that doesn't use Spark, we recommend that you use Spark because it can improve the performance of your connector.
Connector requirements
The connector has the following requirements:
- The connector must be an Artifact Registry image that can be run on Dataproc Serverless.
- The connector must generate metadata files in a format that can be imported by a Dataplex metadata import job (the metadataJobs.create API method). For detailed requirements, see Metadata import file.
- The connector must accept the following command-line arguments to receive information from the pipeline:
Command-line argument | Value that the pipeline provides |
---|---|
target_project_id | PROJECT_ID |
target_location_id | REGION |
target_entry_group_id | ENTRY_GROUP_ID |
output_bucket | CLOUD_STORAGE_BUCKET_ID |
output_folder | FOLDER_ID |

The connector uses these arguments to generate metadata in a target entry group projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, and to write to the Cloud Storage bucket gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Each execution of the pipeline creates a new folder FOLDER_ID in the bucket CLOUD_STORAGE_BUCKET_ID. The connector should write metadata import files to this folder.
The pipeline templates support PySpark connectors. The templates assume that the driver (mainPythonFileUri) is a local file on the connector image named main.py. You can modify the pipeline templates for other scenarios, such as a Spark connector, a different driver URI, or other options.
Here's how you use PySpark to define the schemas for the import items in the metadata import file.

"""PySpark schemas for the data."""
from pyspark.sql.types import (ArrayType, MapType, StringType, StructField,
                               StructType)

# Schema for the entry_source field of an entry.
entry_source_schema = StructType([
    StructField("display_name", StringType()),
    StructField("source", StringType())
])

# Schema for the aspect map: aspect key -> aspect content.
aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([]))
                        ]))

# Schema for a Dataplex entry.
entry_schema = StructType([
    StructField("name", StringType()),
    StructField("entry_type", StringType()),
    StructField("fully_qualified_name", StringType()),
    StructField("parent_entry", StringType()),
    StructField("entry_source", entry_source_schema),
    StructField("aspects", aspect_schema)
])

# Schema for one import item in the metadata import file.
import_item_schema = StructType([
    StructField("entry", entry_schema),
    StructField("aspect_keys", ArrayType(StringType())),
    StructField("update_mask", ArrayType(StringType()))
])
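To show how these schemas come together, here's a minimal sketch, not taken from the example connector, that builds one import item as a DataFrame row and writes it out in JSON Lines format. All resource names and values in it are hypothetical placeholders, and it assumes the import_item_schema defined above.

```python
# Minimal sketch: create one import item with the schemas above and write it
# as JSON Lines. All names and values below are hypothetical placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("oracle-connector-example").getOrCreate()

import_item = {
    "entry": {
        "name": "projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe",
        "entry_type": "projects/example-project/locations/us-central1/entryTypes/oracle-database",
        "fully_qualified_name": "oracle:`10.1.1.1:1521`.xe",
        "parent_entry": "projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521",
        "entry_source": {"display_name": "xe", "source": "oracle"},
        "aspects": {
            "example-project.us-central1.oracle-database": {
                "aspect_type": "example-project.us-central1.oracle-database",
                "data": {},
            }
        },
    },
    "aspect_keys": ["example-project.us-central1.oracle-database"],
    "update_mask": ["aspects"],
}

df = spark.createDataFrame([import_item], schema=import_item_schema)
# Each output line is one import item in JSON Lines format.
df.write.mode("overwrite").json("output/")
```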
Before you begin
This guide assumes that you're familiar with Python and PySpark.
Review the following information:
- Dataplex Catalog metadata concepts
- Documentation about metadata import jobs
Do the following things. Create all resources in the same Google Cloud location.
- Create or select a Google Cloud project.
  - Create a Google Cloud project:

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:

    gcloud config set project PROJECT_ID

    Replace PROJECT_ID with your Google Cloud project name.
Make sure that billing is enabled for your Google Cloud project.
- Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

  gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com

- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:

  gcloud init
- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

  Replace the following:
  - PROJECT_ID: your project ID.
  - USER_IDENTIFIER: the identifier for your user account. For example, user:myemail@example.com.
  - ROLE: each individual role.
- Set up authentication:
  - Create the service account:

    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

    Replace SERVICE_ACCOUNT_NAME with a name for the service account.
  - Grant the roles/owner IAM role to the service account:

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
- Create a Cloud Storage bucket to store the metadata import files.
- Create the following Dataplex Catalog resources in the same project. For example values, see the Example Dataplex Catalog resources for an Oracle source section of this document.
  - Create an entry group.
  - Create custom aspect types for the entries that you want to import. Use the naming convention SOURCE-ENTITY_TO_IMPORT. Optionally, you can create additional aspect types to store other information.
  - Create custom entry types for the resources that you want to import, and assign the relevant aspect types to them. Use the naming convention SOURCE-ENTITY_TO_IMPORT. For example, for an Oracle database, create an entry type named oracle-database and link it to the aspect type named oracle-database.
- Ensure that your third-party source is accessible from your Google Cloud project. For more information, see Dataproc Serverless for Spark network configuration.
Create a basic Python connector
The example basic Python connector creates top-level entries for an Oracle data source by using the Dataplex client library classes. Then, you provide the values for the entry fields.
The connector creates a metadata import file with the following entries:
- An instance entry, with entry type projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. This entry represents an Oracle Database XE system.
- A database entry, which represents a database inside the Oracle Database XE system.
To build a basic Python connector, do the following:
Clone the cloud-dataplex repository.

Set up a local environment. We recommend that you use a virtual environment.

  mkdir venv
  python -m venv venv/
  source venv/bin/activate
Use the active or maintenance versions of Python. Python versions 3.7 and later are supported.
Create a Python project.
Install requirements:
pip install -r requirements.txt
The following requirements are installed:
Add a main.py pipeline file at the root of the project. When deploying your code to Dataproc Serverless, the main.py file serves as the entry point for execution. We recommend that you minimize the amount of information that is stored in the main.py file; use this file to call functions and classes that are defined within your connector, such as the src/bootstrap.py class.

Create a src folder to store the majority of the logic for your connector.

Update the src/cmd_reader.py file with a Python class to accept command-line arguments. You can use the argparse module to do this. In production environments, we recommend that you store the password in Secret Manager.
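For illustration, here's a minimal sketch of what src/cmd_reader.py could look like, using the argument names that the workflow example later in this document passes to the connector; the exact options in the sample repository may differ.

```python
"""Illustrative sketch of src/cmd_reader.py: parse the connector's command-line arguments."""
import argparse


def read_args() -> dict:
    """Reads the arguments that the managed connectivity pipeline passes to the connector."""
    parser = argparse.ArgumentParser()

    # Oracle connection options (argument names are illustrative).
    parser.add_argument("--host_port", help="Oracle host and port, for example x.x.x.x:1521")
    parser.add_argument("--user", help="Oracle user")
    parser.add_argument("--password", help="Oracle password; store it in Secret Manager in production")
    parser.add_argument("--database", help="Oracle database name")

    # Target Dataplex and Cloud Storage options provided by the pipeline.
    parser.add_argument("--project", help="Google Cloud project ID")
    parser.add_argument("--location", help="Google Cloud region")
    parser.add_argument("--entry_group", help="Dataplex entry group ID")
    parser.add_argument("--bucket", help="Cloud Storage bucket for the metadata import file")
    parser.add_argument("--folder", help="Folder in the bucket for this pipeline run")

    return vars(parser.parse_args())
```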
Update the src/constants.py file with code to create constants.

Update the src/name_builder.py file with methods to build the Dataplex Catalog resources that you want the connector to create for your Oracle resources. Use the conventions that are described in the Example Dataplex Catalog resources for an Oracle source section of this document. Because the name_builder.py file is used for both the Python core code and the PySpark core code, we recommend that you write the methods as pure functions, instead of as members of a class.
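For illustration, pure functions in src/name_builder.py might look like the following sketch, which follows the naming conventions from the Example Dataplex Catalog resources for an Oracle source section; the function names are hypothetical.

```python
"""Illustrative sketch of src/name_builder.py: pure functions that build resource names."""

SOURCE = "oracle"


def create_fqn(host_port: str, database: str = "", schema: str = "", table: str = "") -> str:
    """Builds a fully qualified name such as oracle:`localhost:1521`.xe.sys.orders."""
    fqn = f"{SOURCE}:`{host_port}`"
    for part in (database, schema, table):
        if part:
            fqn += f".{part}"
    return fqn


def create_entry_name(project: str, location: str, entry_group: str,
                      host_port: str, database: str = "", schema: str = "") -> str:
    """Builds an entry name such as PREFIX/HOST@PORT/databases/DATABASE/database_schemas/SCHEMA."""
    name = (f"projects/{project}/locations/{location}/entryGroups/{entry_group}"
            f"/entries/{host_port.replace(':', '@')}")  # ':' is a forbidden character, so replace it
    if database:
        name += f"/databases/{database}"
    if schema:
        name += f"/database_schemas/{schema}"
    return name
```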
Update the src/top_entry_builder.py file with code to fill the top-level entries with data.

Update the src/bootstrap.py file with code to generate the metadata import file and run the connector.

Run the code locally. A metadata import file named output.jsonl is returned. The file has two lines, each representing an import item (an illustrative item is shown after these steps). The managed connectivity pipeline reads this file when running the metadata import job.

Optional: Extend the previous example to use the Dataplex client library classes to create import items for tables, schemas, and views. You can also run the Python example on Dataproc Serverless.
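For reference, each line of output.jsonl is a single import item. Pretty-printed here for readability, and with illustrative values that follow the conventions described later in this document, one item might look similar to the following:

```json
{
  "entry": {
    "name": "projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/localhost@1521",
    "entry_type": "projects/example-project/locations/us-central1/entryTypes/oracle-instance",
    "fully_qualified_name": "oracle:`localhost:1521`",
    "parent_entry": "",
    "entry_source": {
      "display_name": "localhost:1521",
      "source": "oracle"
    },
    "aspects": {
      "example-project.us-central1.oracle-instance": {
        "aspect_type": "example-project.us-central1.oracle-instance",
        "data": {}
      }
    }
  },
  "aspect_keys": ["example-project.us-central1.oracle-instance"],
  "update_mask": ["aspects"]
}
```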
We recommend that you create a connector that uses Spark (and runs on Dataproc Serverless), because it can improve the performance of your connector.
Create a PySpark connector
This example is based on the PySpark DataFrame API. You can install PySpark SQL and run it locally before running it on Dataproc Serverless. If you install and run PySpark locally, install the PySpark library by using pip; you don't need to run a local Spark cluster.
For performance reasons, this example doesn't use predefined classes from the PySpark library. Instead, the example creates DataFrames, converts the DataFrames into JSON entries, and then writes the output into a metadata import file in JSON Lines format that can be imported into Dataplex.
To build a connector using PySpark, do the following:
Clone the cloud-dataplex repository.

Install PySpark:

  pip install pyspark

Install requirements:

  pip install -r requirements.txt

The following requirements are installed:

Update the oracle_connector.py file with code to read data from an Oracle data source and return DataFrames (see the sketch after the notes below).

Add SQL queries to return the metadata that you want to import. The queries need to return the following information:
- Database schemas
- Tables that belong to these schemas
- Columns that belong to these tables, including the column name, column data type, and whether the column is nullable or required
All of the columns of all the tables and views are stored in the same system table. You can select columns with the _get_columns method. Depending on the parameters that you provide, you can select columns for the tables or for the views separately.

Note the following:
- In Oracle, a database schema is owned by a database user and has the same name as that user.
- Schema objects are logical structures that are created by users. Objects such as tables or indexes can hold data, and objects like views or synonyms consist of only a definition.
- The ojdbc11.jar file contains the Oracle JDBC driver.
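As mentioned in the oracle_connector.py step above, the connector reads Oracle metadata over JDBC. Here's a minimal sketch, assuming the Spark JDBC data source and the ojdbc11.jar driver; the connection options and query are illustrative, not the exact code from the repository.

```python
"""Hypothetical sketch: read Oracle metadata over JDBC and return a DataFrame."""
from pyspark.sql import DataFrame, SparkSession

spark = (SparkSession.builder
         .appName("oracle-metadata-extract")
         # Assumes the Oracle JDBC driver (ojdbc11.jar) is available to Spark.
         .config("spark.jars", "/opt/spark/jars/ojdbc11.jar")
         .getOrCreate())


def read_oracle(host_port: str, database: str, user: str, password: str, query: str) -> DataFrame:
    """Runs a metadata query against Oracle and returns the result as a DataFrame."""
    return (spark.read.format("jdbc")
            .option("url", f"jdbc:oracle:thin:@{host_port}/{database}")
            .option("query", query)
            .option("user", user)
            .option("password", password)
            .option("driver", "oracle.jdbc.OracleDriver")
            .load())


# Example: list schemas, excluding Oracle-maintained accounts (illustrative query).
schemas_df = read_oracle(
    "localhost:1521", "xe", "system", "password",
    "SELECT username AS schema_name FROM all_users WHERE oracle_maintained = 'N'")
```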
Update the src/entry_builder.py file with shared methods for applying Spark transformations.

Note the following:
- The methods build the Dataplex Catalog resources that the connector creates for your Oracle resources. Use the conventions that are described in the Example Dataplex Catalog resources for an Oracle source section of this document.
- The convert_to_import_items method applies to schemas, tables, and views. Ensure that the output of the connector is one or more import items that can be processed by the metadataJobs.create method, not individual entries.
- Even in a view, the column is called TABLE_NAME.
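To make the note about convert_to_import_items concrete, here's a hedged sketch of that kind of transformation, using the column names from the schemas shown earlier; it isn't the exact code from the repository.

```python
"""Hypothetical sketch: wrap entry rows into import items with Spark transformations."""
from pyspark.sql import DataFrame
from pyspark.sql.functions import array, col, lit, map_keys, struct


def convert_to_import_items(df: DataFrame) -> DataFrame:
    """Wraps rows that match entry_schema into rows that match import_item_schema."""
    return df.select(
        # The whole entry becomes the "entry" field of the import item.
        struct(df.columns).alias("entry"),
        # Aspect keys are taken from the keys of the entry's aspect map.
        map_keys(col("aspects")).alias("aspect_keys"),
        # Fields that the import job should update (illustrative value).
        array(lit("aspects")).alias("update_mask"),
    )
```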
Update the bootstrap.py file with code to generate the metadata import file and run the connector. This example saves the metadata import file as a single JSON Lines file. You can use PySpark tools like the DataFrameWriter class to output batches of JSON in parallel. The connector can write entries to the metadata import file in any order.
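For illustration, the difference between the two output modes might look like this, assuming a DataFrame of import items named import_items_df (a placeholder name):

```python
# Illustrative only; import_items_df and the output path are placeholders.
# Single JSON Lines file: collapse the data to one partition before writing.
import_items_df.coalesce(1).write.mode("overwrite").json("output/")

# Parallel batches: each partition writes its own JSON Lines part file.
import_items_df.write.mode("overwrite").json("output/")
```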
Update the gcs_uploader.py file with code to upload the metadata import file to a Cloud Storage bucket.
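A minimal sketch, assuming the google-cloud-storage client library, of the kind of upload helper gcs_uploader.py might contain (the function name and arguments are hypothetical):

```python
"""Hypothetical sketch of gcs_uploader.py: upload the metadata import file to Cloud Storage."""
from google.cloud import storage


def upload_file(bucket_name: str, folder: str, local_path: str, file_name: str) -> None:
    """Uploads a local metadata import file to gs://bucket_name/folder/file_name."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(f"{folder}/{file_name}")
    blob.upload_from_filename(local_path)
```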
Build the connector image. If your connector contains multiple files, or if you want to use libraries that aren't included in the default Docker image, you must use a custom container. Dataproc Serverless for Spark runs workloads within Docker containers. Create a custom Docker image of the connector and store the image in Artifact Registry. Dataproc Serverless reads the image from Artifact Registry.
Create a Dockerfile:

Use Conda as your package manager. Dataproc Serverless for Spark mounts pyspark into the container at runtime, so you don't need to install PySpark dependencies in your custom container image.

Build the custom container image and push it to Artifact Registry.
Because one image can have multiple names, you can use the Docker tag to assign an alias to the image.
Run the connector on Dataproc Serverless. To submit a PySpark batch job using the custom container image, run the gcloud dataproc batches submit pyspark command.

  gcloud dataproc batches submit pyspark main.py --project=PROJECT \
      --region=REGION --batch=BATCH_ID \
      --container-image=CUSTOM_CONTAINER_IMAGE \
      --service-account=SERVICE_ACCOUNT_NAME \
      --jars=PATH_TO_JAR_FILES \
      --properties=PYSPARK_PROPERTIES \
      -- PIPELINE_ARGUMENTS
Note the following:
- The JAR files are drivers for Spark. To read from Oracle, MySQL, or
Postgres, you must provide Apache Spark a specific package. The package
can be located in Cloud Storage or inside the container. If the
JAR file is inside the container, the path is similar to
file:///path/to/file/driver.jar. In this example, the path to the JAR file is /opt/spark/jars/.
- PIPELINE_ARGUMENTS are the command-line arguments for the connector.

The connector extracts metadata from the Oracle database, generates a metadata import file, and saves the metadata import file to a Cloud Storage bucket.
To manually import the metadata in the metadata import file into Dataplex, run a metadata job. Use the metadataJobs.create method.

In the command line, add environment variables and create an alias for the curl command.
PROJECT_ID=PROJECT
LOCATION_ID=LOCATION
DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Call the API method, passing the entry types and aspect types that you want to import.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
{
  "type": "IMPORT",
  "import_spec": {
    "source_storage_uri": "gs://BUCKET/FOLDER/",
    "entry_sync_mode": "FULL",
    "aspect_sync_mode": "INCREMENTAL",
    "scope": {
      "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
      "entry_types": [
        "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
        "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
        "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
        "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
        "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      "aspect_types": [
        "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
        "projects/dataplex-types/locations/global/aspectTypes/schema",
        "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
        "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
        "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
        "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
    },
  },
}
EOF
)"
The schema aspect type is a global aspect type that is defined by Dataplex.

Note that the format that you use for aspect type names when calling the API method is different from the format that you use in the connector code.
Optional: Use Cloud Logging to view logs for the metadata job. For more information, see Monitor Dataplex logs.
Set up pipeline orchestration
The previous sections showed how to build an example connector and run the connector manually.
In a production environment, you run the connector as part of a managed connectivity pipeline, by using an orchestration platform like Workflows.
To run a managed connectivity pipeline with the example connector, follow the steps to import metadata by using Workflows. Do the following:
- Create the workflow in the same Google Cloud location as the connector.
In the workflow definition file, update the submit_pyspark_extract_job function with the following code to extract data from the Oracle database by using the connector that you created.

- submit_pyspark_extract_job:
    call: http.post
    args:
      url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
      auth:
        type: OAuth2
        scopes: "https://www.googleapis.com/auth/cloud-platform"
      headers:
        Content-Type: "application/json"
      query:
        batchId: ${WORKFLOW_ID}
      body:
        pysparkBatch:
          mainPythonFileUri: file:///main.py
          jars: file:///opt/spark/jars/ojdbc11.jar
          args:
            - ${"--host_port=" + args.ORACLE_HOST_PORT}
            - ${"--user=" + args.ORACLE_USER}
            - ${"--password=" + args.ORACLE_PASSWORD}
            - ${"--database=" + args.ORACLE_DATABASE}
            - ${"--project=" + args.TARGET_PROJECT_ID}
            - ${"--location=" + args.CLOUD_REGION}
            - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
            - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
            - ${"--folder=" + WORKFLOW_ID}
        runtimeConfig:
          version: "2.0"
          containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
        environmentConfig:
          executionConfig:
            serviceAccount: ${args.SERVICE_ACCOUNT}
    result: RESPONSE_MESSAGE
In the workflow definition file, update the submit_import_job function with the following code to import the entries. The function calls the metadataJobs.create API method to run a metadata import job.

- submit_import_job:
    call: http.post
    args:
      url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
      auth:
        type: OAuth2
        scopes: "https://www.googleapis.com/auth/cloud-platform"
      body:
        type: IMPORT
        import_spec:
          source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
          entry_sync_mode: FULL
          aspect_sync_mode: INCREMENTAL
          scope:
            entry_groups:
              - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
            entry_types:
              - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
              - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
              - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
              - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
              - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
            aspect_types:
              - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
              - "projects/dataplex-types/locations/global/aspectTypes/schema"
              - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
              - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
              - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
              - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
    result: IMPORT_JOB_RESPONSE
Provide the same entry types and aspect types that you included when you called the API method manually. Note that there isn't a comma at the end of each string.
When you execute the workflow, provide the following runtime arguments:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Optional: Use Cloud Logging to view logs for the managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job, as relevant. For more information, see View workflow logs.
Optional: To improve the security, performance, and functionality of your managed connectivity pipeline, consider doing the following things:
- Use Secret Manager to store the credentials for your third-party data source.
- Use PySpark to write the JSON Lines output into multiple metadata import files in parallel.
- Use a prefix to split big files (more than 100 MB) into smaller files.
- Add more custom aspects that capture additional business and technical metadata from your source.
Example Dataplex Catalog resources for an Oracle source
The example connector extracts metadata from an Oracle database and maps the metadata to corresponding Dataplex Catalog resources.
Hierarchy considerations
Every system in Dataplex has a root entry that is the parent
entry for the system. Usually the root entry has an instance
entry type.
The following table shows the example hierarchy of entry types and aspect types
for an Oracle system.
Entry type ID | Description | Linked aspect type ID |
---|---|---|
oracle-instance | The root of the imported system. | oracle-instance |
oracle-database | The Oracle database. | oracle-database |
oracle-schema | The database schema. | oracle-schema |
oracle-table | A table. | schema |
oracle-view | A view. | schema |
The schema
aspect type is a global aspect type that is defined by
Dataplex. It contains a description of the fields in a table,
view, or other entity that has columns. The oracle-schema
custom aspect type
contains the name of the Oracle database schema.
Example import item fields
The connector should use the following conventions for Oracle resources.
- Fully qualified names: fully qualified names for Oracle resources use the following naming template. Forbidden characters are escaped with backticks.

  Resource | Template | Example |
  ---|---|---|
  Instance | SOURCE:ADDRESS. Use the host and port number or the domain name of the system. | oracle:`localhost:1521` or oracle:`myinstance.com` |
  Database | SOURCE:ADDRESS.DATABASE | oracle:`localhost:1521`.xe |
  Schema | SOURCE:ADDRESS.DATABASE.SCHEMA | oracle:`localhost:1521`.xe.sys |
  Table | SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME | oracle:`localhost:1521`.xe.sys.orders |
  View | SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME | oracle:`localhost:1521`.xe.sys.orders_view |
- Entry names or entry IDs: entries for Oracle resources use the following naming template. Forbidden characters are replaced with a permitted character. Resources use the prefix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

  Resource | Template | Example |
  ---|---|---|
  Instance | PREFIX/HOST_PORT | projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521 |
  Database | PREFIX/HOST_PORT/databases/DATABASE | projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe |
  Schema | PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA | projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys |
  Table | PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE | projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders |
  View | PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW | projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view |
- Parent entries: if an entry isn't a root entry for the system, the entry can have a parent entry field that describes its position in the hierarchy. The field should contain the name of the parent entry. We recommend that you generate this value.

  The following table shows the parent entries for Oracle resources.

  Entry | Parent entry |
  ---|---|
  Instance | "" (empty string) |
  Database | Instance name |
  Schema | Database name |
  Table | Schema name |
  View | Schema name |

- Aspect map: the aspect map must contain at least one aspect that describes the entity to import. Here's an example aspect map for an Oracle table.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
  You can find predefined aspect types (like schema) that define the table or view structure in the dataplex-types project, in the global location.
- Aspect keys: aspect keys use the naming format PROJECT.LOCATION.ASPECT_TYPE. The following table shows example aspect keys for Oracle resources.

  Entry | Example aspect key |
  ---|---|
  Instance | example-project.us-central1.oracle-instance |
  Database | example-project.us-central1.oracle-database |
  Schema | example-project.us-central1.oracle-schema |
  Table | example-project.us-central1.oracle-table |
  View | example-project.us-central1.oracle-view |