Exécuter un pipeline Dataflow automatisé pour supprimer l'identification d'un ensemble de données personnelles

Ce tutoriel explique comment exécuter un pipeline Dataflow automatisé pour anonymiser l'exemple d'ensemble de données utilisé dans le tutoriel Créer des modèles de transformation DLP pour supprimer l'identification des informations personnelles dans les ensembles de données. L'exemple d'ensemble de données contient des informations personnelles à grande échelle.

Ce document fait partie d'une série :

Dans ce tutoriel, nous partons du principe que vous maîtrisez les scripts d'interface système et les pipelines Dataflow.

Architecture de référence

Ce tutoriel décrit le pipeline de suppression de l'identification des données illustré dans le schéma suivant.

Architecture du pipeline de suppression de l'identification.

Le pipeline de flux de suppression de l'identification des données supprime l'identification des données sensibles dans le contenu textuel à l'aide de Dataflow. Vous pouvez réutiliser le pipeline pour plusieurs transformations et cas d'utilisation.

Objectifs

  • Déclencher et surveiller le pipeline Dataflow pour supprimer l'identification d'un exemple d'ensemble de données
  • Comprendre le code derrière le pipeline

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Consultez la page Effectuer un nettoyage pour en savoir plus.

Avant de commencer

  • Prenez connaissance de la partie 2 de la série.

Examiner les paramètres du pipeline

Ce tutoriel utilise un pipeline Dataflow développé à l'aide du SDK Java Apache Beam. Pour le traitement répété de tâches courantes liées aux données à grande échelle, Dataflow inclut un framework appelé Modèles fournis par Google. Si vous utilisez ces modèles, vous n'avez pas besoin d'écrire ni de gérer de code de pipeline. Dans ce tutoriel, vous allez déclencher un masquage (ou tokenisation) de données automatisé à l'aide de Cloud DLP, de Cloud Storage vers BigQuery avec les paramètres suivants.

Paramètres de pipeline Valeur Remarques
numWorkers 5 Configurer par défaut
maxNumWorkers 10
machineType n1-standard-4
pollingInterval 30 secondes
windowInterval 30 secondes
inputFilePattern gs://${DATA_STORAGE_BUCKET}/CCRecords_*.csv Créé au cours de la partie 2 de ce tutoriel.
deidentifyTemplateName ${DEID_TEMPLATE_NAME}
inspectTemplateName ${INSPECT_TEMPLATE_NAME}
datasetName deid_dataset
batchSize 500 Pour un nombre total de 100 000 enregistrements avec une taille de lot de 500, 200 appels d'API sont effectués en parallèle. Par défaut, la taille de lot est définie sur 100.
dlpProjectName ${PROJECT_ID} Pour ce tutoriel, votre projet Google Cloud par défaut est utilisé.
jobId my-deid-job ID de la tâche Dataflow

