Estendi la privacy differenziale

Questo documento fornisce esempi su come estendere la privacy differenziale per Privacy differenziale di BigQuery.

Panoramica

BigQuery ti consente di estendere privacy differenziale alle origini dati multi-cloud e alle librerie di privacy differenziale esterne. Questo documento fornisce esempi di applicazione della privacy differenziale per origini dati multi-cloud, come AWS S3 con BigQuery Omni, come chiamare una libreria della privacy differenziale esterna utilizzando una funzione remota e come di eseguire aggregazioni della privacy differenziale PipelineDP, una libreria Python che può eseguire con Apache Spark e Apache Beam.

Per ulteriori informazioni sulla privacy differenziale, consulta Utilizzare la privacy differenziale.

Privacy differenziale con BigQuery Omni

La privacy differenziale di BigQuery supporta le chiamate ai dati multi-cloud come AWS S3. L'esempio seguente esegue query su una sorgente dati esterna foo.wikidata e applica la privacy differenziale. Per ulteriori informazioni sui della clausola di privacy differenziale, consulta Privacy differenziale della clausola.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Questo esempio restituisce risultati simili ai seguenti:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Per saperne di più su BigQuery Omni limitazioni, consulta la sezione Limitazioni.

Chiamare librerie della privacy differenziale esterne con funzioni remote

Puoi chiamare le librerie della privacy differenziale esterne usando un telecomando . La il seguente link utilizza una funzione remota per chiamare una libreria esterna ospitata da Tumult Analytics per usare la concentrazione zero della privacy differenziale su un set di dati sulle vendite al dettaglio.

Per informazioni sull'utilizzo di Tumult Analytics, consulta Post sul lancio di Tumult Analytics {: .external}.

Aggregazioni di privacy differenziale con PipelineDP

PipelineDP è una libreria Python che esegue la privacy differenziale aggregazioni possono essere eseguiti con Apache Spark e Apache Beam. BigQuery può eseguire stored procedure Apache Spark scritte in Python. Per ulteriori informazioni che eseguono le stored procedure Apache Spark, vedi Utilizzare le stored procedure per Apache Spark.

L'esempio seguente esegue un'aggregazione della privacy differenziale utilizzando il metodo tramite la libreria PipelineDP. Utilizza la classe Set di dati pubblico delle corse dei taxi a Chicago e calcola per ogni taxi car - il numero di corse, nonché la somma e la media delle mance per queste corse.

Prima di iniziare

Un'immagine Apache Spark standard non include PipelineDP. Devi creare un'istanza Immagine Docker che contiene tutte le informazioni necessarie delle dipendenze prima di eseguire una stored procedure PipelineDP. Questa sezione descrive come creare ed eseguire il push di un'immagine Docker in Google Cloud.

Prima di iniziare, assicurati di aver installato Docker sulla macchina locale e di averlo impostato dell'autenticazione per il push delle immagini Docker a gcr.io. Per ulteriori informazioni sul push delle immagini Docker, consulta Immagini push e pull.

crea un'immagine Docker ed eseguine il push

Per creare un'immagine Docker ed eseguirne il push con le dipendenze richieste, segui questi passaggi:

  1. Crea una cartella locale DIR.
  2. Scarica Programma di installazione Miniconda, con la versione Python 3.9, fino a DIR.
  3. Salva il testo seguente nel Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
    
  4. Esegui questo comando.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare l'immagine Docker.
    • DOCKER_IMAGE: il nome dell'immagine Docker.

    L'immagine è stata caricata.

Esegui una stored procedure PipelineDP

  1. Per creare una stored procedure, utilizza il comando PROCEDURA l'Informativa.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi per creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi per creare la stored procedure.
    • REGION: la regione in cui si trova il progetto.
    • DOCKER_IMAGE: il nome dell'immagine Docker.
    • CONNECTION_ID: il nome della connessione.
    • TABLE_NAME: il nome della tabella.
  2. Usa la chiamata CALL per chiamare la procedura.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi per creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi per creare la stored procedure.

Passaggi successivi