Ejecuta una canalización automatizada de Dataflow para desidentificar un conjunto de datos de PII

En este instructivo, se muestra cómo ejecutar una canalización automatizada de Dataflow a fin de desidentificar el conjunto de datos de muestra que se usa en el instructivo Crea plantillas de transformación de desidentificación de DLP para conjuntos de datos de PII. El conjunto de datos de muestra contiene información de identificación personal (PII) a gran escala.

Este documento forma parte de una serie:

En este instructivo, se supone que estás familiarizado con las secuencias de comandos de shell y las canalizaciones de Dataflow.

Arquitectura de referencia

En este instructivo, se muestra la canalización de desidentificación de datos que se ilustra en el siguiente diagrama.

Arquitectura de la canalización de desidentificación

La canalización de la transmisión de desidentificación de datos desidentifica datos sensibles en el contenido de texto mediante Dataflow. Puedes volver a usar la canalización para varias transformaciones y casos prácticos.

Objetivos

  • Activar y supervisar la canalización de Dataflow para desidentificar un conjunto de datos de muestra
  • Comprender el código detrás de la canalización

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud sean aptos para obtener una prueba gratuita.

Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta cómo hacer una limpieza.

Antes de comenzar

Revisa los parámetros de canalización

En este instructivo, se usa una canalización de Dataflow que se desarrolló mediante el SDK de Java de Apache Beam. Para resolver tareas comunes relacionadas con los datos repetidas veces y a gran escala, Dataflow proporciona un framework conocido como plantillas que proporciona Google. Si usas estas plantillas, no necesitas escribir ni mantener ningún código de canalización. En este instructivo, debes activar una canalización automatizada de enmascaramiento de datos o asignación de tokens a datos mediante Cloud DLP desde Cloud Storage hacia BigQuery con los siguientes parámetros.

Parámetros de canalización Valor Notas
numWorkers 5 Se encuentra configurado de forma predeterminada.
maxNumWorkers 10
machineType n1-standard-4
pollingInterval 30 segundos
windowInterval 30 segundos
inputFilePattern gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv Se crea en la parte 2 del instructivo.
deidentifyTemplateName ${DEID_TEMPLATE_NAME}
inspectTemplateName ${INSPECT_TEMPLATE_NAME}
datasetName deid_dataset
batchSize 500 Para una cantidad total de 100,000 registros con un tamaño del lote de 500, hay 200 llamadas a la API en paralelo. De forma predeterminada, el tamaño del lote se establece en 100.
dlpProjectName ${PROJECT_ID} En este instructivo, se usa el proyecto de Google Cloud predeterminado.
jobId my-deid-job El ID de trabajo de Dataflow

Ejecuta la canalización

  1. En Cloud Shell, configura las credenciales predeterminadas de la aplicación.

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. Ejecuta la canalización.

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION} \
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. Para supervisar la canalización, ve a la página Dataflow en Google Cloud Console.

    Ir a Dataflow

  4. Haz clic en el ID del trabajo (my-deid-job). Verá el grafo del trabajo:

    La primera mitad del flujo de trabajo de los detalles del trabajo

    La segunda mitad del flujo de trabajo de los detalles del trabajo

  5. Para validar la cantidad de datos que procesa la canalización, haz clic en Procesar datos con asignación de token (Process Tokenized Data).

    Resumen de las colecciones de entrada y salida

    Para la transformación de la asignación de token de DLP se agregaron 200 elementos y para la transformación “Procesar datos con asignación de token” se agregaron 100,000 elementos.

  6. Para validar la cantidad total de registros insertados en las tablas de BigQuery, haz clic en Write To BQ.

    Resumen de la cantidad total de registros que se insertaron

    Para la transformación de “Procesar datos con asignación de token” se agregaron 100,000 elementos.

Controla excepciones en la canalización

La API de DLP tiene un límite predeterminado de 600 llamadas a la API por minuto. La canalización procesa la solicitud en paralelo según el tamaño del lote que especifiques.

