Extiende la privacidad diferencial

En este documento, se proporcionan ejemplos de cómo extender la privacidad diferencial para la privacidad diferencial de BigQuery.

BigQuery te permite extender la privacidad diferencial a las fuentes de datos en múltiples nubes y a las bibliotecas de privacidad diferencial externas. En este documento, se proporcionan ejemplos de cómo aplicar la privacidad diferencial para fuentes de datos de múltiples nubes, como AWS S3 con BigQuery Omni, cómo llamar a una biblioteca de privacidad diferencial externa con una función remota y cómo realizar una agregación de privacidad diferencial con PipelineDP, una biblioteca de Python que puede ejecutarse con Apache Spark y Apache Beam.

Para obtener más información sobre la privacidad diferencial, consulta Usa la privacidad diferencial.

Privacidad diferencial con BigQuery Omni

La privacidad diferencial de BigQuery admite llamadas a fuentes de datos de múltiples nubes, como AWS S3. En el siguiente ejemplo, se consulta una fuente de datos externa, foo.wikidata, y se aplica la privacidad diferencial. Para obtener más información sobre la sintaxis de la cláusula de privacidad diferencial, consulta Cláusula de privacidad diferencial.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Este ejemplo muestra resultados similares al siguiente:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Para obtener más información sobre las limitaciones de BigQuery Omni, consulta Limitaciones.

Llama a bibliotecas de privacidad diferencial externas con funciones remotas

Puedes llamar a las bibliotecas de privacidad diferencial externas con funciones remotas. En el siguiente vínculo, se usa una función remota para llamar a una biblioteca externa alojada en Tumult Analytics para usar la privacidad diferencial sin concentración en un conjunto de datos de ventas minoristas.

Para obtener información sobre cómo trabajar con Tumult Analytics, consulta la publicación de lanzamiento de Tumult Analytics {: .external}.

Agregaciones de privacidad diferenciales con PipelineDP

PipelineDP es una biblioteca de Python que realiza agregaciones de privacidad diferencial y puede ejecutarse con Apache Spark y Apache Beam. BigQuery puede ejecutar los procedimientos almacenados de Apache Spark escritos en Python. Para obtener más información sobre cómo ejecutar procedimientos almacenados de Apache Spark, consulta Trabaja con procedimientos almacenados para Apache Spark.

En el siguiente ejemplo, se realiza una agregación de privacidad diferencial con la biblioteca de PipelineDP. Usa el conjunto de datos públicos de viajes en taxi de Chicago y calcula cada vehículo de taxi, la cantidad de viajes, y la suma y media de sugerencias para estos viajes.

Antes de comenzar

Una imagen de Apache Spark estándar no incluye PipelineDP. Debes crear una imagen de Docker que contenga todas las dependencias necesarias antes de ejecutar un procedimiento almacenado de PipelineDP. En esta sección, se describe cómo crear y enviar una imagen de Docker a Google Cloud.

Antes de comenzar, asegúrate de haber instalado Docker en tu máquina local y configurado la autenticación para enviar imágenes de Docker a gcr.io. Para obtener más información sobre cómo enviar imágenes de Docker, consulta Envía y extrae imágenes.

Crea y envía una imagen de Docker

Para crear y enviar una imagen de Docker con dependencias requeridas, sigue estos pasos:

  1. Crea una carpeta local DIR.
  2. Descarga el instalador de Miniconda, con la versión 3.9 de Python, en DIR.
  3. Guarda el siguiente texto en Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. Ejecuta el siguiente comando.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    Reemplaza lo siguiente:

    • PROJECT_ID: el proyecto en el que deseas crear la imagen de Docker.
    • DOCKER_IMAGE: Es el nombre de la imagen de Docker.

    Se subirá la imagen.

Ejecuta un procedimiento almacenado de PipelineDP

  1. Para crear un procedimiento almacenado, usa la instrucción CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    Reemplaza lo siguiente:

    • PROJECT_ID: el proyecto en el que deseas crear el procedimiento almacenado.
    • DATASET_ID: el conjunto de datos en el que deseas crear el procedimiento almacenado.
    • REGION: Es la región en la que se encuentra tu proyecto.
    • DOCKER_IMAGE: Es el nombre de la imagen de Docker.
    • CONNECTION_ID: Es el nombre de la conexión.
    • TABLE_NAME: Es el nombre de la tabla.
  2. Usa la declaración CALL para llamar al procedimiento.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    Reemplaza lo siguiente:

    • PROJECT_ID: el proyecto en el que deseas crear el procedimiento almacenado.
    • DATASET_ID: el conjunto de datos en el que deseas crear el procedimiento almacenado.

¿Qué sigue?