差分プライバシーを拡張する
このドキュメントでは、BigQuery の差分プライバシーを拡張する方法の例を示します。
BigQuery では、差分プライバシーをマルチクラウド データソースや外部差分プライバシー ライブラリに拡張できます。このドキュメントでは、BigQuery Omni を使用して AWS S3 などのマルチクラウド データソースに差分プライバシーを適用する方法、リモート関数を使用して外部の差分プライバシー ライブラリを呼び出す方法、Apache Spark や Apache Beam と実行できる Python ライブである PipelineDP を使用して差分プライバシー集計を実行する方法を説明します。
差分プライバシーの詳細については、差分プライバシーを使用するをご覧ください。
BigQuery Omni を使用した差分プライバシー
BigQuery の差分プライバシーは、AWS S3 などのマルチクラウド データソースへの呼び出しをサポートしています。次の例では、外部データソース foo.wikidata
に対してクエリを実行し、差分プライバシーを適用します。差分プライバシー句の構文の詳細については、差分プライバシー句をご覧ください。
SELECT WITH DIFFERENTIAL_PRIVACY OPTIONS ( epsilon = 1, delta = 1e-5, privacy_unit_column = foo.wikidata.es_description) COUNT(*) AS results FROM foo.wikidata;
この例では次のような結果が返されます。
-- These results will change each time you run the query. +----------+ | results | +----------+ | 3465 | +----------+
BigQuery Omni の制限事項の詳細については、制限事項をご覧ください。
リモート関数を使用して外部の差分プライバシー ライブラリを呼び出す
リモート関数を使用して、外部差分プライバシー ライブラリを呼び出すことができます。次のリンクは、リモート関数を使用して Tumult Analytics がホストする外部ライブラリを呼び出し、小売販売データセットの 0 に集中した差分プライバシーを使用します。
Tumult Analytics の操作については、Tumult Analytics のリリース投稿 {: .external} をご覧ください。
PipelineDP を使用した差分プライバシーの集計
PipelineDP は差分プライバシー集計を行う Python ライブラリで、Apache Spark と Apache Beam で実行できます。BigQuery では、Python で記述された Apache Spark ストアド プロシージャを実行できます。Apache Spark ストアド プロシージャの実行の詳細については、Apache Spark のストアド プロシージャの操作をご覧ください。
次の例では、PipelineDP ライブラリを使用して差分プライバシー集計を実行します。これは、Chicago Taxi Trips の一般公開データセットを使用して、乗車回数と各乗車のチップの合計および平均をタクシー車両ごとに計算します。
始める前に
標準の Apache Spark イメージには PipelineDP は含まれていません。PipelineDP ストアド プロシージャを実行する前に、必要なすべての依存関係を含む Docker イメージを作成する必要があります。このセクションでは、Docker イメージを作成して Google Cloud に push する方法について説明します。
始める前に、ローカルマシンに Docker がインストールされていることを確認し、Docker イメージを gcr.io に push するための認証を設定します。Docker イメージを push する方法の詳細については、イメージの push と pull をご覧ください。
Docker イメージを作成して push する
必要な依存関係を含む Docker イメージを作成して push する手順は次のとおりです。
- ローカル フォルダ
DIR
を作成します。 - Python 3.9 バージョンを含む Miniconda インストーラを
DIR
にダウンロードします。 次のテキストを Dockerfile に保存します。
# Debian 11 is recommended. FROM debian:11-slim # Suppress interactive prompts ENV DEBIAN_FRONTEND=noninteractive # (Required) Install utilities required by Spark scripts. RUN apt update && apt install -y procps tini libjemalloc2 # Enable jemalloc2 as default memory allocator ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Install and configure Miniconda3. ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh . RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict # The following packages are installed in the default image, it is # strongly recommended to include all of them. RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y libopenblas-dev RUN pip install \ cython \ fastavro \ fastparquet \ gcsfs \ google-cloud-bigquery-storage \ google-cloud-bigquery[pandas] \ google-cloud-bigtable \ google-cloud-container \ google-cloud-datacatalog \ google-cloud-dataproc \ google-cloud-datastore \ google-cloud-language \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-pubsub \ google-cloud-redis \ google-cloud-spanner \ google-cloud-speech \ google-cloud-storage \ google-cloud-texttospeech \ google-cloud-translate \ google-cloud-vision \ koalas \ matplotlib \ nltk \ numba \ numpy \ orc \ pandas \ pyarrow \ pysal \ regex \ requests \ rtree \ scikit-image \ scikit-learn \ scipy \ seaborn \ sqlalchemy \ sympy \ tables \ virtualenv RUN pip install --no-input pipeline-dp==0.2.0 # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER spark
次のコマンドを実行します。
IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1 # Build and push the image. docker build -t "${IMAGE}" docker push "${IMAGE}"
次のように置き換えます。
PROJECT_ID
: Docker イメージを作成するプロジェクト。DOCKER_IMAGE
: Docker イメージの名前。
イメージがアップロードされます。
PipelineDP ストアド プロシージャを実行する
ストアド プロシージャを作成するには、CREATE PROCEDURE ステートメントを使用します。
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`() WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS ( engine = "SPARK", container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import pipeline_dp def compute_dp_metrics(data, spark_context): budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10, total_delta=1e-6) backend = pipeline_dp.SparkRDDBackend(spark_context) # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN], max_partitions_contributed=1, max_contributions_per_partition=1, min_value=0, # Tips that are larger than 100 will be clipped to 100. max_value=100) # Specify how to extract privacy_id, partition_key and value from an # element of the taxi dataset. data_extractors = pipeline_dp.DataExtractors( partition_extractor=lambda x: x.taxi_id, privacy_id_extractor=lambda x: x.unique_key, value_extractor=lambda x: 0 if x.tips is None else x.tips) # Run aggregation. dp_result = dp_engine.aggregate(data, params, data_extractors) budget_accountant.compute_budgets() dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean)) return dp_result spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate() spark_context = spark.sparkContext # Load data from BigQuery. taxi_trips = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \ .load().rdd dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"]) # Saving the data to BigQuery dp_result.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("DATASET_ID.TABLE_NAME") """;
次のように置き換えます。
PROJECT_ID
: ストアド プロシージャを作成するプロジェクト。DATASET_ID
: ストアド プロシージャを作成するデータセット。REGION
: プロジェクトが配置されているリージョン。DOCKER_IMAGE
: Docker イメージの名前。CONNECTION_ID
: 接続の名前。TABLE_NAME
: テーブルの名前。
CALL ステートメントを使用してプロシージャを呼び出します。
CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
次のように置き換えます。
PROJECT_ID
: ストアド プロシージャを作成するプロジェクト。DATASET_ID
: ストアド プロシージャを作成するデータセット。
次のステップ
- 差分プライバシーを使用する方法を確認する。
- 差分プライバシー句について確認する。
- 差分プライベート集計関数の使用方法を学習する。