CDAP reference

When working with Cloud Data Fusion, you use both the Cloud Console and the Cloud Data Fusion UI: the Cloud Console to create a Cloud Data Fusion instance, and the Cloud Data Fusion UI to create and manage your pipelines.

Alternatively, you can use command-line tools to create and manage your Cloud Data Fusion instances and pipelines.

  • The REST reference describes the API for creating and managing your Cloud Data Fusion instances on Google Cloud.
  • This page describes the REST API for creating and managing pipelines and datasets. Throughout this page, there are links to the CDAP documentation site, where you can find more detailed information.

Before you begin

Before you use the REST API, download the Cloud SDK and set up environment variables for your gcloud command-line tool access credentials and CDAP API endpoint.

Download and log in to the Cloud SDK

  1. Install and initialize the Cloud SDK.

  2. Log in to the Cloud SDK, using the gcloud command-line tool. Run:

    $ gcloud auth login
    

Set up environment variables

Set up environment variables for your gcloud command-line tool access credentials and the CDAP apiEndpoint of your Cloud Data Fusion instance.

export AUTH_TOKEN=$(gcloud auth print-access-token)
export CDAP_ENDPOINT=apiEndpoint

You can obtain the CDAP_ENDPOINT value using either the gcloud command-line tool or the Cloud Data Fusion REST API. Make sure to use the value of the apiEndpoint field, not the serviceEndpoint field. The apiEndpoint has the format [hostname]/api.

  • To set the CDAP_ENDPOINT to the apiEndpoint, you can use the gcloud beta data-fusion command:

    1. In a local terminal window or in Cloud Shell, specify your-instance-name, then run the following gcloud commands to set the CDAP_ENDPOINT environment variable to your instance's apiEndpoint. If your instance is not in us-central1, replace the --location value with your instance's region.

      export INSTANCE_ID=your-instance-name
      
      export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
          --location=us-central1 \
          --format="value(apiEndpoint)" \
        ${INSTANCE_ID})
      
  • As an alternative, you can get the apiEndpoint by calling the Cloud Data Fusion REST API, then manually set the CDAP_ENDPOINT.

    1. Use the Try this API panel to submit an instances.get request:
      1. Fill in the name request parameter. Provide your project-id, instance location (the region, for example, "us-central1"), and instance-name in the following format:
        projects/project-id/locations/location/instances/instance-name
      2. Click "EXECUTE" to submit the request.
      3. Expand the panel to view the apiEndpoint listed in the HTTP response section.
      4. Run the following command (insert the value of your apiEndpoint).
        export CDAP_ENDPOINT=your-apiEndpoint
        
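To confirm that both variables are set correctly, you can make a lightweight CDAP call, such as listing the namespaces of your instance (GET /v3/namespaces is a standard CDAP endpoint; the exact contents of the response depend on your instance):

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces"

If the variables are set correctly, the request returns a JSON list of namespaces rather than an authentication or connection error.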

Create a pipeline

To create a Cloud Data Fusion pipeline, submit the following HTTP PUT request.

curl -X PUT -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about creating a pipeline on the CDAP documentation site.

The body of the HTTP PUT request is a JSON object in the following format:

{
  "name": "MyPipeline",
  "artifact": {
    "name": "cdap-data-pipeline",
    "version": "6.0.0",
    "scope": "system"
  },
  "config": {
    . . .
    "connections": [ . . . ],
    "engine": "mapreduce",
    "postActions": [ . . . ],
    "stages": [ . . . ],
    "schedule": "0 * * * *",
  },
  "__ui__": {
    . . .
  }
}

Learn more about pipeline configuration file format and see an example on the CDAP documentation site.
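
For example, assuming you have a pipeline configuration saved in a local file named my-pipeline.json (such as a pipeline exported from the Cloud Data Fusion UI), a request to deploy it as MyPipeline in the default namespace might look like this:

curl -X PUT \
  -H "Authorization: Bearer ${AUTH_TOKEN}" \
  -H "Content-Type: application/json" \
  -d @my-pipeline.json \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/MyPipeline"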

Start a batch pipeline

To start a batch pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/start"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about starting a pipeline on the CDAP documentation site.
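
For example, the following request starts a hypothetical batch pipeline named my-batch-pipeline in the default namespace:

curl -X POST \
  -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/my-batch-pipeline/workflows/DataPipelineWorkflow/start"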

Start a real-time pipeline

To start a real-time pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/spark/DataStreamsSparkStreaming/start"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about starting a pipeline on the CDAP documentation site.

Stop a batch pipeline

To stop a batch pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/stop"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about stopping a pipeline on the CDAP documentation site.

Stop a real-time pipeline

To stop a real-time pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/spark/DataStreamsSparkStreaming/stop"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about stopping a pipeline on the CDAP documentation site.

Schedule a batch pipeline

Note: Scheduling is available only for batch pipelines.

By default, scheduling is disabled. To enable scheduling for your pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/schedules/dataPipelineSchedule/enable"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about scheduling a pipeline on the CDAP documentation site.
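
For example, the following request enables the schedule of a hypothetical pipeline named my-batch-pipeline in the default namespace. To turn scheduling off again, the CDAP schedule API also exposes a matching disable endpoint; replace enable with disable at the end of the URL.

curl -X POST \
  -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/my-batch-pipeline/schedules/dataPipelineSchedule/enable"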

Run history for a batch pipeline

To see the runs of a batch pipeline, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/runs"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about the run history of a pipeline on the CDAP documentation site.
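
For example, the following requests list the runs of a hypothetical batch pipeline named my-batch-pipeline in the default namespace. Each run record in the response includes its run ID (the runid field of CDAP run records), which is the value you pass as run-id in the log and metric requests described later on this page. The second command is optional and assumes you have jq installed.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/my-batch-pipeline/workflows/DataPipelineWorkflow/runs"

# Optionally, extract only the run IDs.
curl -s -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/my-batch-pipeline/workflows/DataPipelineWorkflow/runs" | jq -r '.[].runid'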

Run history for a real-time pipeline

To see the runs of a real-time pipeline, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/spark/DataStreamsSparkStreaming/runs"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.

Learn more about the run history of a pipeline on the CDAP documentation site.

Logs for a batch pipeline

You can view the logs of a pipeline or of a specific pipeline run.

  • To view the logs of a batch pipeline, submit the following HTTP GET request.

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/logs?start=start-ts&stop=stop-ts"
    
  • To view logs of a specific run of a batch pipeline, submit the following HTTP GET request.

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/runs/run-id/logs?start=start-ts&stop=stop-ts"
    
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.
  • run-id: Relevant only if you want to view the logs of a specific pipeline run. To find the run ID, call run history, which returns a list of run IDs.

Learn more about viewing logs on the CDAP documentation site.
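
For example, the following request fetches the logs of a hypothetical pipeline named my-batch-pipeline for a given time window. The start and stop values are placeholders expressed as Unix timestamps in seconds, which is the format the CDAP logging endpoints expect:

curl -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/apps/my-batch-pipeline/workflows/DataPipelineWorkflow/logs?start=1571011200&stop=1571097600"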

Logs for a real-time pipeline

You can view the logs of a pipeline or of a specific pipeline run.

  • To view the logs of a real-time pipeline, submit the following HTTP GET request.

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/spark/DataStreamsSparkStreaming/logs?start=start-ts&stop=stop-ts"
    
  • To view logs of a specific run of a real-time pipeline, submit the following HTTP GET request.

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/spark/DataStreamsSparkStreaming/runs/run-id/logs?start=start-ts&stop=stop-ts"
    
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • pipeline-name: Your pipeline name.
  • run-id: Relevant only if you want to view the logs of a specific pipeline run. To find the run ID, call run history, which returns a list of run IDs.

Learn more about viewing logs on the CDAP documentation site.

Metrics for a batch pipeline

To view specific metrics for a batch pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/metrics/query"

The body of the HTTP POST request is a JSON object in the following format:

{
  "query": {
    "tags": {
      "namespace": "default",
      "app": "pipeline name",
      "workflow": "DataPipelineWorkflow",
      "run": "run-id"
    },
    "metrics": [
      "metric1 name",
      "metric2 name",
      ...
    ],
    "timeRange": {
      "aggregate": true
    }
  }
}
Query parameters:
  • pipeline name: Your pipeline name.
  • run-id: To find the run ID, call run history, which returns a list of run IDs.
  • metric name: Metric names follow the format user.pipeline-stage.metric, where:
  • pipeline-stage is any of the stage names in the body of the HTTP PUT request you configured when you created your pipeline. In the following example, BigQuery or GoogleCloudStorage are possible values for pipeline-stage.
    {
      "stages": [
        {
          "name": "BigQuery",
          ...
        },
        {
          "name": "GoogleCloudStorage",
          ...
        },
        ...
      ],
      ...
    }         
  • metric can be any of:
    • records.in
    • records.out
    • records.error
    • process.time.total
    • process.time.avg
    • process.time.max
    • process.time.min
    • process.time.stddev

For example, the following query gets the records.out and process.time.avg metrics for the BigQuery stage of the batch pipeline, batch-pipeline.

{
  "query": {
    "tags": {
      "namespace": "default",
      "app": "batch-pipeline",
      "workflow": "DataPipelineWorkflow",
      "run": "81e3d583-f68b-11e9-aba0-0242b9f29569"
    },
    "metrics": [
      "user.BigQuery.records.out",
      "user.BigQuery.process.time.avg"
    ],
    "timeRange": {
      "aggregate": true
    }
  }
}

Learn more about metrics on the CDAP documentation site.
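
For example, assuming you save the query above to a local file named metrics-query.json, you can submit it as follows:

curl -X POST \
  -H "Authorization: Bearer ${AUTH_TOKEN}" \
  -H "Content-Type: application/json" \
  -d @metrics-query.json \
  "${CDAP_ENDPOINT}/v3/metrics/query"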

Metrics for a real-time pipeline

To view specific metrics for a real-time pipeline, submit the following HTTP POST request.

curl -X POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/metrics/query"

The body of the HTTP POST request is a JSON object in the following format:

{
  "query": {
    "tags": {
      "namespace": "default",
      "app": "pipeline name",
      "spark": "DataStreamsSparkStreaming",
      "run": "run-id"
    },
    "metrics": [
      "metric1 name",
      "metric2 name",
      ...
    ],
    "timeRange": {
      "aggregate": true
    }
  }
}
Query parameters:
  • pipeline name: Your pipeline name.
  • run-id: To find the run ID, call run history, which returns a list of run IDs.
  • metric name: Metric names follow the format user.pipeline-stage.metric, where:
  • pipeline-stage is any of the stage names in the body of the HTTP PUT request you configured when you created your pipeline. In the following example, BigQuery or GoogleCloudStorage are possible values for pipeline-stage.
    {
      "stages": [
        {
          "name": "BigQuery",
          ...
        },
        {
          "name": "GoogleCloudStorage",
          ...
        },
        ...
      ],
      ...
    }
  • metric can be any of:
    • records.in
    • records.out
    • records.error
    • process.time.total
    • process.time.avg
    • process.time.max
    • process.time.min
    • process.time.stddev

For example, the following query gets the records.out and process.time.avg metrics for the BigQuery stage of the real-time pipeline, rt-pipeline.

{
  "query": {
    "tags": {
      "namespace": "default",
      "app": "rt-pipeline",
      "spark": "DataStreamsSparkStreaming",
      "run": "81e3d583-f68b-11e9-aba0-0242b9f29570"
    },
    "metrics": [
      "user.BigQuery.records.out",
      "user.BigQuery.process.time.avg"
    ],
    "timeRange": {
      "aggregate": true
    }
  }
}

Learn more about metrics on the CDAP documentation site.

Metadata properties

To view metadata properties for your dataset, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/default/datasets/dataset-id/metadata/properties"
Parameters:
  • dataset-id: To get the dataset ID, submit an HTTP GET request that lists all datasets:

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/data/datasets"

    If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.

Learn more about getting metadata properties on the CDAP documentation site.
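
For example, the following requests first list the datasets in the default namespace and then fetch the metadata properties of a hypothetical dataset named myDataset:

# List datasets to find the dataset ID.
curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/default/data/datasets"

# Fetch the metadata properties of the hypothetical dataset myDataset.
curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/default/datasets/myDataset/metadata/properties"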

Metadata tags

To view metadata tags for your dataset, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/default/datasets/dataset-id/metadata/tags"
Parameters:
  • dataset-id: To get the dataset ID, submit an HTTP GET request that lists all datasets:

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/data/datasets"

    If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.

Learn more about getting metadata tags on the CDAP documentation site.

Dataset lineage

To view the lineage of your dataset, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/datasets/dataset-id/lineage"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • dataset-id: To get the dataset ID, submit an HTTP GET request that lists all datasets:

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/data/datasets"

Learn more about viewing dataset lineage on the CDAP documentation site.

Field level lineage

To view the lineage of the fields in your dataset over a specified time range, submit the following HTTP GET request.

curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/datasets/dataset-id/lineage/fields?start=start-ts&end=end-ts[&prefix=prefix]"
Parameters:
  • namespace-id: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace.
  • dataset-id: To get the dataset ID, submit an HTTP GET request that lists all datasets:

    curl -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/data/datasets"

Learn more about viewing dataset field-level lineage on the CDAP documentation site.
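
For example, the following request retrieves field-level lineage for a hypothetical dataset named myDataset over a one-day window. The start and end values are placeholders expressed as Unix timestamps in seconds, and the optional prefix parameter is assumed to restrict the results to field names that begin with the given prefix:

curl -H "Authorization: Bearer ${AUTH_TOKEN}" \
  "${CDAP_ENDPOINT}/v3/namespaces/default/datasets/myDataset/lineage/fields?start=1571011200&end=1571097600"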