运行自动 Dataflow 流水线以对个人身份信息数据集进行去标识化

本教程介绍如何运行自动 Dataflow 流水线,以对为个人身份信息数据集创建 DLP 去标识化转换模板教程中使用的示例数据集进行去标识化。该示例数据集包含大规模的个人身份信息 (PII)

本文档是以下系列文章中的一篇:

本教程假定您熟悉 Shell 脚本和 Dataflow 流水线。

参考架构

本教程演示了数据去标识化流水线,如下图所示。

去标识化流水线的架构。

数据去标识化流式处理流水线使用 Dataflow 对文本内容中的敏感数据进行去标识化。您可以将该流水线重复用于多个转换和用例。

目标

  • 触发和监控用于对示例数据集进行去标识化的 Dataflow 流水线。
  • 了解流水线采用的代码。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

查看流水线参数

本教程使用通过 Apache Beam Java SDK 开发的 Dataflow 流水线。为了不断大规模地解决常见的数据相关任务,Dataflow 提供了一个框架,称为 Google 提供的模板如果您使用这些模板,则无需编写或维护任何流水线代码。在本教程中,您需要使用以下参数触发自动的 data masking/tokenization using Cloud DLP from Cloud Storage to BigQuery 流水线。

流水线参数 备注
numWorkers 5 默认设置
maxNumWorkers 10
machineType n1-standard-4
pollingInterval 30 秒
windowInterval 30 秒
inputFilePattern gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv 在本教程的第 2 部分中创建。
deidentifyTemplateName ${DEID_TEMPLATE_NAME}
inspectTemplateName ${INSPECT_TEMPLATE_NAME}
datasetName deid_dataset
batchSize 500 对于批次大小为 500、总数为 10 万的记录,有 200 个并行 API 调用。默认情况下,批次大小设置为 100
dlpProjectName ${PROJECT_ID} 本教程使用您的默认 Google Cloud 项目。
jobId my-deid-job Dataflow 的作业 ID

运行流水线

  1. 在 Cloud Shell 中,设置应用默认凭据。

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. 运行流水线:

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION}
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. 要监控流水线,请在 Google Cloud Console 中转到 Dataflow 页面。

    转到 Dataflow

  4. 点击作业 ID (my-deid-job),您会看到作业图:

    作业详情工作流的前半部分。

    作业详情工作流的后半部分

  5. 要验证流水线处理的数据量,请点击处理经过标记化的数据 (Process Tokenized Data)。

    输入和输出集合的摘要。

    为 DLP-Tokenization 转换添加的元素数量为 200,为 Process Tokenized Data 转换添加的元素数量为 100000。

  6. 要验证在 BigQuery 表中插入的记录总数,请点击 Write To BQ

    插入的记录总数摘要。

    为 Process Tokenized Data 转换添加的元素数量为 100000。

处理流水线中的异常

DLP API 的默认上限为每分钟 600 次 API 调用。 流水线根据您指定的批次大小并行处理请求。

流水线配置为最多具有十个 n1-standard-4 工作器。 如果您需要比 Google 提供的模板的默认配置更快地处理大型数据集,可以自定义流水线以更新工作器数量和机器类型。如果增加工作器数量,则可能需要增加 vCPU、正在使用的 IP 地址以及 Cloud 项目的 SSD 的数量的默认配额

查看流水线代码

完整的流水线代码位于 GitHub 代码库中。

  • 此流水线使用内置的 Beam File IO 转换,每 30 秒轮询一次为该流水线的自动化版本配置的新文件。在停止或终止该流水线之前,该流水线会不断查找新文件。

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • 可读文件可以包含相同的标头。流水线使用称为侧边输入的设计模式,而不是为每个元素都创建标头。只能为窗口创建一次标头,标头作为输入传递到其他转换。

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing list of headers par file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • Cloud DLP 的载荷大小上限为:每个 API 请求 512 KB每分钟 600 次并行 API 调用。为了管理此限制,流水线使用用户定义的批次大小作为参数。例如,示例数据集有 50 万行。批次大小为 1000 表示,假设每个请求不超过载荷大小上限,并行进行 500 次 DLP API 调用。批次大小越小,API 调用次数就越多。此批次大小可能会导致 quota resource exception。如果您需要提高配额限制,请参阅增加配额

    /**
     * The {@link CSVReader} class uses experimental Split DoFn to split each csv file contents in
     * chunks and process it in non-monolithic fashion. For example: if a CSV file has 100 rows and
     * batch size is set to 15, then initial restrictions for the SDF will be 1 to 7 and split
     * restriction will be {{1-2},{2-3}..{7-8}} for parallel executions.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * SDF needs to define a @GetInitialRestriction method that can create a restriction describing
       * the complete work for a given element. For our case this would be the total number of rows
       * for each CSV file. We will calculate the number of split required based on total number of
       * rows and batch size provided.
       *
       * @throws IOException
       */
      //
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of split based on remaining rows. For example: batch size of
           * 15 for 100 rows will have total 7 splits. As it's a range last split will have offset
           * range {7,8}
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * SDF needs to define a @SplitRestriction method that can split the intital restricton to a
       * number of smaller restrictions. For example: a intital rewstriction of (x, N) as input and
       * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
  • 流水线使用内置的 BigQuery IO 转换连接器写入 BigQuery。此连接器使用动态目标功能自动创建 BigQuery 表和架构。为了实现低延迟,流水线还使用了 BigQuery 流式插入

    /**
     * The {@link BQDestination} class creates BigQuery table destination and table schema based on
     * the CSV file processed in earlier transformations. Table id is same as filename Table schema is
     * same as file header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

您已成功完成本教程。您已触发使用 Cloud DLP 模板的自动去标识化流水线来处理大型数据集。在下一个教程中,您将验证 BigQuery 中的去标识化数据集,并使用其他 Dataflow 流水线重标识数据

清理

如果您不打算继续学习本系列教程,那么避免产生费用的最简单方法是删除您为本教程创建的 Cloud 项目。或者,您也可以删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