Running an automated Dataflow pipeline to de-identify a PII dataset

This tutorial shows you how to run an automated Dataflow pipeline to de-identify the sample dataset used in the Creating DLP de-identification transformation templates for PII datasets tutorial. The sample dataset contains large-scale personally identifiable information (PII).

This document is part of a series.

This tutorial assumes that you are familiar with shell scripting and Dataflow pipelines.

Reference architecture

This tutorial demonstrates the data de-identification pipeline that is illustrated in the following diagram.

Architecture of de-identification pipeline.

The data de-identification streaming pipeline de-identifies sensitive data in text content using Dataflow. You can reuse the pipeline for multiple transformations and use cases.

Objectives

  • Trigger and monitor the Dataflow pipeline to de-identify a sample dataset.
  • Understand the code behind the pipeline.

Costs

This tutorial uses the following billable components of Google Cloud:

  • Dataflow
  • Cloud Storage
  • Cloud Data Loss Prevention (Cloud DLP)
  • BigQuery

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Cleaning up.

Before you begin

  • Complete part 2 of the series.

Reviewing the pipeline parameters

This tutorial uses a Dataflow pipeline developed with the Apache Beam Java SDK. To help you repeatedly solve common data-related tasks at scale, Dataflow provides a set of Google-provided templates. If you use these templates, you don't need to write or maintain any pipeline code. In this tutorial, you trigger an automated Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery pipeline with the following parameters.

| Pipeline parameter | Value | Notes |
|---|---|---|
| numWorkers | 5 | Set by default. |
| maxNumWorkers | 10 | |
| machineType | n1-standard-4 | |
| pollingInterval | 30 seconds | |
| windowInterval | 30 seconds | |
| inputFilePattern | gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv | Created in part 2 of the series. |
| deidentifyTemplateName | ${DEID_TEMPLATE_NAME} | |
| inspectTemplateName | ${INSPECT_TEMPLATE_NAME} | |
| datasetName | deid_dataset | |
| batchSize | 500 | For a total of 100,000 records with a batch size of 500, there are 200 API calls in parallel. By default, the batch size is set to 100. |
| dlpProjectName | ${PROJECT_ID} | For this tutorial, your default Google Cloud project is used. |
| jobId | my-deid-job | The Dataflow job ID. |
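
The template names, the storage bucket, and the region in the preceding table and in the commands that follow come from environment variables that you exported in part 2 of the series. If you opened a new Cloud Shell session since then, re-export them before you continue. A minimal check, assuming the variable names used throughout this series:

    # Each variable should print a non-empty value.
    echo "Bucket:               ${DATA_STORAGE_BUCKET}"
    echo "De-identify template: ${DEID_TEMPLATE_NAME}"
    echo "Inspect template:     ${INSPECT_TEMPLATE_NAME}"
    echo "Region:               ${REGION}"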

Running the pipeline

  1. In Cloud Shell, set up application default credentials.

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. Run the pipeline:

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION} \
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. To monitor the pipeline, in the Google Cloud Console, go to the Dataflow page. You can also check the job and the output dataset from Cloud Shell, as shown in the sketch after these steps.

    Go to Dataflow

  4. Click the job ID (my-deid-job) to see the job graph:

    First half of the job details workflow.

    Second half of the job details workflow.

  5. To validate the amount of data processed by the pipeline, click Process Tokenized Data.

    Summary of input and output collections.

    The number of elements added for the DLP-Tokenization transformation is 200 and the number of elements added for the Process Tokenized Data transformation is 100,000.

  6. To validate the total number of records inserted in the BigQuery tables, click Write To BQ.

    Summary of total number of records inserted.

    The number of elements added for the Process Tokenized Data transformation is 100,000.
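
If you prefer to check from Cloud Shell instead of the console, you can list the active Dataflow jobs and, after the Write To BQ step has run, the tables in the output dataset. This is an optional sketch that assumes the REGION and PROJECT_ID variables from earlier parts of the series are still set:

    # List active Dataflow jobs in the region; look for the job named my-deid-job.
    gcloud dataflow jobs list --region=${REGION} --status=active

    # List the tables that the pipeline created in the deid_dataset output dataset.
    bq ls ${PROJECT_ID}:deid_dataset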

Handling exceptions in the pipeline

The DLP API has a default limit of 600 API calls per minute. The pipeline processes requests in parallel, based on the batch size that you specify.

The pipeline is configured for a maximum of 10 n1-standard-4 workers. If you need to process a large dataset faster than the default configuration of the Google-provided template allows, you can customize the pipeline to update the number of workers and the machine type, as shown in the sketch after this paragraph. If you increase the number of workers, you might need to increase the default quota for vCPUs, in-use IP addresses, and SSDs in your Cloud project.
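
If you customize the worker configuration, you can pass the overrides as flags when you run the Google-provided template. The following is a minimal sketch using the gcloud dataflow jobs run command; the worker values and the job name my-deid-job-large are illustrative, and the pipeline parameters are the same ones used earlier in this tutorial:

    # Illustrative worker overrides. Use a job name that isn't already used by an
    # active Dataflow job.
    gcloud dataflow jobs run my-deid-job-large \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION} \
        --num-workers 10 \
        --max-workers 20 \
        --worker-machine-type n1-standard-8 \
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"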

Reviewing the pipeline code

The full pipeline code is in the GitHub repository.

  • The pipeline uses a built-in Beam FileIO transform to poll for new files every 30 seconds, as configured for the automated version of the pipeline. The pipeline continuously looks for new files until it's stopped or terminated.

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • Multiple readable files can contain the same header. Instead of creating the header for every element, the pipeline uses a design pattern known as a side input: the header is created only once per window and is passed as an input to the other transformations.

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing the list of headers per file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • Cloud DLP has a maximum payload size of 512 KB per API request and a limit of 600 API calls per minute. To manage these limits, the pipeline uses the user-defined batch size as a parameter. For example, the sample dataset has 100,000 rows; with the batch size of 500 used in this tutorial, the pipeline makes 200 DLP API calls in parallel, assuming that each request doesn't exceed the maximum payload size. A lower batch size results in a higher number of API calls, which might cause a quota resource exception. If you need to increase the quota limit, see Quota increases.

    /**
     * The {@link CSVReader} class uses an experimental splittable DoFn (SDF) to split the contents
     * of each CSV file into chunks and process them in a non-monolithic fashion. For example: if a
     * CSV file has 100 rows and the batch size is set to 15, the initial restriction for the SDF is
     * 1 to 8 and the split restrictions are {{1-2},{2-3}..{7-8}} for parallel execution.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * An SDF needs to define a @GetInitialRestriction method that creates a restriction describing
       * the complete work for a given element. In our case, this is the total number of rows in each
       * CSV file. The number of splits required is calculated based on the total number of rows and
       * the batch size provided.
       *
       * @throws IOException
       */
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of splits based on the remaining rows. For example: a batch
           * size of 15 for 100 rows results in 7 splits in total. Because it's a range, the last
           * split has the offset range {7,8}.
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * An SDF needs to define a @SplitRestriction method that can split the initial restriction into
       * a number of smaller restrictions. For example: it takes an initial restriction of (x, N) as
       * input and produces the pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
    }
  • The pipeline uses a built-in BigQuery IO transform connector to write to BigQuery. This connector automatically creates a BigQuery table and schema by using the dynamic destination feature. To achieve low latency, the pipeline also uses BigQuery streaming inserts.

    /**
     * The {@link BQDestination} class creates the BigQuery table destination and table schema based
     * on the CSV file processed in earlier transformations. The table ID is the same as the file
     * name, and the table schema is the same as the file's header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

You have successfully completed this tutorial. You've triggered an automated de-identification pipeline using Cloud DLP templates to process a large dataset. In the next tutorial, you validate the de-identified dataset in BigQuery and re-identify the data using another Dataflow pipeline.

Cleaning up

If you don't intend to continue with the tutorials in the series, the easiest way to eliminate billing is to delete the Cloud project you created for the tutorial. Alternatively, you can delete the individual resources.

Delete the project

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next