Aumentar a privacidade diferencial

Neste documento, fornecemos exemplos de como estender a privacidade diferencial para a privacidade diferencial do BigQuery.

O BigQuery permite estender a privacidade diferencial para fontes de dados de várias nuvens e bibliotecas externas de privacidade diferencial. Neste documento, fornecemos exemplos de como aplicar a privacidade diferencial para fontes de dados de várias nuvens como o AWS S3 com o BigQuery Omni, como chamar uma biblioteca externa de privacidade diferencial usando uma função remota e como realizar as agregações de privacidade diferencial com o PipelineDP, uma biblioteca Python que pode ser executada com o Apache Spark e o Apache Beam.

Para mais informações sobre privacidade diferencial, consulte Usar privacidade diferencial.

Privacidade diferencial com o BigQuery Omni

A privacidade diferencial do BigQuery é compatível com chamadas para fontes de dados de várias nuvens, como o AWS S3. O exemplo a seguir consulta uma fonte externa de dados, foo.wikidata, e aplica a privacidade diferencial. Para mais informações sobre a sintaxe da cláusula de privacidade diferencial, consulte a cláusula de privacidade diferencial.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Este exemplo retorna resultados semelhantes aos seguintes:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Para mais informações sobre as limitações do BigQuery Omni, consulte Limitações.

Chamar bibliotecas externas de privacidade diferencial com funções remotas

É possível chamar bibliotecas de privacidade diferencial externa usando funções remotas. O link a seguir usa uma função remota para chamar uma biblioteca externa hospedada pelo Tumult Analytics para usar a privacidade diferencial concentrada em zero em um conjunto de dados de vendas no varejo.

Para informações sobre como trabalhar com o Tumult Analytics, consulte a postagem de lançamento do Tumult Analytics {: .external}.

Agregações de privacidade diferencial com o PipelineDP

O PipelineDP é uma biblioteca Python que executa agregações de privacidade diferencial e pode ser executada com o Apache Spark e o Apache Beam. O BigQuery pode executar procedimentos armazenados no Apache Spark escritos em Python. Para mais informações sobre como executar procedimentos armazenados no Apache Spark, consulte esta página.

O exemplo a seguir executa uma agregação de privacidade diferencial usando a biblioteca PipelineDP. Ele usa o conjunto de dados públicos sobre viagens de táxi em Chicago e calcula para cada carro de táxi: o número de viagens, a soma e a média das gorjetas para essas viagens.

Antes de começar

Uma imagem Apache Spark padrão não inclui o PipelineDP. Crie uma imagem do Docker que contenha todas as dependências necessárias antes de executar um procedimento armazenado do PipelineDP. Nesta seção, descrevemos como criar e enviar uma imagem do Docker para o Google Cloud.

Antes de começar, instale o Docker na sua máquina local e configure a autenticação para enviar imagens do Docker para gcr.io. Para mais informações sobre como enviar imagens do Docker, consulte Enviar e extrair imagens.

Criar e enviar uma imagem do Docker

Para criar e enviar uma imagem do Docker com as dependências necessárias, siga estas etapas:

  1. Crie uma pasta local DIR.
  2. Faça o download do instalador Miniconda, com a versão Python 3.9, em DIR.
  3. Salve o texto a seguir no Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. Execute o comando a seguir.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    Substitua:

    • PROJECT_ID: o projeto em que você quer criar a imagem do Docker.
    • DOCKER_IMAGE: o nome da imagem do Docker.

    A imagem foi enviada.

Executar um procedimento armazenado do PipelineDP

  1. Para criar um procedimento armazenado, use a instrução CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    Substitua:

    • PROJECT_ID: o projeto em que você quer criar o procedimento armazenado.
    • DATASET_ID: o conjunto de dados em que você quer criar o procedimento armazenado.
    • REGION: a região em que o projeto está localizado.
    • DOCKER_IMAGE: o nome da imagem do Docker.
    • CONNECTION_ID: o nome da conexão.
    • TABLE_NAME: o nome da tabela.
  2. Use a instrução CALL para chamar o procedimento.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    Substitua:

    • PROJECT_ID: o projeto em que você quer criar o procedimento armazenado.
    • DATASET_ID: o conjunto de dados em que você quer criar o procedimento armazenado.

A seguir