Exécuter le pipeline

  1. Dans Cloud Shell, configurez les identifiants par défaut de l'application.

    gcloud auth activate-service-account \
        ${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
        --key-file=service-account-key.json --project=${PROJECT_ID}
    export GOOGLE_APPLICATION_CREDENTIALS=service-account-key.json
    
  2. Exécutez le pipeline :

    export JOB_ID=my-deid-job
    gcloud dataflow jobs run ${JOB_ID}  \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --region ${REGION} \
        --parameters \
        "inputFilePattern=gs://${DATA_STORAGE_BUCKET}/CCRecords_1564602825.csv,dlpProjectId=${PROJECT_ID},deidentifyTemplateName=${DEID_TEMPLATE_NAME},inspectTemplateName=${INSPECT_TEMPLATE_NAME},datasetName=deid_dataset,batchSize=500"
    
  3. Pour surveiller le pipeline, accédez à la page Dataflow dans Google Cloud Console.

    Accéder à Dataflow

  4. Cliquez sur l'ID de la tâche (my-deid-job). Le graphique de la tâche s'affiche :

    Première moitié du workflow de détails de la tâche

    Deuxième moitié du workflow de détails de la tâche

  5. Pour valider la quantité de données traitées par le pipeline, cliquez sur Process Tokenized Data (Traiter les données tokenisées).

    Résumé des collections d'entrée et de sortie

    Le nombre d'éléments ajoutés pour la transformation DLP-Tokenisation est de 200 et le nombre d'éléments ajoutés pour la transformation Process Tokenized Data est de 100 000.

  6. Pour valider le nombre total d'enregistrements insérés dans les tables BigQuery, cliquez sur Write To BQ.

    Récapitulatif du nombre total d'enregistrements insérés

    Le nombre d'éléments ajoutés pour la transformation Process Tokenized Data est de 100 000.

Gérer les exceptions dans le pipeline

L'API DLP a une limite par défaut de 600 appels d'API par minute. Le pipeline traite la requête en parallèle en fonction de la taille de lot que vous spécifiez.

Le pipeline est configuré pour utiliser un maximum de dix nœuds de calcul n1-standard-4. Si vous devez traiter un ensemble de données volumineux plus rapidement que ne le permet la configuration par défaut des modèles fournis par Google, vous pouvez personnaliser le pipeline pour modifier le nombre de nœuds de calcul et le type de machine. Si vous augmentez le nombre de nœuds de calcul, vous devrez peut-être augmenter le quota par défaut pour le nombre de processeurs virtuels, d'adresses IP disponibles et de disques SSD pour votre projet Cloud.

Examiner le code du pipeline

Le code complet du pipeline se trouve dans le dépôt GitHub.

  • Ce pipeline utilise une transformation Beam File IO intégrée pour interroger les nouveaux fichiers configurés pour la version automatique du pipeline toutes les 30 secondes. Le pipeline recherche en permanence de nouveaux fichiers jusqu'à ce qu'il soit arrêté ou interrompu.

    .apply(
        "Poll Input Files",
        FileIO.match()
            .filepattern(options.getInputFilePattern())
            .continuously(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
    .apply("Find Pattern Match", FileIO.readMatches().withCompression(Compression.AUTO))
  • Les fichiers lisibles peuvent contenir le même en-tête. Au lieu de créer l'en-tête pour chaque élément, le pipeline utilise un modèle de conception appelé entrée secondaire. L'en-tête peut être créé une seule fois pour la période et transmis en tant qu'entrée à d'autres transformations.

    final PCollectionView<List<KV<String, List<String>>>> headerMap =
        csvFiles
    
            // 2) Create a side input for the window containing list of headers par file.
            .apply(
                "Create Header Map",
                ParDo.of(
                    new DoFn<KV<String, Iterable<ReadableFile>>, KV<String, List<String>>>() {
    
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        String fileKey = c.element().getKey();
                        c.element()
                            .getValue()
                            .forEach(
                                file -> {
                                  try (BufferedReader br = getReader(file)) {
                                    c.output(KV.of(fileKey, getFileHeaders(br)));
    
                                  } catch (IOException e) {
                                    LOG.error("Failed to Read File {}", e.getMessage());
                                    throw new RuntimeException(e);
                                  }
                                });
                      }
                    }))
            .apply("View As List", View.asList());
  • Cloud DLP a une taille de charge utile maximale de 512 Ko par requête API et 600 appels d'API simultanés/minute. Pour gérer cette limitation, le pipeline utilise la taille de lot définie par l'utilisateur en tant que paramètre. Par exemple, imaginons que notre exemple d'ensemble de données comporte 500 000 lignes. Une taille de lot de 1 000 implique un traitement parallèle de 500 appels d'API DLP, en supposant qu'aucune requête ne dépasse la taille maximale de la charge utile. Une taille de lot inférieure contribue à l'augmentation du nombre d'appels d'API. Cela peut entraîner une erreur quota resource exception. Si vous devez augmenter la limite de quota, consultez la section Augmentations des quotas.

    /**
     * The {@link CSVReader} class uses experimental Split DoFn to split each csv file contents in
     * chunks and process it in non-monolithic fashion. For example: if a CSV file has 100 rows and
     * batch size is set to 15, then initial restrictions for the SDF will be 1 to 7 and split
     * restriction will be {{1-2},{2-3}..{7-8}} for parallel executions.
     */
    static class CSVReader extends DoFn<KV<String, ReadableFile>, KV<String, Table>> {
    
      private ValueProvider<Integer> batchSize;
      private PCollectionView<List<KV<String, List<String>>>> headerMap;
      /** This counter is used to track number of lines processed against batch size. */
      private Integer lineCount;
    
      List<String> csvHeaders;
    
      public CSVReader(
          ValueProvider<Integer> batchSize,
          PCollectionView<List<KV<String, List<String>>>> headerMap) {
        lineCount = 1;
        this.batchSize = batchSize;
        this.headerMap = headerMap;
        this.csvHeaders = new ArrayList<>();
      }
    
      @ProcessElement
      public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
          throws IOException {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          String fileKey = c.element().getKey();
          try (BufferedReader br = getReader(c.element().getValue())) {
    
            csvHeaders = getHeaders(c.sideInput(headerMap), fileKey);
            if (csvHeaders != null) {
              List<FieldId> dlpTableHeaders =
                  csvHeaders.stream()
                      .map(header -> FieldId.newBuilder().setName(header).build())
                      .collect(Collectors.toList());
              List<Table.Row> rows = new ArrayList<>();
              Table dlpTable = null;
              /** finding out EOL for this restriction so that we know the SOL */
              int endOfLine = (int) (i * batchSize.get().intValue());
              int startOfLine = (endOfLine - batchSize.get().intValue());
              /** skipping all the rows that's not part of this restriction */
              br.readLine();
              Iterator<CSVRecord> csvRows =
                  CSVFormat.DEFAULT.withSkipHeaderRecord().parse(br).iterator();
              for (int line = 0; line < startOfLine; line++) {
                if (csvRows.hasNext()) {
                  csvRows.next();
                }
              }
              /** looping through buffered reader and creating DLP Table Rows equals to batch */
              while (csvRows.hasNext() && lineCount <= batchSize.get()) {
    
                CSVRecord csvRow = csvRows.next();
                rows.add(convertCsvRowToTableRow(csvRow));
                lineCount += 1;
              }
              /** creating DLP table and output for next transformation */
              dlpTable = Table.newBuilder().addAllHeaders(dlpTableHeaders).addAllRows(rows).build();
              c.output(KV.of(fileKey, dlpTable));
    
              LOG.debug(
                  "Current Restriction From: {}, Current Restriction To: {},"
                      + " StartofLine: {}, End Of Line {}, BatchData {}",
                  tracker.currentRestriction().getFrom(),
                  tracker.currentRestriction().getTo(),
                  startOfLine,
                  endOfLine,
                  dlpTable.getRowsCount());
    
            } else {
    
              throw new RuntimeException("Header Values Can't be found For file Key " + fileKey);
            }
          }
        }
      }
    
      /**
       * SDF needs to define a @GetInitialRestriction method that can create a restriction describing
       * the complete work for a given element. For our case this would be the total number of rows
       * for each CSV file. We will calculate the number of split required based on total number of
       * rows and batch size provided.
       *
       * @throws IOException
       */
      //
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> csvFile)
          throws IOException {
    
        int rowCount = 0;
        int totalSplit = 0;
        try (BufferedReader br = getReader(csvFile.getValue())) {
          /** assume first row is header */
          int checkRowCount = (int) br.lines().count() - 1;
          rowCount = (checkRowCount < 1) ? 1 : checkRowCount;
          totalSplit = rowCount / batchSize.get().intValue();
          int remaining = rowCount % batchSize.get().intValue();
          /**
           * Adjusting the total number of split based on remaining rows. For example: batch size of
           * 15 for 100 rows will have total 7 splits. As it's a range last split will have offset
           * range {7,8}
           */
          if (remaining > 0) {
            totalSplit = totalSplit + 2;
    
          } else {
            totalSplit = totalSplit + 1;
          }
        }
    
        LOG.debug("Initial Restriction range from 1 to: {}", totalSplit);
        return new OffsetRange(1, totalSplit);
      }
    
      /**
       * SDF needs to define a @SplitRestriction method that can split the intital restricton to a
       * number of smaller restrictions. For example: a intital rewstriction of (x, N) as input and
       * produces pairs (x, 0), (x, 1), …, (x, N-1) as output.
       */
      @SplitRestriction
      public void splitRestriction(
          @Element KV<String, ReadableFile> csvFile,
          @Restriction OffsetRange range,
          OutputReceiver<OffsetRange> out) {
        /** split the initial restriction by 1 */
        for (final OffsetRange p : range.split(1, 1)) {
          out.output(p);
        }
      }
    
      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
      }
  • Le pipeline utilise un connecteur de transformation d'E/S BigQuery intégré pour écrire dans BigQuery. Ce connecteur crée automatiquement une table et un schéma BigQuery à l'aide de la fonctionnalité de destination dynamique. Pour obtenir une faible latence, le pipeline utilise également des insertions en flux continu BigQuery.

    /**
     * The {@link BQDestination} class creates BigQuery table destination and table schema based on
     * the CSV file processed in earlier transformations. Table id is same as filename Table schema is
     * same as file header columns.
     */
    public static class BQDestination
        extends DynamicDestinations<KV<String, TableRow>, KV<String, TableRow>> {
    
      private ValueProvider<String> datasetName;
      private ValueProvider<String> projectId;
    
      public BQDestination(ValueProvider<String> datasetName, ValueProvider<String> projectId) {
        this.datasetName = datasetName;
        this.projectId = projectId;
      }
    
      @Override
      public KV<String, TableRow> getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String key = element.getValue().getKey();
        String tableName = String.format("%s:%s.%s", projectId.get(), datasetName.get(), key);
        // Strip the file name to only the letters and numbers so that it is a valid BQ table id.
        tableName = tableName.replaceAll("[^a-zA-Z0-9]", "");
        LOG.debug("Table Name {}", tableName);
        return KV.of(tableName, element.getValue().getValue());
      }
    
      @Override
      public TableDestination getTable(KV<String, TableRow> destination) {
        TableDestination dest =
            new TableDestination(destination.getKey(), "pii-tokenized output data from dataflow");
        LOG.debug("Table Destination {}", dest.getTableSpec());
        return dest;
      }
    
      @Override
      public TableSchema getSchema(KV<String, TableRow> destination) {
        TableRow bqRow = destination.getValue();
        TableSchema schema = new TableSchema();
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        List<TableCell> cells = bqRow.getF();
        for (int i = 0; i < cells.size(); i++) {
          Map<String, Object> object = cells.get(i);
          String header = object.keySet().iterator().next();
          /** currently all BQ data types are set to String */
          fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
        }
    
        schema.setFields(fields);
        return schema;
      }
    }

Vous avez terminé ce tutoriel. Vous avez déclenché un pipeline automatisé de suppression de l'identification à l'aide de modèles Cloud DLP pour traiter un ensemble de données volumineux. Dans le tutoriel suivant, vous allez valider les données anonymisées dans BigQuery, puis restaurer l'identification des informations personnelles à l'aide d'un autre pipeline Dataflow.

Effectuer un nettoyage

Si vous n'avez pas l'intention de continuer à suivre les tutoriels de cette série, le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Cloud que vous avez créé pour ce tutoriel. Vous pouvez également supprimer les différentes ressources.

Supprimer le projet

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Étape suivante