Como executar um pipeline automatizado do Dataflow para desidentificar um conjunto de dados de PII

Neste tutorial, mostramos como executar um pipeline automatizado do Dataflow para desidentificar o conjunto de dados de amostra usado no tutorial Como criar modelos de transformação de desidentificação de DLP para conjuntos de dados de PII (em inglês). O conjunto de dados de amostra contém informações de identificação pessoal (PII) (em inglês) em grande escala.

Este documento faz parte de uma série:

Neste tutorial, supomos que você já tenha familiaridade com scripts de shell e pipelines do Dataflow.

Arquitetura de referência

Neste tutorial há a demonstração do pipeline de desidentificação de dados, ilustrado no diagrama a seguir.

Arquitetura do pipeline de desidentificação.

O pipeline de streaming de desidentificação de dados desidentifica dados confidenciais no conteúdo de texto usando o Dataflow. É possível reutilizar o pipeline para várias transformações e casos de uso.

Objetivos

  • Engatilhar e monitorar o pipeline do Dataflow para desidentificar um conjunto de dados de amostra
  • Entender o código por trás do pipeline

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem ser qualificados para uma avaliação gratuita.

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais informações, consulte Como fazer a limpeza.

Antes de começar

Como revisar os parâmetros do pipeline

Neste tutorial, usamos um pipeline do Dataflow desenvolvido com o SDK do Apache Beam para Java (em inglês). Para resolver repetidamente tarefas comuns relacionadas a dados em escala, o Dataflow fornece um framework conhecido como modelos fornecidos pelo Google. Se você usar esses modelos, não precisará gravar ou manter nenhum código de pipeline. Neste tutorial, você engatilhará uma tokenização/mascaramento de dados automatizado usando o Cloud DLP do pipeline do Cloud Storage para BigQuery com os parâmetros a seguir.

Parâmetros de pipeline Valor Notas
numWorkers 5 Configurado por padrão
maxNumWorkers 10
machineType n1-standard-4
pollingInterval 30 segundos
windowInterval 30 segundos
inputFilePattern gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv Criado durante a parte 2 do tutorial (em inglês)
deidentifyTemplateName ${DEID_TEMPLATE_NAME}
inspectTemplateName ${INSPECT_TEMPLATE_NAME}
datasetName deid_dataset
batchSize 500 Para um número total de 100 mil registros com tamanho de lote de 500, há 200 chamadas de API em paralelo. Por padrão, o tamanho do lote é definido em 100.
dlpProjectName ${PROJECT_ID} Seu projeto padrão do Google Cloud é usado neste tutorial.
jobId my-deid-job O ID do job do Dataflow

Como executar o canal

  1. No Cloud Shell, configure as credenciais padrão do aplicativo.

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. Como executar o pipeline:

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION} \
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. Para monitorar o pipeline, acesse a página Dataflow no Console do Google Cloud.

    Acessar o Dataflow

  4. Clique no ID do job (my-deid-job). Você verá o gráfico do job:

    Primeira metade do fluxo de trabalho de detalhes do job.

    Segunda metade do fluxo de trabalho de detalhes do job.

  5. Para validar a quantidade de dados processados pelo pipeline, clique em Processar dados tokenizados.

    Resumo das coleções de entrada e saída.

    O número de elementos adicionados à transformação de tokenização de DLP é 200, e o número de elementos adicionados à transformação de processamento de dados tokenizados é 100.000.

  6. Para validar o número total de registros inseridos nas Tabelas do BigQuery, clique em Write To BQ.

    Resumo do número total de registros inseridos.

    O número de elementos adicionados à transformação de processamento de dados tokenizados é 100.000.

Como lidar com exceções no pipeline

A API DLP tem um limite padrão de 600 chamadas de API por minuto. O pipeline processa a solicitação em paralelo com base no tamanho do lote especificado.

O pipeline está configurado para ter, no máximo, dez workers n1-standard-4. Se for necessário processar um grande conjunto de dados mais rapidamente do que a configuração padrão dos modelos fornecidos pelo Google, é possível personalizar o pipeline (em inglês) para atualizar o número de workers e o tipo de máquina. Se você aumentar o número de workers, talvez seja necessário aumentar a cota padrão do número de vCPUs, os endereços IP em uso e os SSDs para o projeto do Cloud.

Como revisar o código do pipeline

O código completo do pipeline está no repositório do GitHub.

  • Esse pipeline usa uma Transformação de IO de arquivo Beam integrada para buscar novos arquivos configurados para a versão automatizada do pipeline a cada 30 segundos. O pipeline procura, continuamente, por novos arquivos até que seja interrompido ou encerrado.

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • Os arquivos legíveis podem conter o mesmo cabeçalho. Em vez de criar o cabeçalho para cada elemento, o pipeline usa um padrão de design conhecido como entrada secundária. O cabeçalho pode ser criado apenas uma vez para a janela e transmitido como uma entrada para outras transformações.

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing list of headers par file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • O Cloud DLP tem um tamanho máximo de payload de 512 KB por solicitação de API e 600 chamadas de API simultâneas por minuto. Para gerenciar essa limitação, o pipeline usa o tamanho do lote definido pelo usuário como um parâmetro. Por exemplo, o conjunto de dados de amostra tem 500 mil linhas. Um tamanho de lote de 1.000 significa 500 chamadas da API DLP em paralelo com uma suposição de que cada solicitação não excederá o tamanho máximo do payload. Um tamanho de lote menor contribui para um número maior de chamadas de API. Esse tamanho de lote pode causar quota resource exception. Se você precisa aumentar o limite de cota, consulte Aumentos de cota.

    /**
     * The {@link CSVReader} class uses experimental Split DoFn to split each csv file contents in
     * chunks and process it in non-monolithic fashion. For example: if a CSV file has 100 rows and
     * batch size is set to 15, then initial restrictions for the SDF will be 1 to 7 and split
     * restriction will be {{1-2},{2-3}..{7-8}} for parallel executions.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * SDF needs to define a @GetInitialRestriction method that can create a restriction describing
       * the complete work for a given element. For our case this would be the total number of rows
       * for each CSV file. We will calculate the number of split required based on total number of
       * rows and batch size provided.
       *
       * @throws IOException
       */
      //
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of split based on remaining rows. For example: batch size of
           * 15 for 100 rows will have total 7 splits. As it's a range last split will have offset
           * range {7,8}
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * SDF needs to define a @SplitRestriction method that can split the intital restricton to a
       * number of smaller restrictions. For example: a intital rewstriction of (x, N) as input and
       * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
  • O pipeline usa um Conector de transformação de IO do BigQuery para gravar no BigQuery. Esse conector cria automaticamente um esquema e uma Tabela do BigQuery usando o recurso de destino dinâmico. Para conseguir baixa latência, o pipeline também usa inserções de streaming do BigQuery.

    /**
     * The {@link BQDestination} class creates BigQuery table destination and table schema based on
     * the CSV file processed in earlier transformations. Table id is same as filename Table schema is
     * same as file header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

Você concluiu este tutorial. Você engatilhou um pipeline automatizado de desidentificação usando modelos do Cloud DLP para processar um grande conjunto de dados. No próximo tutorial, saiba como validar o conjunto de dados desidentificado no BigQuery e reidentificar os dados usando outro pipeline do Dataflow.

Limpar

Se você não pretende continuar com os tutoriais da série, a maneira mais fácil de evitar cobranças é excluindo o projeto do Cloud criado para o tutorial. A outra opção é excluir os recursos individuais.

Exclua o projeto

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

A seguir