Ampliar la privacidad diferencial
En este documento se proporcionan ejemplos de cómo ampliar la privacidad diferencial de BigQuery.
BigQuery te permite ampliar la privacidad diferencial a fuentes de datos multinube y bibliotecas de privacidad diferencial externas. En este documento se proporcionan ejemplos de cómo aplicar la privacidad diferencial a fuentes de datos multicloud, como AWS S3 con BigQuery Omni, cómo llamar a una biblioteca de privacidad diferencial externa mediante una función remota y cómo realizar agregaciones de privacidad diferencial con PipelineDP, una biblioteca de Python que se puede ejecutar con Apache Spark y Apache Beam.
Para obtener más información sobre la privacidad diferencial, consulta Usar la privacidad diferencial.
Privacidad diferencial con BigQuery Omni
La privacidad diferencial de BigQuery admite llamadas a fuentes de datos multinube, como AWS S3. En el siguiente ejemplo se consulta una fuente de datos externa, foo.wikidata
, y se aplica la privacidad diferencial. Para obtener más información sobre la sintaxis de la cláusula de privacidad diferencial, consulta Cláusula de privacidad diferencial.
SELECT WITH DIFFERENTIAL_PRIVACY OPTIONS ( epsilon = 1, delta = 1e-5, privacy_unit_column = foo.wikidata.es_description) COUNT(*) AS results FROM foo.wikidata;
Este ejemplo devuelve resultados similares a los siguientes:
-- These results will change each time you run the query. +----------+ | results | +----------+ | 3465 | +----------+
Para obtener más información sobre las limitaciones de BigQuery Omni, consulta Limitaciones.
Llamar a bibliotecas externas de privacidad diferencial con funciones remotas
Puedes llamar a bibliotecas externas de privacidad diferencial mediante funciones remotas. El siguiente enlace usa una función remota para llamar a una biblioteca externa alojada por Tumult Analytics y usar la privacidad diferencial concentrada en cero en un conjunto de datos de ventas minoristas.
Para obtener información sobre cómo trabajar con Tumult Analytics, consulta la publicación de lanzamiento de Tumult Analytics {: .external}.
Agregaciones de privacidad diferencial con PipelineDP
PipelineDP es una biblioteca de Python que realiza agregaciones de privacidad diferencial y se puede ejecutar con Apache Spark y Apache Beam. BigQuery puede ejecutar procedimientos almacenados de Apache Spark escritos en Python. Para obtener más información sobre cómo ejecutar procedimientos almacenados de Apache Spark, consulta Trabajar con procedimientos almacenados de Apache Spark.
En el siguiente ejemplo se realiza una agregación de privacidad diferencial con la biblioteca PipelineDP. Usa el conjunto de datos público de viajes en taxi de Chicago y calcula, para cada taxi, el número de viajes, así como la suma y la media de las propinas de esos viajes.
Antes de empezar
Una imagen estándar de Apache Spark no incluye PipelineDP. Debes crear una imagen de Docker que contenga todas las dependencias necesarias antes de ejecutar un procedimiento almacenado de PipelineDP. En esta sección se describe cómo crear y enviar una imagen Docker a Google Cloud.
Antes de empezar, asegúrate de que has instalado Docker en tu máquina local y de que has configurado la autenticación para enviar imágenes de Docker a gcr.io. Para obtener más información sobre el envío de imágenes de Docker, consulta Enviar y extraer imágenes.
Crear y enviar una imagen de Docker
Para crear e insertar una imagen de Docker con las dependencias necesarias, sigue estos pasos:
- Crea una carpeta local
DIR
. - Descarga el instalador de Miniconda, con la versión de Python 3.9, en
DIR
. Guarda el siguiente texto en el archivo Dockerfile.
# Debian 11 is recommended. FROM debian:11-slim # Suppress interactive prompts ENV DEBIAN_FRONTEND=noninteractive # (Required) Install utilities required by Spark scripts. RUN apt update && apt install -y procps tini libjemalloc2 # Enable jemalloc2 as default memory allocator ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Install and configure Miniconda3. ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh . RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict # The following packages are installed in the default image, it is # strongly recommended to include all of them. RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y libopenblas-dev RUN pip install \ cython \ fastavro \ fastparquet \ gcsfs \ google-cloud-bigquery-storage \ google-cloud-bigquery[pandas] \ google-cloud-bigtable \ google-cloud-container \ google-cloud-datacatalog \ google-cloud-dataproc \ google-cloud-datastore \ google-cloud-language \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-pubsub \ google-cloud-redis \ google-cloud-spanner \ google-cloud-speech \ google-cloud-storage \ google-cloud-texttospeech \ google-cloud-translate \ google-cloud-vision \ koalas \ matplotlib \ nltk \ numba \ numpy \ orc \ pandas \ pyarrow \ pysal \ regex \ requests \ rtree \ scikit-image \ scikit-learn \ scipy \ seaborn \ sqlalchemy \ sympy \ tables \ virtualenv RUN pip install --no-input pipeline-dp==0.2.0 # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER spark
Ejecuta el siguiente comando:
IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1 # Build and push the image. docker build -t "${IMAGE}" docker push "${IMAGE}"
Haz los cambios siguientes:
PROJECT_ID
: el proyecto en el que quieres crear la imagen Docker.DOCKER_IMAGE
: nombre de la imagen de Docker.
La imagen se ha subido.
Ejecutar un procedimiento almacenado de PipelineDP
Para crear un procedimiento almacenado, usa la instrucción CREATE PROCEDURE.
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`() WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS ( engine = "SPARK", container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import pipeline_dp def compute_dp_metrics(data, spark_context): budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10, total_delta=1e-6) backend = pipeline_dp.SparkRDDBackend(spark_context) # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN], max_partitions_contributed=1, max_contributions_per_partition=1, min_value=0, # Tips that are larger than 100 will be clipped to 100. max_value=100) # Specify how to extract privacy_id, partition_key and value from an # element of the taxi dataset. data_extractors = pipeline_dp.DataExtractors( partition_extractor=lambda x: x.taxi_id, privacy_id_extractor=lambda x: x.unique_key, value_extractor=lambda x: 0 if x.tips is None else x.tips) # Run aggregation. dp_result = dp_engine.aggregate(data, params, data_extractors) budget_accountant.compute_budgets() dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean)) return dp_result spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate() spark_context = spark.sparkContext # Load data from BigQuery. taxi_trips = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \ .load().rdd dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"]) # Saving the data to BigQuery dp_result.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("DATASET_ID.TABLE_NAME") """;
Haz los cambios siguientes:
PROJECT_ID
: el proyecto en el que quieres crear el procedimiento almacenado.DATASET_ID
: el conjunto de datos en el que quieres crear el procedimiento almacenado.REGION
: la región en la que se encuentra tu proyecto.DOCKER_IMAGE
: nombre de la imagen de Docker.CONNECTION_ID
: el nombre de la conexión.TABLE_NAME
: el nombre de la tabla.
Usa la instrucción CALL para llamar al procedimiento.
CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
Haz los cambios siguientes:
PROJECT_ID
: el proyecto en el que quieres crear el procedimiento almacenado.DATASET_ID
: el conjunto de datos en el que quieres crear el procedimiento almacenado.
Siguientes pasos
- Consulta cómo usar la privacidad diferencial.
- Consulta la cláusula de privacidad diferencial.
- Consulta cómo usar funciones de agregación con privacidad diferencial.