Bigtable Spark コネクタを使用する
Bigtable Spark コネクタを使用すると、Bigtable との間でデータを読み書きできます。Spark SQL と DataFrames を使用して Spark アプリケーションからデータを読み取ることができます。Bigtable Spark コネクタを使用すると、次の Bigtable オペレーションがサポートされます。
- データを書き込む
- データを読み取る
- 新しいテーブルを作成する
このドキュメントでは、Spark SQL DataFrames テーブルを Bigtable テーブルに変換し、Spark ジョブを送信するための JAR ファイルをコンパイルして作成する方法を説明します。
Spark と Scala のサポート状況
Bigtable Spark コネクタは、Scala 2.12 バージョンと次の Spark バージョンのみをサポートしています。
Bigtable Spark コネクタは、次の Dataproc バージョンをサポートしています。
- 1.5 イメージ バージョン クラスタ
- 2.0 イメージ バージョン クラスタ
- 2.1 イメージ バージョン クラスタ
- 2.2 イメージ バージョン クラスタ
- Dataproc Serverless ランタイム バージョン 1.0
費用の計算
課金対象である次の Google Cloud コンポーネントのいずれかを使用する場合は、使用したリソースに対して課金されます。
- Bigtable(Bigtable エミュレータの使用は無料)
- Dataproc
- Cloud Storage
Dataproc の料金は、Compute Engine クラスタでの Dataproc の使用に適用されます。Dataproc Serverless の料金は、Dataproc Serverless for Spark で実行されるワークロードとセッションに適用されます。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
Bigtable Spark コネクタを使用する前に、次の前提条件を満たす必要があります。
必要なロール
Bigtable Spark コネクタの使用に必要な権限を取得するには、プロジェクトに対する次の IAM ロールの付与を管理者に依頼してください。
- Bigtable 管理者(
roles/bigtable.admin
)(省略可): データを読み取りまたは書き込みして、新しいテーブルを作成できます。 - Bigtable ユーザー(
roles/bigtable.user
): データを読み取りまたは書き込みできますが、新しいテーブルの作成はできません。
ロールの付与の詳細については、アクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Dataproc または Cloud Storage を使用している場合は、追加の権限が必要になる場合があります。詳細については、Dataproc の権限と Cloud Storage の権限をご覧ください。
Spark を設定する
Bigtable インスタンスを作成する以外に、Spark インスタンスも設定する必要があります。ローカルで行うか、次のいずれかのオプションを選択して、Dataproc で Spark を使用できます。
- Dataproc クラスタ
- Dataproc Serverless
Dataproc クラスタまたはサーバーレス オプションの選択の詳細については、Spark 用 Dataproc Serverless と Compute Engine 上の Dataproc の比較に関するドキュメントをご覧ください。
コネクタの JAR ファイルをダウンロードする
Bigtable Spark コネクタのソースコードと例は、Bigtable Spark コネクタの GitHub リポジトリにあります。
Spark の設定に基づいて、次のように JAR ファイルにアクセスできます。
PySpark をローカルで実行する場合は、
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage の場所からコネクタの JAR ファイルをダウンロードする必要があります。SCALA_VERSION
は Scala バージョンに置き換え、サポートされている唯一のバージョンとして2.12
に設定します。CONNECTOR_VERSION
は、使用するコネクタ バージョンに置き換えます。Dataproc クラスタまたはサーバーレス オプションの場合は、Scala または Java Spark アプリケーションに追加できるアーティファクトとして最新の JAR ファイルを使用します。JAR ファイルをアーティファクトとして使用する方法については、依存関係を管理するをご覧ください。
PySpark ジョブを Dataproc に送信する場合は、
gcloud dataproc jobs submit pyspark --jars
フラグを使用して、URI を Cloud Storage 内の JAR ファイルの場所(例:gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
)に設定します。
Spark アプリケーションに Bigtable 構成を追加する
Spark アプリケーションに Bigtable を操作するための Spark オプションを追加します。
サポートされている Spark オプション
com.google.cloud.spark.bigtable
パッケージの一部として利用可能な Spark オプションを使用します。
オプション名 | 必須 | デフォルト値 | 意味 |
---|---|---|---|
spark.bigtable.project.id |
はい | なし | Bigtable プロジェクト ID を設定します。 |
spark.bigtable.instance.id |
はい | なし | Bigtable インスタンス ID を設定します。 |
catalog |
はい | なし | DataFrame の SQL に似たスキーマと Bigtable テーブルのスキーマの間の変換形式を指定する JSON 形式を設定します。 詳細については、JSON 形式でテーブル メタデータを作成するをご覧ください。 |
spark.bigtable.app_profile.id |
いいえ | default |
Bigtable アプリ プロファイル ID を設定します。 |
spark.bigtable.write.timestamp.milliseconds |
いいえ | 現在のシステム時刻 | Bigtable に DataFrame を書き込むときに使用するタイムスタンプをミリ秒単位で設定します。 DataFrame 内のすべての行が同じタイムスタンプを使用するため、DataFrame に同じ行キー列を持つ行は、同じタイムスタンプを共有するため、Bigtable では単一のバージョンとして保持されることに注意してください。 |
spark.bigtable.create.new.table |
いいえ | false |
Bigtable に書き込む前に新しいテーブルを作成するには、true に設定します。 |
spark.bigtable.read.timerange.start.milliseconds または spark.bigtable.read.timerange.end.milliseconds |
いいえ | なし | タイムスタンプ(エポックタイムからのミリ秒単位)を設定して、それぞれ特定の開始日と終了日を持つセルをフィルタします。これらのパラメータは両方とも指定するか、いずれも指定しません。 |
spark.bigtable.push.down.row.key.filters |
いいえ | true |
サーバーサイドで単純な行キーフィルタリングを許可するには、true に設定します。複合行キーのフィルタリングはクライアントサイドに実装されています。詳しくは、フィルタを使用して特定の DataFrame 行を読み取るをご覧ください。 |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
いいえ | 30 分 | Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の読み取り試行のタイムアウト時間を設定します。 |
spark.bigtable.read.rows.total.timeout.milliseconds |
いいえ | 12 時間 | Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の読み取り試行の合計タイムアウト時間を設定します。 |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
いいえ | 1m | Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の変更試行のタイムアウト時間を設定します。 |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
いいえ | 10 分 | Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の変更試行の合計タイムアウト時間を設定します。 |
spark.bigtable.batch.mutate.size |
いいえ | 100 |
各バッチのミューテーションの数を設定します。設定できる最大値は 100000 です。 |
spark.bigtable.enable.batch_mutate.flow_control |
いいえ | false |
バッチ ミューテーションのフロー制御を有効にするには、true に設定します。 |
テーブル メタデータを JSON 形式で作成する
Spark SQL DataFrames テーブル形式は、JSON 形式の文字列を使用して Bigtable テーブルに変換する必要があります。この文字列 JSON 形式により、データ形式が Bigtable 互換になります。.option("catalog", catalog_json_string)
オプションを使用すると、アプリケーション コードで JSON 形式を渡すことができます。
たとえば、次の DataFrame テーブルとそれに対応する Bigtable テーブルについて考えてみましょう。
以下の例では、DataFrame の name
列と birthYear
列が info
列ファミリーの下にグループ化され、名前がそれぞれ name
と birth_year
に変更されます。同様に、address
列は同じ列名の location
列ファミリーの下に格納されます。DataFrame の id
列は Bigtable の行キーに変換されます。
Bigtable には行キーに専用の列名がありません。この例では、id_rowkey
は、これが行キー列であることをコネクタに示すためだけに使用されます。行キー列には任意の名前を使用できます。また、JSON 形式で "rowkey":"column_name"
フィールドを宣言するときは、同じ名前を使用してください。
DataFrame | Bigtable テーブル = t1 | |||||||
列 | 行キー | 列ファミリー | ||||||
info | 場所 | |||||||
列 | 列 | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
カタログの JSON 形式は次のとおりです。
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
JSON 形式で使用されるキーと値は次のとおりです。
カタログキー | カタログ値 | JSON 形式 |
---|---|---|
テーブル | Bigtable テーブルの名前。 | "table":{"name":"t1"} テーブルが存在しない場合は、 .option("spark.bigtable.create.new.table", "true") を使用してテーブルを作成します。 |
rowkey | Bigtable の行キーとして使用される列の名前。DataFrame 列の列名が行キーとして使用されていることを確認します(例: id_rowkey )。複合キーは行キーとしても受け入れられます。例: "rowkey":"name:address" このアプローチをとると、すべての読み取りリクエストでテーブル全体のスキャンが必要になる行キーが発生する可能性があります。 |
"rowkey":"id_rowkey" , |
列 | 各 DataFrame 列を対応する Bigtable 列ファミリー("cf" )と列名("col" )にマッピングします。この列名は、DataFrame テーブルの列名と異なる場合があります。サポートされているデータ型には、string 、long 、binary があります。 |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" この例では、 id_rowkey が行キー、info と location が列ファミリーです。 |
サポートされるデータタイプ
コネクタは、カタログでの string
、long
、binary
(バイト配列)型の使用をサポートしています。int
や float
などの他の型のサポートが追加されるまで、このようなデータ型は、コネクタを使用して Bigtable に書き込む前に、バイト配列(Spark SQL の BinaryType
)に手動で変換できます。
さらに、Avro を使用して ArrayType
などの複雑な型をシリアル化することもできます。詳細については、Apache Avro を使用して複雑なデータ型をシリアル化するをご覧ください。
Bigtable に書き込む
.write()
関数とサポートされているオプションを使用して、Bigtable にデータを書き込みます。
Java
GitHub リポジトリの次のコードは、Java と Maven を使用して Bigtable に書き込みます。
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
GitHub リポジトリの次のコードは、Python を使用して Bigtable に書き込みます。
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Bigtable から読み取る
.read()
関数を使用して、テーブルが Bigtable に正常にインポートされたかどうかを確認します。
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
プロジェクトをコンパイルする
Dataproc クラスタ、Dataproc Serverless、またはローカル Spark インスタンスでジョブを実行するために使用される JAR ファイルを生成します。JAR ファイルをローカルでコンパイルしてから、それを使用してジョブを送信できます。ジョブを送信すると、コンパイル済みの JAR へのパスが PATH_TO_COMPILED_JAR
環境変数として設定されます。
この手順は PySpark アプリケーションには適用されません。
依存関係を管理する
Bigtable Spark コネクタは、次の依存関係管理ツールをサポートしています。
JAR ファイルをコンパイルする
Maven
pom.xml ファイルに
spark-bigtable
依存関係を追加します。<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Maven Shade プラグインを
pom.xml
ファイルに追加して uber JAR を作成します。<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
mvn clean install
コマンドを実行して JAR ファイルを生成します。
sbt
spark-bigtable
依存関係をbuild.sbt
ファイルに追加します。libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
project/plugins.sbt
ファイルまたはproject/assembly.sbt
ファイルにsbt-assembly
プラグインを追加して、Uber JAR ファイルを作成します。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
sbt clean assembly
コマンドを実行して JAR ファイルを生成します。
Gradle
spark-bigtable
依存関係をbuild.gradle
ファイルに追加します。dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
build.gradle
ファイルに Shadow プラグインを追加して、uber JAR ファイルを作成します。plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
構成と JAR コンパイルの詳細については、Shadow プラグインのドキュメントをご覧ください。
ジョブの送信
Dataproc、Dataproc Serverless、またはローカル Spark インスタンスのいずれかを使用して Spark ジョブを送信し、アプリケーションを起動します。
ランタイム環境の設定
次の環境変数を設定します。
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
次のように置き換えます。
- PROJECT_ID: Bigtable プロジェクトの永続的な識別子。
- INSTANCE_ID: Bigtable インスタンスの永続的な識別子。
- TABLE_NAME: テーブルの永続的な識別子。
- DATAPROC_CLUSTER: Dataproc クラスタの永続的な識別子。
- DATAPROC_REGION: Dataproc インスタンス内のクラスタのいずれかを含む Dataproc リージョン(例:
northamerica-northeast2
)。 - DATAPROC_ZONE: Dataproc クラスタが実行されるゾーン。
- SUBNET: サブネットの完全なリソースパス。
- GCS_BUCKET_NAME: Spark ワークロードの依存関係をアップロードするための Cloud Storage バケット。
- PATH_TO_COMPILED_JAR: コンパイルされた JAR へのフルパスまたは相対パス(Maven の場合は
/path/to/project/root/target/<compiled_JAR_name>
)。 - GCS_PATH_TO_CONNECTOR_JAR:
spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
ファイルが配置されているgs://spark-lib/bigtable
Cloud Storage バケット。 - PATH_TO_PYTHON_FILE: PySpark アプリケーションの場合、Bigtable に対するデータの書き込みと読み取りに使用される Python ファイルへのパス。
- LOCAL_PATH_TO_CONNECTOR_JAR: PySpark アプリケーションの場合は、ダウンロードされた Bigtable Spark コネクタの JAR ファイルへのパス。
Spark ジョブの送信
Dataproc インスタンスまたはローカルの Spark 設定では、Spark ジョブを実行してデータを Bigtable にアップロードします。
Dataproc クラスタ
コンパイルされた JAR ファイルを使用して、Bigtable との間でデータの読み取りと書き込みを行う Dataproc クラスタジョブを作成します。
Dataproc クラスタを作成します。次の例は、Debian 10、2 つのワーカーノード、デフォルト構成を使用して Dataproc v2.0 クラスタを作成するサンプル コマンドを示しています。
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
ジョブの送信
Scala / Java
次の例は、DataFrame でテストテーブルを作成し、テーブルを Bigtable に書き込み、テーブル内の単語数をカウントするロジックを含む
spark.bigtable.example.WordCount
クラスを示しています。gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
コンパイルされた JAR ファイルを使用して、Dataproc Serverless インスタンスで Bigtable との間でデータの読み取りと書き込みを行う Dataproc ジョブを作成します。
Scala / Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
ローカル Spark
ダウンロードした JAR ファイルを使用して、ローカルの Spark インスタンスで Bigtable との間でデータの読み取りと書き込みを行う Spark ジョブを作成します。Bigtable エミュレータを使用して Spark ジョブを送信することもできます。
Bigtable エミュレータを使用する
Bigtable エミュレータを使用する場合は、次の手順を行います。
次のコマンドを実行して、エミュレータを起動します。
gcloud beta emulators bigtable start
デフォルトでは、エミュレータは
localhost:8086
を選択します。BIGTABLE_EMULATOR_HOST
環境変数を設定します。export BIGTABLE_EMULATOR_HOST=localhost:8086
Bigtable エミュレータの使用について詳しくは、エミュレータを使用してテストするをご覧ください。
Spark ジョブの送信
ローカル Bigtable エミュレータを使用しているかどうかにかかわらず、spark-submit
コマンドを使用して Spark ジョブを送信します。
Scala / Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
テーブルデータを確認する
次の cbt
CLI コマンドを実行して、データが Bigtable に書き込まれていることを確認します。cbt
CLI は、Google Cloud CLI のコンポーネントです。詳細については、cbt
CLI の概要をご覧ください。
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
追加ソリューション
複雑な Spark SQL 型のシリアル化、特定の行の読み取り、クライアントサイド指標の生成など、特定のソリューションに Bigtable Spark コネクタを使用します。
フィルタを使用して特定の DataFrame 行を読み取る
DataFrame を使用して Bigtable から読み取る場合は、特定の行のみを読み取るようにフィルタを指定できます。テーブル全体のスキャンを回避するために、行キー列の ==
、<=
、startsWith
などの単純なフィルタはサーバー側で適用されます。複合行キーのフィルタや、行キー列の LIKE
フィルタなどの複雑なフィルタは、クライアント側で適用されます。
大きいテーブルを読み取る場合は、テーブル全体のスキャンを実行しないように、シンプルな行キーフィルタを使用することをおすすめします。次のサンプル ステートメントは、シンプルなフィルタを使用して読み取る方法を示しています。Spark フィルタでは、行キーに変換された DataFrame 列の名前を使用していることを確認してください。
dataframe.filter("id == 'some_id'").show()
フィルタを適用する際は、Bigtable テーブルの列名ではなく DataFrame 列名を使用します。
Apache Avro を使用して複雑なデータ型をシリアル化する
Bigtable Spark コネクタは、Apache Avro を使用した複雑な Spark SQL 型(ArrayType
、MapType
、StructType
など)のシリアル化のサポートを提供します。Apache Avro は、複雑なデータ構造の処理と保存によく使用されているレコードデータのデータシリアル化を提供します。
"avro":"avroSchema"
などの構文を使用して、Bigtable 内の列を Avro でエンコードするように指定します。次に、Bigtable に対して読み取りまたは書き込みを行う際に .option("avroSchema", avroSchemaString)
を使用すると、その列に対応する Avro スキーマを文字列形式で指定できます。異なるオプション名を使用できます。たとえば、異なる列に "anotherAvroSchema"
を使用し、複数の列に Avro スキーマを渡すことができます。
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
クライアントサイドの指標を使用する
Bigtable Spark コネクタは Java 用 Bigtable クライアントをベースにしているため、クライアントサイドの指標はデフォルトでコネクタ内で有効になっています。これらの指標へのアクセスと解釈の詳細については、クライアントサイドの指標のドキュメントをご覧ください。
低レベルの RDD 関数で Java 用 Bigtable クライアントを使用する
Bigtable Spark コネクタは Java 用 Bigtable クライアントをベースにしているため、Spark アプリケーションでクライアントを直接使用して、mapPartitions
や foreachPartition
などのような低レベルの RDD 関数内で分散された読み取りまたは書き込みのリクエストを実行できます。
Java 用 Bigtable クライアントのクラスを使用するには、パッケージ名に com.google.cloud.spark.bigtable.repackaged
接頭辞を追加します。たとえば、クラス名は com.google.cloud.bigtable.data.v2.BigtableDataClient
ではなく com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
を使用します。
Java 用 Bigtable クライアントの詳細については、Java 用 Bigtable クライアントをご覧ください。
次のステップ
- Dataproc で Spark ジョブを調整する方法を学ぶ。
- Bigtable Spark コネクタで Java 用 Bigtable クライアントのクラスを使用する。