Étendre la confidentialité différentielle

Ce document fournit des exemples d'extension de la confidentialité différentielle pour la confidentialité différentielle de BigQuery.

BigQuery vous permet d'étendre la confidentialité différentielle aux sources de données multicloud et aux bibliothèques de confidentialité différentielle externes. Ce document explique comment appliquer la confidentialité différentielle pour des sources de données multicloud telles qu'AWS S3 avec BigQuery Omni, appeler une bibliothèque de confidentialité différentielle externe à l'aide d'une fonction distante et effectuer des agrégations de confidentialité différentielle avec PipelineDP, une bibliothèque Python pouvant s'exécuter avec Apache Spark et Apache Beam.

Pour en savoir plus sur la confidentialité différentielle, consultez la section Utiliser la confidentialité différentielle.

Confidentialité différentielle avec BigQuery Omni

La confidentialité différentielle de BigQuery accepte les appels à des sources de données multicloud telles qu'AWS S3. L'exemple suivant interroge une source de données externe, foo.wikidata, et applique la confidentialité différentielle. Pour en savoir plus sur la syntaxe de la clause de confidentialité différentielle, consultez la section Clause de confidentialité différentielle.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Cet exemple renvoie des résultats semblables à ceux-ci :

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Pour en savoir plus sur les limites de BigQuery Omni, consultez la section Limites.

Appeler des bibliothèques de confidentialité différentielle externes avec des fonctions distantes

Vous pouvez appeler des bibliothèques de confidentialité différentielle externes à l'aide de fonctions distantes. Le lien suivant utilise une fonction distante pour appeler une bibliothèque externe hébergée par Tumult Analytics afin d'utiliser une confidentialité différentielle concentrée sur zéro pour un ensemble de données de vente au détail.

Pour plus d'informations sur l'utilisation de Tumult Analytics, consultez l'article de lancement de Tumult Analytics {: .external}.

Agrégations de confidentialité différentielle avec PipelineDP

PipelineDP est une bibliothèque Python qui effectue des agrégations de confidentialité différentielle et peut s'exécuter avec Apache Spark et Apache Beam. BigQuery peut exécuter des procédures stockées Apache Spark écrites en Python. Pour en savoir plus sur l'exécution de procédures stockées Apache Spark, consultez la section Travailler avec des procédures stockées pour Apache Spark.

L'exemple suivant effectue une agrégation de confidentialité différentielle à l'aide de la bibliothèque PipelineDP. Il utilise l'ensemble de données public sur les trajets en taxi à Chicago et calcule pour chaque taxi le nombre de trajets, ainsi que la somme et la moyenne des conseils pour ces trajets.

Avant de commencer

Une image Apache Spark standard n'inclut pas PipelineDP. Vous devez créer une image Docker contenant toutes les dépendances nécessaires avant d'exécuter une procédure stockée PipelineDP. Cette section explique comment créer et transférer une image Docker vers Google Cloud.

Avant de commencer, assurez-vous d'avoir installé Docker sur votre ordinateur local et configuré l'authentification pour transférer des images Docker vers gcr.io. Pour en savoir plus sur le transfert d'images Docker, consultez la section Transférer et extraire des images.

Créer et transférer une image Docker

Pour créer et transférer une image Docker avec les dépendances requises, procédez comme suit :

  1. Créez un dossier local DIR.
  2. Téléchargez le programme d'installation de Miniconda, avec la version 3.9 de Python, sur DIR.
  3. Enregistrez le texte suivant dans le fichier Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. Exécutez la commande suivante :

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    Remplacez les éléments suivants :

    • PROJECT_ID : projet dans lequel vous souhaitez créer l'image Docker
    • DOCKER_IMAGE : nom de l'image Docker

    L'image est importée.

Exécuter une procédure stockée PipelineDP

  1. Pour créer une procédure stockée, utilisez l'instruction CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    Remplacez les éléments suivants :

    • PROJECT_ID : projet dans lequel vous souhaitez créer la procédure stockée
    • DATASET_ID : ensemble de données dans lequel vous souhaitez créer la procédure stockée
    • REGION : région dans laquelle se trouve votre projet
    • DOCKER_IMAGE : nom de l'image Docker
    • CONNECTION_ID : nom de la connexion
    • TABLE_NAME : nom de la table
  2. Utilisez l'instruction CALL pour appeler la procédure.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    Remplacez les éléments suivants :

    • PROJECT_ID : projet dans lequel vous souhaitez créer la procédure stockée
    • DATASET_ID : ensemble de données dans lequel vous souhaitez créer la procédure stockée

Étapes suivantes