差分プライバシーを拡張する

このドキュメントでは、BigQuery の差分プライバシーを拡張する方法の例を示します。

BigQuery では、差分プライバシーをマルチクラウド データソースや外部差分プライバシー ライブラリに拡張できます。このドキュメントでは、BigQuery Omni を使用して AWS S3 などのマルチクラウド データソースに差分プライバシーを適用する方法、リモート関数を使用して外部の差分プライバシー ライブラリを呼び出す方法、Apache Spark や Apache Beam と実行できる Python ライブである PipelineDP を使用して差分プライバシー集計を実行する方法を説明します。

差分プライバシーの詳細については、差分プライバシーを使用するをご覧ください。

BigQuery Omni を使用した差分プライバシー

BigQuery の差分プライバシーは、AWS S3 などのマルチクラウド データソースへの呼び出しをサポートしています。次の例では、外部データソース foo.wikidata に対してクエリを実行し、差分プライバシーを適用します。差分プライバシー句の構文の詳細については、差分プライバシー句をご覧ください。

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

この例では次のような結果が返されます。

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

BigQuery Omni の制限事項の詳細については、制限事項をご覧ください。

リモート関数を使用して外部の差分プライバシー ライブラリを呼び出す

リモート関数を使用して、外部差分プライバシー ライブラリを呼び出すことができます。次のリンクは、リモート関数を使用して Tumult Analytics がホストする外部ライブラリを呼び出し、小売販売データセットの 0 に集中した差分プライバシーを使用します。

Tumult Analytics の操作については、Tumult Analytics のリリース投稿 {: .external} をご覧ください。

PipelineDP を使用した差分プライバシーの集計

PipelineDP は差分プライバシー集計を行う Python ライブラリで、Apache Spark と Apache Beam で実行できます。BigQuery では、Python で記述された Apache Spark ストアド プロシージャを実行できます。Apache Spark ストアド プロシージャの実行の詳細については、Apache Spark のストアド プロシージャの操作をご覧ください。

次の例では、PipelineDP ライブラリを使用して差分プライバシー集計を実行します。これは、Chicago Taxi Trips の一般公開データセットを使用して、乗車回数と各乗車のチップの合計および平均をタクシー車両ごとに計算します。

始める前に

標準の Apache Spark イメージには PipelineDP は含まれていません。PipelineDP ストアド プロシージャを実行する前に、必要なすべての依存関係を含む Docker イメージを作成する必要があります。このセクションでは、Docker イメージを作成して Google Cloud に push する方法について説明します。

始める前に、ローカルマシンに Docker がインストールされていることを確認し、Docker イメージを gcr.io に push するための認証を設定します。Docker イメージを push する方法の詳細については、イメージの push と pull をご覧ください。

Docker イメージを作成して push する

必要な依存関係を含む Docker イメージを作成して push する手順は次のとおりです。

  1. ローカル フォルダ DIR を作成します。
  2. Python 3.9 バージョンを含む Miniconda インストーラDIR にダウンロードします。
  3. 次のテキストを Dockerfile に保存します。

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      RUN apt install -y python3
      RUN apt install -y python3-pip
      RUN apt install -y libopenblas-dev
      RUN pip install \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        orc \
        pandas \
        pyarrow \
        pysal \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        tables \
        virtualenv
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
  4. 次のコマンドを実行します。

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    

    次のように置き換えます。

    • PROJECT_ID: Docker イメージを作成するプロジェクト。
    • DOCKER_IMAGE: Docker イメージの名前。

    イメージがアップロードされます。

PipelineDP ストアド プロシージャを実行する

  1. ストアド プロシージャを作成するには、CREATE PROCEDURE ステートメントを使用します。

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;

    次のように置き換えます。

    • PROJECT_ID: ストアド プロシージャを作成するプロジェクト。
    • DATASET_ID: ストアド プロシージャを作成するデータセット。
    • REGION: プロジェクトが配置されているリージョン。
    • DOCKER_IMAGE: Docker イメージの名前。
    • CONNECTION_ID: 接続の名前。
    • TABLE_NAME: テーブルの名前。
  2. CALL ステートメントを使用してプロシージャを呼び出します。

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()

    次のように置き換えます。

    • PROJECT_ID: ストアド プロシージャを作成するプロジェクト。
    • DATASET_ID: ストアド プロシージャを作成するデータセット。

次のステップ