La canalización se configura para que tenga un máximo de diez trabajadores n1-standard-4. Si necesitas procesar un conjunto de datos grande con mayor rapidez que la que permite la configuración predeterminada de las plantillas que proporciona Google, puedes personalizar la canalización para actualizar la cantidad de trabajadores y el tipo de máquina. Si aumentas la cantidad de trabajadores, es posible que debas aumentar la cuota predeterminada de la cantidad de CPU virtuales, las direcciones IP en uso y los SSD del proyecto de Cloud.

Revisa el código de canalización

El código de canalización completo está en el repositorio de GitHub.

  • Esta canalización usa una transformación de IO de archivos de Beam integrada a fin de sondear los archivos nuevos que se configuran para la versión automatizada de la canalización cada 30 segundos. La canalización busca archivos nuevos de forma continua hasta que se detiene o finaliza.

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • Los archivos legibles pueden contener el mismo encabezado. En lugar de crear el encabezado para cada elemento, la canalización usa un patrón de diseño conocido como side-input. El encabezado se puede crear solo una vez para la ventana y se lo puede pasar como una entrada a otras transformaciones.

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing list of headers par file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • Cloud DLP tiene un tamaño de carga útil máximo de 512 KB por solicitud a la API y de 600 llamadas simultáneas a la API por minuto. Para administrar esta limitación, la canalización usa el tamaño del lote definido por el usuario como parámetro. Por ejemplo, el conjunto de datos de muestra tiene 500,000 filas. Un tamaño del lote de 1,000 implica 500 llamadas a la API de DLP en paralelo con la suposición de que cada solicitud no supera el tamaño máximo de carga útil. Un tamaño del lote más reducido contribuye a una mayor cantidad de llamadas a la API. Este tamaño del lote puede causar una quota resource exception. Si necesitas aumentar el límite de la cuota, consulta Aumentos de cuota.

    /**
     * The {@link CSVReader} class uses experimental Split DoFn to split each csv file contents in
     * chunks and process it in non-monolithic fashion. For example: if a CSV file has 100 rows and
     * batch size is set to 15, then initial restrictions for the SDF will be 1 to 7 and split
     * restriction will be {{1-2},{2-3}..{7-8}} for parallel executions.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * SDF needs to define a @GetInitialRestriction method that can create a restriction describing
       * the complete work for a given element. For our case this would be the total number of rows
       * for each CSV file. We will calculate the number of split required based on total number of
       * rows and batch size provided.
       *
       * @throws IOException
       */
      //
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of split based on remaining rows. For example: batch size of
           * 15 for 100 rows will have total 7 splits. As it's a range last split will have offset
           * range {7,8}
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * SDF needs to define a @SplitRestriction method that can split the intital restricton to a
       * number of smaller restrictions. For example: a intital rewstriction of (x, N) as input and
       * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
  • La canalización usa un conector de transformación de IO de BigQuery integrado para escribir en BigQuery. Este conector crea de forma automática una tabla y un esquema de BigQuery mediante la función de destino dinámica. Para lograr una latencia baja, la canalización también usa las inserciones de transmisión de BigQuery.

    /**
     * The {@link BQDestination} class creates BigQuery table destination and table schema based on
     * the CSV file processed in earlier transformations. Table id is same as filename Table schema is
     * same as file header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

Completaste este instructivo de forma correcta. Activaste una canalización de desidentificación automatizada mediante las plantillas de Cloud DLP para procesar un conjunto de datos grande. En el siguiente instructivo, debes validar el conjunto de datos desidentificado en BigQuery y volver a identificar los datos mediante otra canalización de Dataflow.

Limpia

Si no quieres continuar con los instructivos de la serie, la manera más fácil de quitar la facturación es borrar el proyecto de Cloud que creaste para el instructivo. Como alternativa, puedes borrar los recursos individuales.

Borra el proyecto

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Próximos pasos