Ampliar la privacidad diferencial

En este documento se proporcionan ejemplos de cómo ampliar la privacidad diferencial de BigQuery.

BigQuery te permite ampliar la privacidad diferencial a fuentes de datos multinube y bibliotecas de privacidad diferencial externas. En este documento se proporcionan ejemplos de cómo aplicar la privacidad diferencial a fuentes de datos multicloud, como AWS S3 con BigQuery Omni, cómo llamar a una biblioteca de privacidad diferencial externa mediante una función remota y cómo realizar agregaciones de privacidad diferencial con PipelineDP, una biblioteca de Python que se puede ejecutar con Apache Spark y Apache Beam.

Para obtener más información sobre la privacidad diferencial, consulta Usar la privacidad diferencial.

Privacidad diferencial con BigQuery Omni

La privacidad diferencial de BigQuery admite llamadas a fuentes de datos multinube, como AWS S3. En el siguiente ejemplo se consulta una fuente de datos externa, foo.wikidata, y se aplica la privacidad diferencial. Para obtener más información sobre la sintaxis de la cláusula de privacidad diferencial, consulta Cláusula de privacidad diferencial.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Este ejemplo devuelve resultados similares a los siguientes:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Para obtener más información sobre las limitaciones de BigQuery Omni, consulta Limitaciones.

Llamar a bibliotecas externas de privacidad diferencial con funciones remotas

Puedes llamar a bibliotecas externas de privacidad diferencial mediante funciones remotas. El siguiente enlace usa una función remota para llamar a una biblioteca externa alojada por Tumult Analytics y usar la privacidad diferencial concentrada en cero en un conjunto de datos de ventas minoristas.

Para obtener información sobre cómo trabajar con Tumult Analytics, consulta la publicación de lanzamiento de Tumult Analytics {: .external}.

Agregaciones de privacidad diferencial con PipelineDP

PipelineDP es una biblioteca de Python que realiza agregaciones de privacidad diferencial y se puede ejecutar con Apache Spark y Apache Beam. BigQuery puede ejecutar procedimientos almacenados de Apache Spark escritos en Python. Para obtener más información sobre cómo ejecutar procedimientos almacenados de Apache Spark, consulta Trabajar con procedimientos almacenados de Apache Spark.

En el siguiente ejemplo se realiza una agregación de privacidad diferencial con la biblioteca PipelineDP. Usa el conjunto de datos público de viajes en taxi de Chicago y calcula, para cada taxi, el número de viajes, así como la suma y la media de las propinas de esos viajes.

Antes de empezar

Una imagen estándar de Apache Spark no incluye PipelineDP. Debes crear una imagen de Docker que contenga todas las dependencias necesarias antes de ejecutar un procedimiento almacenado de PipelineDP. En esta sección se describe cómo crear y enviar una imagen Docker a Google Cloud.

Antes de empezar, asegúrate de que has instalado Docker en tu máquina local y de que has configurado la autenticación para enviar imágenes de Docker a gcr.io. Para obtener más información sobre el envío de imágenes de Docker, consulta Enviar y extraer imágenes.

Crear y enviar una imagen de Docker

Para crear e insertar una imagen de Docker con las dependencias necesarias, sigue estos pasos:

  1. Crea una carpeta local DIR.
  2. Descarga el instalador de Miniconda, con la versión de Python 3.9, en DIR.
  3. Guarda el siguiente texto en el archivo Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. Ejecuta el siguiente comando:

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    Haz los cambios siguientes:

    • PROJECT_ID: el proyecto en el que quieres crear la imagen Docker.
    • DOCKER_IMAGE: nombre de la imagen de Docker.

    La imagen se ha subido.

Ejecutar un procedimiento almacenado de PipelineDP

  1. Para crear un procedimiento almacenado, usa la instrucción CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    Haz los cambios siguientes:

    • PROJECT_ID: el proyecto en el que quieres crear el procedimiento almacenado.
    • DATASET_ID: el conjunto de datos en el que quieres crear el procedimiento almacenado.
    • REGION: la región en la que se encuentra tu proyecto.
    • DOCKER_IMAGE: nombre de la imagen de Docker.
    • CONNECTION_ID: el nombre de la conexión.
    • TABLE_NAME: el nombre de la tabla.
  2. Usa la instrucción CALL para llamar al procedimiento.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    Haz los cambios siguientes:

    • PROJECT_ID: el proyecto en el que quieres crear el procedimiento almacenado.
    • DATASET_ID: el conjunto de datos en el que quieres crear el procedimiento almacenado.

Siguientes pasos