Extiende la privacidad diferencial
En este documento, se proporcionan ejemplos de cómo extender la privacidad diferencial para la privacidad diferencial de BigQuery.
BigQuery te permite extender la privacidad diferencial a las fuentes de datos en múltiples nubes y a las bibliotecas de privacidad diferencial externas. En este documento, se proporcionan ejemplos de cómo aplicar la privacidad diferencial para fuentes de datos de múltiples nubes, como AWS S3 con BigQuery Omni, cómo llamar a una biblioteca de privacidad diferencial externa con una función remota y cómo realizar una agregación de privacidad diferencial con PipelineDP, una biblioteca de Python que puede ejecutarse con Apache Spark y Apache Beam.
Para obtener más información sobre la privacidad diferencial, consulta Usa la privacidad diferencial.
Privacidad diferencial con BigQuery Omni
La privacidad diferencial de BigQuery admite llamadas a fuentes de datos de múltiples nubes, como AWS S3. En el siguiente ejemplo, se consulta una fuente de datos externa, foo.wikidata
, y se aplica la privacidad diferencial. Para obtener más información sobre la sintaxis de la cláusula de privacidad diferencial, consulta Cláusula de privacidad diferencial.
SELECT WITH DIFFERENTIAL_PRIVACY OPTIONS ( epsilon = 1, delta = 1e-5, privacy_unit_column = foo.wikidata.es_description) COUNT(*) AS results FROM foo.wikidata;
Este ejemplo muestra resultados similares al siguiente:
-- These results will change each time you run the query. +----------+ | results | +----------+ | 3465 | +----------+
Para obtener más información sobre las limitaciones de BigQuery Omni, consulta Limitaciones.
Llama a bibliotecas de privacidad diferencial externas con funciones remotas
Puedes llamar a las bibliotecas de privacidad diferencial externas con funciones remotas. En el siguiente vínculo, se usa una función remota para llamar a una biblioteca externa alojada en Tumult Analytics para usar la privacidad diferencial sin concentración en un conjunto de datos de ventas minoristas.
Para obtener información sobre cómo trabajar con Tumult Analytics, consulta la publicación de lanzamiento de Tumult Analytics {: .external}.
Agregaciones de privacidad diferenciales con PipelineDP
PipelineDP es una biblioteca de Python que realiza agregaciones de privacidad diferencial y puede ejecutarse con Apache Spark y Apache Beam. BigQuery puede ejecutar los procedimientos almacenados de Apache Spark escritos en Python. Para obtener más información sobre cómo ejecutar procedimientos almacenados de Apache Spark, consulta Trabaja con procedimientos almacenados para Apache Spark.
En el siguiente ejemplo, se realiza una agregación de privacidad diferencial con la biblioteca de PipelineDP. Usa el conjunto de datos públicos de viajes en taxi de Chicago y calcula cada vehículo de taxi, la cantidad de viajes, y la suma y media de sugerencias para estos viajes.
Antes de comenzar
Una imagen de Apache Spark estándar no incluye PipelineDP. Debes crear una imagen de Docker que contenga todas las dependencias necesarias antes de ejecutar un procedimiento almacenado de PipelineDP. En esta sección, se describe cómo crear y enviar una imagen de Docker a Google Cloud.
Antes de comenzar, asegúrate de haber instalado Docker en tu máquina local y configurado la autenticación para enviar imágenes de Docker a gcr.io. Para obtener más información sobre cómo enviar imágenes de Docker, consulta Envía y extrae imágenes.
Crea y envía una imagen de Docker
Para crear y enviar una imagen de Docker con dependencias requeridas, sigue estos pasos:
- Crea una carpeta local
DIR
. - Descarga el
instalador de Miniconda,
con la versión 3.9 de Python, en
DIR
. Guarda el siguiente texto en Dockerfile.
# Debian 11 is recommended. FROM debian:11-slim # Suppress interactive prompts ENV DEBIAN_FRONTEND=noninteractive # (Required) Install utilities required by Spark scripts. RUN apt update && apt install -y procps tini libjemalloc2 # Enable jemalloc2 as default memory allocator ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Install and configure Miniconda3. ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh . RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict # The following packages are installed in the default image, it is # strongly recommended to include all of them. RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y libopenblas-dev RUN pip install \ cython \ fastavro \ fastparquet \ gcsfs \ google-cloud-bigquery-storage \ google-cloud-bigquery[pandas] \ google-cloud-bigtable \ google-cloud-container \ google-cloud-datacatalog \ google-cloud-dataproc \ google-cloud-datastore \ google-cloud-language \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-pubsub \ google-cloud-redis \ google-cloud-spanner \ google-cloud-speech \ google-cloud-storage \ google-cloud-texttospeech \ google-cloud-translate \ google-cloud-vision \ koalas \ matplotlib \ nltk \ numba \ numpy \ orc \ pandas \ pyarrow \ pysal \ regex \ requests \ rtree \ scikit-image \ scikit-learn \ scipy \ seaborn \ sqlalchemy \ sympy \ tables \ virtualenv RUN pip install --no-input pipeline-dp==0.2.0 # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER spark
Ejecuta el siguiente comando.
IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1 # Build and push the image. docker build -t "${IMAGE}" docker push "${IMAGE}"
Reemplaza lo siguiente:
PROJECT_ID
: el proyecto en el que deseas crear la imagen de Docker.DOCKER_IMAGE
: Es el nombre de la imagen de Docker.
Se subirá la imagen.
Ejecuta un procedimiento almacenado de PipelineDP
Para crear un procedimiento almacenado, usa la instrucción CREATE PROCEDURE.
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`() WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS ( engine = "SPARK", container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import pipeline_dp def compute_dp_metrics(data, spark_context): budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10, total_delta=1e-6) backend = pipeline_dp.SparkRDDBackend(spark_context) # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN], max_partitions_contributed=1, max_contributions_per_partition=1, min_value=0, # Tips that are larger than 100 will be clipped to 100. max_value=100) # Specify how to extract privacy_id, partition_key and value from an # element of the taxi dataset. data_extractors = pipeline_dp.DataExtractors( partition_extractor=lambda x: x.taxi_id, privacy_id_extractor=lambda x: x.unique_key, value_extractor=lambda x: 0 if x.tips is None else x.tips) # Run aggregation. dp_result = dp_engine.aggregate(data, params, data_extractors) budget_accountant.compute_budgets() dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean)) return dp_result spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate() spark_context = spark.sparkContext # Load data from BigQuery. taxi_trips = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \ .load().rdd dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"]) # Saving the data to BigQuery dp_result.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("DATASET_ID.TABLE_NAME") """;
Reemplaza lo siguiente:
PROJECT_ID
: el proyecto en el que deseas crear el procedimiento almacenado.DATASET_ID
: el conjunto de datos en el que deseas crear el procedimiento almacenado.REGION
: Es la región en la que se encuentra tu proyecto.DOCKER_IMAGE
: Es el nombre de la imagen de Docker.CONNECTION_ID
: Es el nombre de la conexión.TABLE_NAME
: Es el nombre de la tabla.
Usa la declaración CALL para llamar al procedimiento.
CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
Reemplaza lo siguiente:
PROJECT_ID
: el proyecto en el que deseas crear el procedimiento almacenado.DATASET_ID
: el conjunto de datos en el que deseas crear el procedimiento almacenado.
¿Qué sigue?
- Obtén más información sobre cómo usar la privacidad diferencial.
- Obtén más información sobre la cláusula de privacidad diferencial.
- Obtén más información sobre cómo usar funciones de agregación diferenciada privada.