Espandere la privacy differenziale

Questo documento fornisce esempi di come estendere la privacy differenziale per la privacy differenziale di BigQuery.

BigQuery ti consente di estendere la privacy differenziale alle origini dati multi-cloud e alle librerie di privacy differenziale esterne. Questo documento fornisce esempi di come applicare la privacy differenziale per le origini dati multi-cloud come AWS S3 con BigQuery Omni, come chiamare una libreria di privacy differenziale esterna utilizzando una funzione remota e come eseguire aggregazioni con privacy differenziale con PipelineDP, una libreria Python che può essere eseguita con Apache Spark e Apache Beam.

Per ulteriori informazioni sulla privacy differenziale, consulta Utilizzare la privacy differenziale.

Privacy differenziale con BigQuery Omni

La privacy differenziale di BigQuery supporta le chiamate a origini dati multi-cloud come AWS S3. L'esempio seguente esegue una query su un'origine dati esterna, foo.wikidata, e applica la privacy differenziale. Per ulteriori informazioni sulla sintassi della clausola sulla privacy differenziale, consulta la clausola sulla privacy differenziale.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Questo esempio restituisce risultati simili ai seguenti:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Per ulteriori informazioni sulle limitazioni di BigQuery Omni, consulta Limitazioni.

Chiamare librerie di privacy differenziale esterne con funzioni remote

Puoi chiamare librerie di privacy differenziale esterne utilizzando le funzioni remote. Il seguente link utilizza una funzione remota per chiamare una libreria esterna ospitata da Tumult Analytics per utilizzare la privacy differenziale con concentrazione zero su un set di dati sulle vendite al dettaglio.

Per informazioni sull'utilizzo di Tumult Analytics, consulta il post di lancio di Tumult Analytics {: .external}.

Aggregazioni con privacy differenziale con PipelineDP

PipelineDP è una libreria Python che esegue aggregazioni con privacy differenziale e può essere eseguita con Apache Spark e Apache Beam. BigQuery può eseguire stored procedure Apache Spark scritte in Python. Per ulteriori informazioni sull'esecuzione delle stored procedure di Apache Spark, consulta Utilizzare le stored procedure per Apache Spark.

L'esempio seguente esegue un'aggregazione con privacy differenziale utilizzando la libreria PipelineDP. Utilizza il set di dati pubblico Chicago Taxi Trips e calcola per ogni auto di taxi il numero di corse, la somma e la media delle mance per queste corse.

Prima di iniziare

Un'immagine Apache Spark standard non include PipelineDP. Prima di eseguire una procedura archiviata PipelineDP, devi creare un'immagine Docker contenente tutte le dipendenze necessarie. Questa sezione descrive come creare ed eseguire il push di un'immagine Docker in Google Cloud.

Prima di iniziare, assicurati di aver installato Docker sulla tua macchina locale e di aver configurato l'autenticazione per il push delle immagini Docker su gcr.io. Per ulteriori informazioni sul push delle immagini Docker, consulta Eseguire il push e il pull delle immagini.

Crea ed esegui il push di un'immagine Docker

Per creare ed eseguire il push di un'immagine Docker con le dipendenze richieste:

  1. Crea una cartella locale DIR.
  2. Scarica il programma di installazione di Miniconda, con la versione Python 3.9, in DIR.
  3. Salva il seguente testo nel Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. Esegui questo comando.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare l'immagine Docker.
    • DOCKER_IMAGE: il nome dell'immagine Docker.

    L'immagine è stata caricata.

Esegui una stored procedure PipelineDP

  1. Per creare una stored procedure, utilizza l'istruzione CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi creare la stored procedure.
    • REGION: la regione in cui si trova il progetto.
    • DOCKER_IMAGE: il nome dell'immagine Docker.
    • CONNECTION_ID: il nome della connessione.
    • TABLE_NAME: il nome della tabella.
  2. Utilizza l'istruzione CALL per chiamare la procedura.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi creare la stored procedure.

Passaggi successivi