API Workflow - Develop a Flow

Feature Availability: This feature is available in Cloud Dataprep Premium by TRIFACTA® INC.

Overview

This example walks through the process of creating a flow, creating datasets within it, and executing a job through automated methods. For this example, these tasks are accomplished using the following methods:

  1. Locate or create flow. The datasets that you wrangle must be contained within a flow. You can add them to an existing flow or create a new one through the APIs.
  2. Create dataset. Through the APIs, you create an imported dataset from an asset that is accessible through one of the established connections. Then, you create the recipe object through the API.
    1. For the recipe, you must retrieve the internal identifier.
    2. Through the application, you modify the recipe for the dataset.
  3. Automate job execution. Using the APIs, you can automate execution of the job that wrangles the dataset.
    1. As needed, this job can be re-executed on a periodic basis or whenever the source files are updated.

Example Datasets

In this example, you are attempting to wrangle monthly point of sale (POS) data from three separate regions into a single dataset for the state. This monthly data must be enhanced with information about the products and stores in the state. So, the example has a combination of transactional and reference data, which must be brought together into a single dataset.

Tip: To facilitate re-execution of this job each month, the transactional data should be stored in a dedicated directory. This directory can be overwritten with next month's data using the same filenames. As long as the new files are structured in an identical manner to the original ones, the new month's data can be processed by re-running the API aspects of this workflow.


Example Files:

The following files are stored on your HDFS deployment:

Path and Filename                Description
hdfs:///user/pos/POS-r01.txt     Point of sale transactions for Region 1.
hdfs:///user/pos/POS-r02.txt     Point of sale transactions for Region 2.
hdfs:///user/pos/POS-r03.txt     Point of sale transactions for Region 3.
hdfs:///user/ref/REF_PROD.txt    Reference data on products for the state.
hdfs:///user/ref/REF_CAL.txt     Reference data on stores in the state.

NOTE: The reference and transactional data are stored in separate directories. In this case, you can assume that the user has read access through their Cloud Dataprep account to these directories, although this access must be enabled and configured for real use cases.

Base URL:

For purposes of this example, the base URL for the Cloud Dataprep platform is the following:

http://www.example.com:3005

Step - Create Containing Flow

To begin, you must locate a flow or create a flow through the APIs to contain the datasets that you are importing.

NOTE: You cannot add datasets to the flow through the flows endpoint. Moving pre-existing datasets into a flow is not supported in this release. Create or locate the flow first and then when you create the datasets, associate them with the flow at the time of creation.

Locate:

NOTE: If you know the display name value for the flow and are confident that it is not shared with any other flows, you can use the APIs to retrieve the flowId. See https://clouddataprep.com/documentation/api/#operation/listFlows

  1. Log in through the application.
  2. In the Flows page, select or create the flow to contain the above datasets.
  3. In the Flow Details page for that flow, locate the flow identifier in the URL:

    Flow Details URL: http://www.example.com:3005/flows/10
    Flow Id: 10
  4. Retain this identifier for later use.
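
If you prefer to locate the flow programmatically via the listFlows endpoint mentioned above, you can filter the response by display name. A minimal sketch in Python; the response shape (a `data` array of flow objects with `id` and `name` fields) is an assumption based on the fields used in this workflow and may vary by release:

```python
import json

# Trimmed example of a listFlows-style response; the exact shape is an
# assumption based on the fields used in this workflow.
sample_response = json.loads("""
{"data": [
  {"id": 9,  "name": "Inventory - 2013"},
  {"id": 10, "name": "Point of Sale - 2013"}
]}
""")

def find_flow_id(list_flows_response, display_name):
    """Return the id of the first flow whose name matches, or None."""
    for flow in list_flows_response.get("data", []):
        if flow.get("name") == display_name:
            return flow["id"]
    return None

flow_id = find_flow_id(sample_response, "Point of Sale - 2013")
print(flow_id)  # 10
```

Because display names are not guaranteed to be unique, check for duplicates before relying on the returned id.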

Create:

  1. Through the APIs, you can create a flow using the following call:

    Endpoint: http://www.example.com:3005/v4/flows
    Authentication: Required
    Method: POST
    Request Body:
    {
      "name": "Point of Sale - 2013",
      "description": "Point of Sale data for state"
    }
  2. The response should be status code 201 - Created with a response body like the following:

    {
      "id": 10,
      "updatedAt": "2017-02-17T17:08:57.848Z",
      "createdAt": "2017-02-17T17:08:57.848Z",
      "name": "Point of Sale - 2013",
      "description": "Point of Sale data for state",
      "creator": {
        "id": 1
      },
      "updater": {
        "id": 1
      },
      "workspace": {
        "id": 1
      }
    }
  3. Retain the flow identifier (10) for later use.

For more information, see https://clouddataprep.com/documentation/api/#operation/createFlow
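
The create-flow call above can be sketched with Python's standard library. This builds (but does not send) the request; the Bearer-token authentication header is an assumption, so substitute whatever authentication your deployment uses:

```python
import json
import urllib.request

BASE_URL = "http://www.example.com:3005"  # example base URL from this doc

def build_create_flow_request(name, description, token="<your-access-token>"):
    """Build (but do not send) the POST /v4/flows request.

    The Bearer token auth scheme is an assumption; adjust for your
    deployment's authentication method.
    """
    body = json.dumps({"name": name, "description": description}).encode("utf-8")
    return urllib.request.Request(
        url=BASE_URL + "/v4/flows",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token,
        },
    )

req = build_create_flow_request("Point of Sale - 2013",
                                "Point of Sale data for state")
# urllib.request.urlopen(req) would return the 201 response containing
# the new flow's id (10 in this example).
```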

Checkpoint: You have identified or created the flow to contain your dataset or datasets.

Step - Create Datasets

To create datasets from the above sources, you must:

  1. Create an imported dataset for each file.
  2. For each imported dataset, create a recipe, which can be used to transform the imported dataset.

The following steps describe how to complete these actions via API for a single file.

Steps:

  1. To create an imported dataset, you must acquire the following information about the source. In the above example, the source is the POS-r01.txt file.

    1. uri
    2. name
    3. description
    4. bucket (if the file is stored on S3)
  2. Construct the following request:

    Endpoint: http://www.example.com:3005/v4/importedDatasets
    Authentication: Required
    Method: POST
    Request Body:
    {
      "uri": "hdfs:///user/pos/POS-r01.txt",
      "name": "POS-r01.txt",
      "description": "POS-r01.txt"
    }
  3. You should receive a 201 - Created response with a response body similar to the following:

    {
      "id": 8,
      "size": "281032",
      "uri": "hdfs:///user/pos/POS-r01.txt",
      "dynamicPath": null,
      "bucket": null,
      "isSchematized": true,
      "isDynamic": false,
      "disableTypeInference": false,
      "updatedAt": "2017-02-08T18:38:56.640Z",
      "createdAt": "2017-02-08T18:38:56.560Z",
      "parsingScriptId": {
        "id": 14
      },
      "runParameters": {
        "data": []
      },
      "name": "POS-r01.txt",
      "description": "POS-r01.txt",
      "creator": {
        "id": 1
      },
      "updater": {
        "id": 1
      },
      "connection": null
    }
  4. You must retain the id value so you can reference it when you create the recipe.

  5. See https://clouddataprep.com/documentation/api/#operation/createImportedDataset

  6. Next, you create the recipe. Construct the following request:

    Endpoint: http://www.example.com:3005/v4/wrangledDatasets
    Authentication: Required
    Method: POST
    Request Body:
    {
      "name": "POS-r01",
      "importedDataset": {
        "id": 8
      },
      "flow": {
        "id": 10
      }
    }
  7. You should receive a 201 - Created response with a response body similar to the following:

    {
        "id": 23,
        "wrangled": true,
        "updatedAt": "2018-02-06T19:59:22.735Z",
        "createdAt": "2018-02-06T19:59:22.698Z",
        "name": "POS-r01",
        "active": true,
        "referenceInfo": null,
        "activeSample": {
            "id": 23
        },
        "creator": {
            "id": 1
        },
        "updater": {
            "id": 1
        },
        "recipe": {
            "id": 23
        },
        "flow": {
            "id": 10
        }
    }
  8. From the recipe, you must retain the value for the id. For more information, see https://clouddataprep.com/documentation/api/#operation/createWrangledDataset

  9. Repeat the above steps for each of the source files that you are adding to your flow.
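
The two creation calls above can be scripted per source file. A Python sketch of just the request bodies; the id chaining between the two calls (the imported dataset id feeding the recipe creation) is the important part, and endpoint and authentication handling are as in the earlier steps:

```python
def imported_dataset_payload(uri, name, description=None, bucket=None):
    """Request body for POST /v4/importedDatasets.

    bucket applies only to files stored on S3.
    """
    body = {"uri": uri, "name": name, "description": description or name}
    if bucket is not None:
        body["bucket"] = bucket
    return body

def wrangled_dataset_payload(name, imported_dataset_id, flow_id):
    """Request body for POST /v4/wrangledDatasets.

    imported_dataset_id is the id returned by the importedDatasets call;
    flow_id associates the new recipe with the containing flow.
    """
    return {
        "name": name,
        "importedDataset": {"id": imported_dataset_id},
        "flow": {"id": flow_id},
    }

# Values from the example: imported dataset id 8, flow id 10.
imported = imported_dataset_payload("hdfs:///user/pos/POS-r01.txt", "POS-r01.txt")
recipe = wrangled_dataset_payload("POS-r01", imported_dataset_id=8, flow_id=10)
```

Looping these two calls over the five source paths listed earlier produces all of the flow's imported datasets and recipes.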

Checkpoint: You have created a flow with multiple imported datasets and recipes.

Step - Wrangle Data

After you have created the flow with all of your source datasets, you can wrangle the base dataset to integrate all of the sources into it.

Steps for Transactional data:

  1. Open the POS-r01 dataset. The dataset is loaded in the Transformer page.
  2. To chain together the other transactional data into this dataset, you use a union transform. In the Search panel, enter union in the textbox and press ENTER.
  3. In the Union page:
    1. Click Add datasets.
    2. Select the other two transactional datasets: POS-r02 and POS-r03.

      NOTE: When you join or union one dataset into another, changes made to the source dataset are automatically propagated to the dataset into which it has been joined or unioned.

    3. Add the datasets and align by name.
    4. Check the dataset names and fields. If all looks well, click Add to Recipe.

Steps for reference data:

The columns Store_Nbr and Item_Nbr are unique keys into the REF_CAL and REF_PROD datasets, respectively. Using the Join window, you can pull in the other fields from these reference datasets based on these unique keys.

  1. Open the POS-r01 dataset.
  2. In the Search panel, enter join datasets for the transform. The Join window opens.
  3. Select the REF_PROD dataset. Click Accept. Click Next.
  4. Review the two keys to verify that they are the proper columns on which to structure the join. Click Next.
  5. Click the All tab. Select all fields to add. Click Review.
  6. After reviewing your join, click Add to Recipe.
  7. For each Item_Nbr value that has a matching ITEM_NBR value in the reference dataset, all of the other reference fields are pulled into the POS-r01 dataset.

You can repeat the above general process to integrate the reference data for stores.

Checkpoint: You have created a flow with multiple datasets and have integrated all of the relevant data into a single dataset.

Step - Create Output Objects

Before you run a job, you must define output objects, which specify the following:

  • Running environment where the job is executed
  • Whether profiling is on or off

outputObjects have the following objects associated with them:

  • writeSettings: These objects define the file-based outputs that are produced for the output object.
  • publications: These objects define the database target, table, and other settings for publication to a relational datastore.

NOTE: You can continue with this workflow without creating outputObjects yet. In this workflow, overrides are applied during the job definition, so you don't have to create the outputObjects and writeSettings at this time.

Step - Run Job

Through the APIs, you can specify and run a job. In the above example, you must run the job for the terminal dataset, which is POS-r01 in this case. This dataset contains references to all of the other datasets. When the job is run, the recipes for the other datasets are also applied to the terminal dataset, which ensures that the output reflects the proper integration of these other datasets into POS-r01.

NOTE: In the following example, writeSettings have been specified as overrides in the job definition. These overrides are applied for this job run only. If you need to re-run the job with these settings, you must either 1) re-apply the overrides or 2) create the writeSettings objects.


Steps:

  1. Acquire the internal identifier for the recipe for which you wish to execute a job. In the previous example, this identifier was 23.
  2. Construct the following request:

    Endpoint: http://www.example.com:3005/v4/jobGroups
    Authentication: Required
    Method: POST

    Request Body:

    {
      "wrangledDataset": {
        "id": 23
      },
      "overrides": {
        "execution": "photon",
        "profiler": true,
        "writesettings": [
          {
            "path": "hdfs://hadoop:50070/trifacta/queryResults/admin@example.com/POS-r01.csv",
            "action": "create",
            "format": "csv",
            "compression": "none",
            "header": false,
            "asSingleFile": false
          }
        ]
      },
      "ranfrom": null
    }
    
  3. In the above example, the specified job has been launched for recipe 23 to execute on the Photon running environment with profiling enabled.
    1. Output is written in CSV format to the designated path. For more information on these properties, see https://clouddataprep.com/documentation/api/#operation/runJobGroup

    2. Output is written as a new file with no overwriting of previous files.
  4. A response code of 201 - Created is returned. The response body should look like the following:

    {
        "sessionId": "79276c31-c58c-4e79-ae5e-fed1a25ebca1",
        "reason": "JobStarted",
        "jobGraph": {
            "vertices": [
                21,
                22
            ],
            "edges": [
                {
                    "source": 21,
                    "target": 22
                }
            ]
        },
        "id": 3,
        "jobs": {
            "data": [
                {
                    "id": 21
                },
                {
                    "id": 22
                }
            ]
        }
    }
  5. Retain the id value, which is the job identifier, for monitoring.
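
The request body above can be generated per run from the recipe id and output path. A minimal Python sketch of the payload builder, using the values from this example:

```python
def run_job_payload(recipe_id, output_path,
                    execution="photon", profiler=True):
    """Request body for POST /v4/jobGroups with writesettings overrides.

    The overrides apply to this job run only. recipe_id is the wrangled
    dataset (recipe) id retained earlier (23 in this example).
    """
    return {
        "wrangledDataset": {"id": recipe_id},
        "overrides": {
            "execution": execution,
            "profiler": profiler,
            "writesettings": [
                {
                    "path": output_path,
                    "action": "create",
                    "format": "csv",
                    "compression": "none",
                    "header": False,
                    "asSingleFile": False,
                }
            ],
        },
        "ranfrom": None,
    }

payload = run_job_payload(
    23,
    "hdfs://hadoop:50070/trifacta/queryResults/admin@example.com/POS-r01.csv")
```

POSTing this payload to /v4/jobGroups (with the same authentication as the earlier calls) launches the job and returns the job group id to monitor.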

Step - Monitor Your Job

You can monitor the status of your job through the following endpoint:

Endpoint: http://www.example.com:3005/v4/jobGroups/<id>/status
Authentication: Required
Method: GET
Request Body: None.

When the job has successfully completed, the returned status message is the following:

"Complete"

For more information, see https://clouddataprep.com/documentation/api/#operation/runJobGroup
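
A simple way to automate monitoring is to poll the status endpoint until the job finishes. A sketch of such a loop; the fetcher is any callable that performs the GET call above and returns the status string, and the "Failed" status name used here is an assumption (check your deployment's status values):

```python
import time

def wait_for_job(fetch_status, poll_seconds=10, max_polls=360):
    """Poll a jobGroup status until it reports "Complete".

    fetch_status is any zero-argument callable returning the status string
    from GET /v4/jobGroups/<id>/status (an HTTP call in real use; the
    "Failed" status name below is an assumption).
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status == "Complete":
            return status
        if status == "Failed":
            raise RuntimeError("Job failed")
        time.sleep(poll_seconds)
    raise TimeoutError("Job did not complete within the polling window")

# Stubbed fetcher standing in for the real HTTP call:
statuses = iter(["InProgress", "InProgress", "Complete"])
result = wait_for_job(lambda: next(statuses), poll_seconds=0)
print(result)  # Complete
```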

Step - Re-run Job

In the future, you can re-run the job exactly as you specified it by executing the following call:

Tip: You can swap imported datasets before re-running the job. For example, if you have uploaded a new file, you can change the primary input dataset for the recipe and then use the following API call to re-run the job as specified. See https://clouddataprep.com/documentation/api/#operation/updateInputDataset

Endpoint: http://www.example.com:3005/v4/jobGroups
Authentication: Required
Method: POST
Request Body:
{
  "wrangledDataset": {
    "id": 23
  },
  "overrides": {
    "execution": "photon",
    "profiler": true,
    "writesettings": [
      {
        "path": "hdfs://hadoop:50070/trifacta/queryResults/admin@example.com/POS-r01.csv",
        "action": "create",
        "format": "csv",
        "compression": "none",
        "header": false,
        "asSingleFile": false
      }
    ]
  },
  "ranfrom": null
}

The job is re-run as it was previously specified.