Étendre la confidentialité différentielle
Ce document fournit des exemples d'extension de la confidentialité différentielle pour la confidentialité différentielle de BigQuery.
BigQuery vous permet d'étendre la confidentialité différentielle aux sources de données multicloud et aux bibliothèques de confidentialité différentielle externes. Ce document explique comment appliquer la confidentialité différentielle pour des sources de données multicloud telles qu'AWS S3 avec BigQuery Omni, appeler une bibliothèque de confidentialité différentielle externe à l'aide d'une fonction distante et effectuer des agrégations de confidentialité différentielle avec PipelineDP, une bibliothèque Python pouvant s'exécuter avec Apache Spark et Apache Beam.
Pour en savoir plus sur la confidentialité différentielle, consultez la section Utiliser la confidentialité différentielle.
Confidentialité différentielle avec BigQuery Omni
La confidentialité différentielle de BigQuery accepte les appels à des sources de données multicloud telles qu'AWS S3. L'exemple suivant interroge une source de données externe, foo.wikidata
, et applique la confidentialité différentielle. Pour en savoir plus sur la syntaxe de la clause de confidentialité différentielle, consultez la section Clause de confidentialité différentielle.
SELECT WITH DIFFERENTIAL_PRIVACY OPTIONS ( epsilon = 1, delta = 1e-5, privacy_unit_column = foo.wikidata.es_description) COUNT(*) AS results FROM foo.wikidata;
Cet exemple renvoie des résultats semblables à ceux-ci :
-- These results will change each time you run the query. +----------+ | results | +----------+ | 3465 | +----------+
Pour en savoir plus sur les limites de BigQuery Omni, consultez la section Limites.
Appeler des bibliothèques de confidentialité différentielle externes avec des fonctions distantes
Vous pouvez appeler des bibliothèques de confidentialité différentielle externes à l'aide de fonctions distantes. Le lien suivant utilise une fonction distante pour appeler une bibliothèque externe hébergée par Tumult Analytics afin d'utiliser une confidentialité différentielle concentrée sur zéro pour un ensemble de données de vente au détail.
Pour plus d'informations sur l'utilisation de Tumult Analytics, consultez l'article de lancement de Tumult Analytics {: .external}.
Agrégations de confidentialité différentielle avec PipelineDP
PipelineDP est une bibliothèque Python qui effectue des agrégations de confidentialité différentielle et peut s'exécuter avec Apache Spark et Apache Beam. BigQuery peut exécuter des procédures stockées Apache Spark écrites en Python. Pour en savoir plus sur l'exécution de procédures stockées Apache Spark, consultez la section Travailler avec des procédures stockées pour Apache Spark.
L'exemple suivant effectue une agrégation de confidentialité différentielle à l'aide de la bibliothèque PipelineDP. Il utilise l'ensemble de données public sur les trajets en taxi à Chicago et calcule pour chaque taxi le nombre de trajets, ainsi que la somme et la moyenne des conseils pour ces trajets.
Avant de commencer
Une image Apache Spark standard n'inclut pas PipelineDP. Vous devez créer une image Docker contenant toutes les dépendances nécessaires avant d'exécuter une procédure stockée PipelineDP. Cette section explique comment créer et transférer une image Docker vers Google Cloud.
Avant de commencer, assurez-vous d'avoir installé Docker sur votre ordinateur local et configuré l'authentification pour transférer des images Docker vers gcr.io. Pour en savoir plus sur le transfert d'images Docker, consultez la section Transférer et extraire des images.
Créer et transférer une image Docker
Pour créer et transférer une image Docker avec les dépendances requises, procédez comme suit :
- Créez un dossier local
DIR
. - Téléchargez le programme d'installation de Miniconda, avec la version 3.9 de Python, sur
DIR
. Enregistrez le texte suivant dans le fichier Dockerfile.
# Debian 11 is recommended. FROM debian:11-slim # Suppress interactive prompts ENV DEBIAN_FRONTEND=noninteractive # (Required) Install utilities required by Spark scripts. RUN apt update && apt install -y procps tini libjemalloc2 # Enable jemalloc2 as default memory allocator ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Install and configure Miniconda3. ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh . RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict # The following packages are installed in the default image, it is # strongly recommended to include all of them. RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y libopenblas-dev RUN pip install \ cython \ fastavro \ fastparquet \ gcsfs \ google-cloud-bigquery-storage \ google-cloud-bigquery[pandas] \ google-cloud-bigtable \ google-cloud-container \ google-cloud-datacatalog \ google-cloud-dataproc \ google-cloud-datastore \ google-cloud-language \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-pubsub \ google-cloud-redis \ google-cloud-spanner \ google-cloud-speech \ google-cloud-storage \ google-cloud-texttospeech \ google-cloud-translate \ google-cloud-vision \ koalas \ matplotlib \ nltk \ numba \ numpy \ orc \ pandas \ pyarrow \ pysal \ regex \ requests \ rtree \ scikit-image \ scikit-learn \ scipy \ seaborn \ sqlalchemy \ sympy \ tables \ virtualenv RUN pip install --no-input pipeline-dp==0.2.0 # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER spark
Exécutez la commande suivante :
IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1 # Build and push the image. docker build -t "${IMAGE}" docker push "${IMAGE}"
Remplacez les éléments suivants :
PROJECT_ID
: projet dans lequel vous souhaitez créer l'image DockerDOCKER_IMAGE
: nom de l'image Docker
L'image est importée.
Exécuter une procédure stockée PipelineDP
Pour créer une procédure stockée, utilisez l'instruction CREATE PROCEDURE.
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`() WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS ( engine = "SPARK", container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import pipeline_dp def compute_dp_metrics(data, spark_context): budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10, total_delta=1e-6) backend = pipeline_dp.SparkRDDBackend(spark_context) # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN], max_partitions_contributed=1, max_contributions_per_partition=1, min_value=0, # Tips that are larger than 100 will be clipped to 100. max_value=100) # Specify how to extract privacy_id, partition_key and value from an # element of the taxi dataset. data_extractors = pipeline_dp.DataExtractors( partition_extractor=lambda x: x.taxi_id, privacy_id_extractor=lambda x: x.unique_key, value_extractor=lambda x: 0 if x.tips is None else x.tips) # Run aggregation. dp_result = dp_engine.aggregate(data, params, data_extractors) budget_accountant.compute_budgets() dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean)) return dp_result spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate() spark_context = spark.sparkContext # Load data from BigQuery. taxi_trips = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \ .load().rdd dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"]) # Saving the data to BigQuery dp_result.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("DATASET_ID.TABLE_NAME") """;
Remplacez les éléments suivants :
PROJECT_ID
: projet dans lequel vous souhaitez créer la procédure stockéeDATASET_ID
: ensemble de données dans lequel vous souhaitez créer la procédure stockéeREGION
: région dans laquelle se trouve votre projetDOCKER_IMAGE
: nom de l'image DockerCONNECTION_ID
: nom de la connexionTABLE_NAME
: nom de la table
Utilisez l'instruction CALL pour appeler la procédure.
CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
Remplacez les éléments suivants :
PROJECT_ID
: projet dans lequel vous souhaitez créer la procédure stockéeDATASET_ID
: ensemble de données dans lequel vous souhaitez créer la procédure stockée
Étapes suivantes
- Découvrez comment utiliser la confidentialité différentielle.
- En savoir plus sur la clause de confidentialité différentielle
- Découvrez comment utiliser des fonctions d'agrégation privées de manière différentielle.