自動化された Dataflow パイプラインの実行による PII データセットの匿名化

このチュートリアルでは、PII データセットの DLP 匿名化変換テンプレートの作成チュートリアルで使用したサンプル データセットを匿名化するために、自動化された Dataflow パイプラインを実行する方法について説明します。サンプル データセットには、大量の個人情報(PII)が含まれています

このドキュメントはシリーズの一部です。

このチュートリアルでは、読者がシェル スクリプトと Dataflow パイプラインに精通していることを前提としています。

リファレンス アーキテクチャ

このチュートリアルでは、次の図に示すデータ匿名化パイプラインについて説明します。

匿名化パイプラインのアーキテクチャ。

データ匿名化ストリーミング パイプラインは、Dataflow を使用してテキスト コンテンツ内の機密データを匿名化します。このパイプラインは、複数の変換やユースケースで再利用できます。

目標

  • Dataflow パイプラインをトリガーしてモニタリングし、サンプル データセットを匿名化します。
  • パイプラインの背後にあるコードを理解します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

始める前に

パイプライン パラメータの確認

このチュートリアルでは、Apache Beam Java SDK を使用して開発された Dataflow パイプラインを使用します。大規模なデータ関連の一般的なタスクを繰り返し解決するために、Dataflow には Google 提供のテンプレートというフレームワークが用意されています。これらのテンプレートを使用すると、パイプライン コードの記述や保守を行う必要がなくなります。このチュートリアルでは、次のパラメータで自動化された Cloud DLP を使用した Cloud Storage から BigQuery へのデータ マスキング / トークン化パイプラインをトリガーします。

パイプライン パラメータ
numWorkers 5 デフォルトで設定
maxNumWorkers 10
machineType n1-standard-4
pollingInterval 30 秒
windowInterval 30 秒
inputFilePattern gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv チュートリアルのパート 2 で作成されます。
deidentifyTemplateName ${DEID_TEMPLATE_NAME}
inspectTemplateName ${INSPECT_TEMPLATE_NAME}
datasetName deid_dataset
batchSize 500 バッチサイズが 500 で合計 10 万レコードの場合、200 の API 呼び出しが同時に実行されます。デフォルトでは、バッチサイズは 100 に設定されています。
dlpProjectName ${PROJECT_ID} このチュートリアルでは、デフォルトの Google Cloud プロジェクトが使用されます。
jobId my-deid-job Dataflow のジョブ ID

パイプラインの実行

  1. Cloud Shell で、アプリケーションのデフォルト認証情報を設定します。

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. パイプラインの実行:

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION}
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. パイプラインをモニタリングするには、Google Cloud Console で [Dataflow] ページに移動します。

    [Dataflow] に移動

  4. ジョブ ID(my-deid-job)をクリックします。ジョブのグラフが表示されます。

    ジョブの詳細ワークフローの前半。

    ジョブの詳細ワークフローの後半。

  5. パイプラインによって処理されたデータの量を検証するには、[Process Tokenized Data] をクリックします。

    入出力のコレクションの概要。

    DLP-Tokenization 変換に追加される要素の数は 200 で、Process Tokenized Data 変換に追加される要素の数は 100,000 です。

  6. BigQuery テーブルに挿入されたレコードの合計数を検証するには、[Write To BQ] をクリックします。

    挿入されたレコードの合計数のサマリー。

    Process Tokenized Data 変換に追加される要素の数は 100,000 です。

パイプラインでの例外の処理

DLP API では、デフォルトで 1 分あたりの API 呼び出しは 600 に制限されています。パイプラインは、指定されたバッチサイズに基づいてリクエストを並列処理します。

パイプラインは最大 10 個の n1-standard-4 ワーカーを持つように構成されています。大規模なデータセットを Google 提供のテンプレートのデフォルト構成よりも高速に処理する必要がある場合は、パイプラインをカスタマイズしてワーカー数とマシンタイプを更新します。ワーカー数を増やす場合は、Cloud プロジェクトの vCPU 数、使用中の IP アドレス、SSD のデフォルトの割り当てを増やす必要があります。

パイプライン コードの確認

完全なパイプライン コードは GitHub リポジトリにあります。

  • このパイプラインは、組み込みの Beam File IO 変換を使用して、パイプラインの自動バージョン用に構成された新しいファイルを 30 秒ごとにポーリングします。パイプラインは、停止または終了するまで継続的に新しいファイルを探します。

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • 読み取り可能なファイルには同じヘッダーを含めることができます。すべての要素のヘッダーを作成する代わりに、パイプラインでは副入力と呼ばれるデザイン パターンを使用します。ヘッダーはウィンドウに対して 1 回だけ作成して、他の変換への入力として渡すことができます。

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing list of headers par file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • Cloud DLP の最大ペイロード サイズは API リクエストあたり 512 KB で、1 分あたりの同時 API 呼び出しは 600 です。この制限を管理するために、パイプラインではユーザー定義のバッチサイズをパラメータとして使用します。たとえば、サンプル データセットには 50 万行あります。バッチサイズが 1000 の場合、各リクエストが最大ペイロード サイズを超えないという前提で、500 の DLP API 呼び出しが並行して実行されます。バッチサイズを小さくすると、API 呼び出しの回数が増えます。このバッチサイズにより quota resource exception が発生する可能性があります。割り当ての上限を増やす必要がある場合は、割り当ての増加をご覧ください。

    /**
     * The {@link CSVReader} class uses experimental Split DoFn to split each csv file contents in
     * chunks and process it in non-monolithic fashion. For example: if a CSV file has 100 rows and
     * batch size is set to 15, then initial restrictions for the SDF will be 1 to 7 and split
     * restriction will be {{1-2},{2-3}..{7-8}} for parallel executions.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * SDF needs to define a @GetInitialRestriction method that can create a restriction describing
       * the complete work for a given element. For our case this would be the total number of rows
       * for each CSV file. We will calculate the number of split required based on total number of
       * rows and batch size provided.
       *
       * @throws IOException
       */
      //
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of split based on remaining rows. For example: batch size of
           * 15 for 100 rows will have total 7 splits. As it's a range last split will have offset
           * range {7,8}
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * SDF needs to define a @SplitRestriction method that can split the intital restricton to a
       * number of smaller restrictions. For example: a intital rewstriction of (x, N) as input and
       * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
  • パイプラインでは、組み込みの BigQuery IO 変換コネクタを使用して BigQuery に書き込みます。このコネクタは、ダイナミック デスティネーション機能を使用して BigQuery テーブルとスキーマを自動的に作成します。低レイテンシを実現するために、パイプラインでは BigQuery のストリーミング挿入も使用します。

    /**
     * The {@link BQDestination} class creates BigQuery table destination and table schema based on
     * the CSV file processed in earlier transformations. Table id is same as filename Table schema is
     * same as file header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

このチュートリアルは正常に完了しました。Cloud DLP テンプレートを使用して自動化された匿名化パイプラインをトリガーし、大規模なデータセットを処理しました。次のチュートリアルでは、BigQuery で匿名化されたデータセットを検証し、別の Dataflow パイプラインを使用してデータを再識別します

クリーンアップ

シリーズのチュートリアルを続行する予定がない場合、課金を停止する最も簡単な方法は、チュートリアル用に作成した Cloud プロジェクトを削除することです。また、リソースを個別に削除することもできます。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