Bigtable Spark 커넥터 사용
Bigtable Spark 커넥터를 사용하면 Bigtable에서 데이터를 읽고 쓸 수 있습니다. Spark SQL 및 DataFrame을 사용하여 Spark 애플리케이션에서 데이터를 읽을 수 있습니다. Bigtable Spark 커넥터를 사용하여 지원되는 Bigtable 작업은 다음과 같습니다.
- 데이터 쓰기
- 데이터 읽기
- 새 테이블 만들기
이 문서에서는 Spark SQL DataFrame 테이블을 Bigtable 테이블로 변환한 다음 JAR 파일을 컴파일하고 만들어 Spark 작업을 제출하는 방법을 보여줍니다.
Spark 및 Scala 지원 상태
Bigtable Spark 커넥터는 Scala 2.12 버전 및 다음 Spark 버전만 지원합니다.
Bigtable Spark 커넥터는 다음과 같은 Dataproc 버전을 지원합니다.
- 1.5 이미지 버전 클러스터
- 2.0 이미지 버전 클러스터
- 2.1 이미지 버전 클러스터
- 2.2 이미지 버전 클러스터
- Dataproc Serverless 런타임 버전 1.0
- Dataproc Serverless 런타임 버전 2.0
비용 계산
비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용하면 사용한 리소스에 대한 요금이 청구됩니다.
- Bigtable(Bigtable 에뮬레이터 사용 요금은 부과되지 않음)
- Dataproc
- Cloud Storage
Dataproc 가격 책정은 Compute Engine 클러스터에서 Dataproc을 사용할 때 적용됩니다. Dataproc Serverless 가격 책정은 Spark를 위한 서버리스 Dataproc에서 실행되는 워크로드 및 세션에 적용됩니다.
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.
시작하기 전에
Bigtable Spark 커넥터를 사용하기 전에 다음 기본 요건을 완료하세요.
필요한 역할
Bigtable Spark 커넥터를 사용하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Bigtable 관리자(
roles/bigtable.admin
)(선택사항): 데이터를 읽거나 쓰고 새 테이블을 만들 수 있습니다. -
Bigtable 사용자(
roles/bigtable.user
): 데이터를 읽거나 쓸 수 있지만 새 테이블을 만들 수는 없습니다.
역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
Dataproc 또는 Cloud Storage를 사용하는 경우 추가 권한이 필요할 수 있습니다. 자세한 내용은 Dataproc 권한 및 Cloud Storage 권한을 참조하세요.
Spark 설정
Bigtable 인스턴스를 만드는 것 외에도 Spark 인스턴스를 설정해야 합니다. 로컬에서 수행하거나 다음 옵션 중 하나를 선택하여 Dataproc에서 Spark를 사용할 수 있습니다.
- Dataproc 클러스터
- Dataproc Serverless
Dataproc 클러스터 또는 서버리스 옵션 중에서 선택하는 방법에 대한 자세한 내용은 Spark용 Dataproc Serverless와 Compute Engine 기반 Dataproc 비교 문서를 참조하세요.
커넥터 JAR 파일 다운로드
Bigtable Spark 커넥터 GitHub 저장소에서 예시가 포함된 Bigtable Spark 커넥터 소스 코드를 찾을 수 있습니다.
Spark 설정에 따라 다음과 같이 JAR 파일에 액세스할 수 있습니다.
PySpark를 로컬에서 실행하는 경우
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage 위치에서 커넥터의 JAR 파일을 다운로드해야 합니다.SCALA_VERSION
을 Scala 버전으로 바꾸고,2.12
를 지원되는 유일한 버전으로 설정하고,CONNECTOR_VERSION
을 사용할 커넥터 버전으로 바꿉니다.Dataproc 클러스터 또는 서버리스 옵션의 경우 최신 JAR 파일을 Scala 또는 Java Spark 애플리케이션에 추가할 수 있는 아티팩트로 사용합니다. JAR 파일을 아티팩트로 사용하는 방법에 대한 자세한 내용은 종속 항목 관리를 참조하세요.
Dataproc에 PySpark 작업을 제출하는 경우
gcloud dataproc jobs submit pyspark --jars
플래그를 사용하여 URI를 Cloud Storage의 JAR 파일 위치로 설정합니다(예:gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
).
Spark 애플리케이션에 Bigtable 구성 추가
Spark 애플리케이션에서 Bigtable과 상호작용할 수 있는 Spark 옵션을 추가합니다.
지원되는 Spark 옵션
com.google.cloud.spark.bigtable
패키지의 일부로 제공되는 Spark 옵션을 사용합니다.
옵션 이름 | 필수 | 기본값 | 의미 |
---|---|---|---|
spark.bigtable.project.id |
예 | 해당 사항 없음 | Bigtable 프로젝트 ID를 설정합니다. |
spark.bigtable.instance.id |
예 | 해당 사항 없음 | Bigtable 인스턴스 ID를 설정합니다. |
catalog |
예 | 해당 사항 없음 | DataFrame의 SQL과 유사한 스키마와 Bigtable 테이블의 스키마 간의 변환 형식을 지정하는 JSON 형식을 설정합니다. 자세한 내용은 JSON 형식으로 테이블 메타데이터 만들기를 참조하세요. |
spark.bigtable.app_profile.id |
No | default |
Bigtable 앱 프로필 ID를 설정합니다. |
spark.bigtable.write.timestamp.milliseconds |
No | 현재 시스템 시간 | Bigtable에 DataFrame을 쓸 때 사용할 타임스탬프를 밀리초 단위로 설정합니다. DataFrame의 모든 행은 동일한 타임스탬프를 사용하므로 DataFrame에 동일한 row key 열이 있는 행은 동일한 타임스탬프를 공유하므로 Bigtable에서 단일 버전으로 유지됩니다. |
spark.bigtable.create.new.table |
No | false |
새 테이블을 만들려면 Bigtable에 쓰기 전에 true 로 설정합니다. |
spark.bigtable.read.timerange.start.milliseconds 또는 spark.bigtable.read.timerange.end.milliseconds |
No | 해당 사항 없음 | 타임스탬프(에포크 시간 이후의 밀리초 단위)를 설정하여 각각 특정 시작일과 종료일이 있는 셀을 필터링합니다. 이러한 매개변수를 모두 지정하거나 전혀 지정하지 않아야 합니다. |
spark.bigtable.push.down.row.key.filters |
No | true |
서버 측에서 간단한 row key 필터링을 허용하려면 true 로 설정합니다. 복합 row key에 대한 필터링은 클라이언트 측에서 구현됩니다.자세한 내용은 필터를 사용하여 특정 DataFrame 행 읽기를 참조하세요. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
No | 30분 | Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 읽기 행 시도의 제한 시간을 설정합니다. |
spark.bigtable.read.rows.total.timeout.milliseconds |
No | 12시간 | Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 읽기 행 시도의 총 제한 시간을 설정합니다. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
No | 1분 | Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 변형 행 시도의 제한 시간을 설정합니다. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
No | 10분 | Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 변형 행 시도의 총 제한 시간을 설정합니다. |
spark.bigtable.batch.mutate.size |
No | 100 |
각 배치의 변형 수로 설정합니다. 설정할 수 있는 최댓값은 100000 입니다. |
spark.bigtable.enable.batch_mutate.flow_control |
No | false |
배치 변형에 흐름 제어를 사용 설정하려면 true 로 설정합니다. |
JSON 형식으로 테이블 메타데이터 만들기
Spark SQL DataFrame 테이블 형식은 JSON 형식의 문자열을 사용하여 Bigtable 테이블로 변환해야 합니다. 이 문자열 JSON 형식을 사용하면 데이터 형식이 Bigtable과 호환됩니다. .option("catalog", catalog_json_string)
옵션을 사용하여 애플리케이션 코드에 JSON 형식을 전달할 수 있습니다.
예를 들어 다음 DataFrame 테이블과 해당 Bigtable 테이블을 살펴보겠습니다.
이 예시에서는 DataFrame의 name
열과 birthYear
열이 info
column family 아래에 그룹화되고 이름이 각각 name
과 birth_year
로 바뀝니다. 마찬가지로 address
열은 열 이름이 같은 location
column family 아래에 저장됩니다. DataFrame의 id
열은 Bigtable row key로 변환됩니다.
Bigtable에서는 row key에 전용 열 이름이 없으며 이 예시에서 id_rowkey
는 커넥터에 이 열이 row key 열임을 알리는 데만 사용됩니다. row key 열에는 어떤 이름이든 사용할 수 있으며, JSON 형식으로 "rowkey":"column_name"
필드를 선언할 때 동일한 이름을 사용해야 합니다.
DataFrame | Bigtable 테이블 = t1 | |||||||
열 | row key | column family | ||||||
info | 위치 | |||||||
열 | 열 | |||||||
id | name | birthYear | 주소 | id_rowkey | name | birth_year | 주소 |
카탈로그의 JSON 형식은 다음과 같습니다.
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
JSON 형식에 사용되는 키와 값은 다음과 같습니다.
카탈로그 키 | 카탈로그 값 | JSON 형식 |
---|---|---|
테이블 | Bigtable 테이블의 이름입니다. | "table":{"name":"t1"} 테이블이 없으면 .option("spark.bigtable.create.new.table", "true") 을 사용하여 테이블을 만듭니다. |
rowkey | Bigtable row key로 사용될 열의 이름입니다. DataFrame 열의 열 이름이 row key로 사용되는지 확인합니다(예: id_rowkey ). 복합 키도 row key로 허용됩니다. 예를 들면 "rowkey":"name:address" 입니다. 이 접근 방식을 사용하면 모든 읽기 요청에 전체 테이블 스캔이 필요한 row key가 생성될 수 있습니다. |
"rowkey":"id_rowkey" |
열 | 각 DataFrame 열을 해당 Bigtable column family("cf" ) 및 열 이름("col" )에 매핑합니다. 열 이름은 DataFrame 테이블의 열 이름과 다를 수 있습니다. 지원되는 데이터 유형에는 string , long , binary 가 있습니다. |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" 이 예시에서 id_rowkey 는 row key이고 info 및 location 은 column family입니다. |
지원되는 데이터 유형
커넥터는 카탈로그에서 string
, long
, binary
(바이트 배열) 유형 사용을 지원합니다. int
및 float
와 같은 다른 유형에 대한 지원이 추가되기 전까지, 커넥터를 사용하여 Bigtable에 쓰기 전에 이러한 데이터 유형을 바이트 배열(Spark SQL의 BinaryType
)로 수동으로 변환할 수 있습니다.
또한 Avro를 사용하여 ArrayType
과 같은 복잡한 유형을 직렬화할 수 있습니다. 자세한 내용은 Apache Avro를 사용하여 복잡한 데이터 유형 직렬화를 참조하세요.
Bigtable에 쓰기
.write()
함수와 지원되는 옵션을 사용하여 Bigtable에 데이터를 씁니다.
Java
GitHub 저장소의 다음 코드는 Java 및 Maven을 사용하여 Bigtable에 씁니다.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
GitHub 저장소의 다음 코드는 Python을 사용하여 Bigtable에 씁니다.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Bigtable에서 읽기
.read()
함수를 사용하여 테이블을 Bigtable로 성공적으로 가져왔는지 확인합니다.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
프로젝트 컴파일
Dataproc 클러스터, Dataproc Serverless 또는 로컬 Spark 인스턴스에서 작업을 실행하는 데 사용되는 JAR 파일을 생성합니다. JAR 파일을 로컬에서 컴파일한 후 이를 사용하여 작업을 제출할 수 있습니다. 작업을 제출하면 컴파일된 JAR의 경로가 PATH_TO_COMPILED_JAR
환경 변수로 설정됩니다.
이 단계는 PySpark 애플리케이션에 적용되지 않습니다.
종속 항목 관리
Bigtable Spark 커넥터는 다음과 같은 종속 항목 관리 도구를 지원합니다.
JAR 파일 컴파일
Maven
pom.xml 파일에
spark-bigtable
종속 항목을 추가합니다.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
pom.xml
파일에 Maven Shade 플러그인을 추가하여 Uber JAR을 만듭니다.<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
mvn clean install
명령어를 실행하여 JAR 파일을 생성합니다.
sbt
spark-bigtable
종속 항목을build.sbt
파일에 추가합니다.libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
project/plugins.sbt
또는project/assembly.sbt
파일에sbt-assembly
플러그인을 추가하여 Uber JAR 파일을 만듭니다.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
sbt clean assembly
명령어를 실행하여 JAR 파일을 생성합니다.
Gradle
spark-bigtable
종속 항목을build.gradle
파일에 추가합니다.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
build.gradle
파일에 Shadow 플러그인을 추가하여 Uber JAR 파일을 만듭니다.plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
자세한 구성 및 JAR 컴파일 정보는 섀도 플러그인 문서를 참조하세요.
작업 제출
Dataproc, Dataproc Serverless, 또는 로컬 Spark 인스턴스를 사용하여 Spark 작업을 제출하여 애플리케이션을 실행합니다.
런타임 환경 설정
다음 환경 변수를 설정합니다.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
다음을 바꿉니다.
- PROJECT_ID: Bigtable 프로젝트의 영구 식별자입니다.
- INSTANCE_ID: Bigtable 인스턴스의 영구 식별자입니다.
- TABLE_NAME: 테이블의 영구 식별자입니다.
- DATAPROC_CLUSTER: Dataproc 클러스터의 영구 식별자입니다.
- DATAPROC_REGION: Dataproc 인스턴스의 클러스터 중 하나가 포함된 Dataproc 리전입니다(예:
northamerica-northeast2
). - DATAPROC_ZONE: Dataproc 클러스터가 실행되는 영역입니다.
- SUBNET: 서브넷의 전체 리소스 경로입니다.
- GCS_BUCKET_NAME: Spark 워크로드 종속 항목을 업로드할 Cloud Storage 버킷입니다.
- PATH_TO_COMPILED_JAR: 컴파일된 JAR의 전체 또는 상대 경로입니다(예: Maven의 경우
/path/to/project/root/target/<compiled_JAR_name>
). - GCS_PATH_TO_CONNECTOR_JAR:
spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
파일이 있는gs://spark-lib/bigtable
Cloud Storage 버킷입니다. - PATH_TO_PYTHON_FILE: PySpark 애플리케이션의 경우 Bigtable에서 데이터를 쓰고 읽는 데 사용할 Python 파일의 경로입니다.
- LOCAL_PATH_TO_CONNECTOR_JAR: PySpark 애플리케이션의 경우 다운로드한 Bigtable Spark 커넥터 JAR 파일의 경로입니다.
Spark 작업 제출
Dataproc 인스턴스 또는 로컬 Spark 설정의 경우 Spark 작업을 실행하여 데이터를 Bigtable에 업로드합니다.
Dataproc 클러스터
컴파일된 JAR 파일을 사용하여 Bigtable에서 데이터를 읽고 쓰는 Dataproc 클러스터 작업을 만듭니다.
Dataproc 클러스터를 만듭니다. 다음 예시는 Debian 10, 워커 노드 2개, 기본 구성으로 Dataproc v2.0 클러스터를 만드는 샘플 명령어를 보여줍니다.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
작업 제출
Scala/Java
다음 예시에서는 DataFrame에 테스트 테이블을 만들고 테이블을 Bigtable에 작성한 다음 테이블의 단어 수를 계산하는 로직이 포함된
spark.bigtable.example.WordCount
클래스를 보여줍니다.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
컴파일된 JAR 파일을 사용하고 Dataproc Serverless 인스턴스로 Bigtable에서 데이터를 읽고 쓰는 Dataproc 작업을 만듭니다.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
로컬 Spark
다운로드한 JAR 파일을 사용하여 로컬 Spark 인스턴스로 Bigtable에서 데이터를 읽고 쓰는 Spark 작업을 만듭니다. Bigtable 에뮬레이터를 사용하여 Spark 작업을 제출할 수도 있습니다.
Bigtable 에뮬레이터 사용
Bigtable 에뮬레이터를 사용하려면 다음 단계를 따르세요.
다음 명령어를 실행하여 에뮬레이터를 시작합니다.
gcloud beta emulators bigtable start
기본적으로 에뮬레이터는
localhost:8086
을 선택합니다.BIGTABLE_EMULATOR_HOST
환경 변수를 설정합니다.export BIGTABLE_EMULATOR_HOST=localhost:8086
Bigtable 에뮬레이터 사용에 대한 자세한 내용은 에뮬레이터 사용 테스트를 참조하세요.
Spark 작업 제출
로컬 Bigtable 에뮬레이터를 사용하는지 여부에 관계없이 spark-submit
명령어를 사용하여 Spark 작업을 제출합니다.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
테이블 데이터 확인
다음 cbt
CLI 명령어를 실행하여 데이터가 Bigtable에 기록되는지 확인합니다. cbt
CLI는 Google Cloud CLI의 구성요소입니다. 자세한 내용은 cbt
CLI 개요를 참조하세요.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
추가 솔루션
복잡한 Spark SQL 유형 직렬화, 특정 행 읽기, 클라이언트 측 측정항목 생성과 같은 특정 솔루션에 Bigtable Spark 커넥터를 사용합니다.
필터를 사용하여 특정 DataFrame 행 읽기
DataFrame을 사용하여 Bigtable에서 읽는 경우 특정 행만 읽도록 필터를 지정할 수 있습니다. row key 열의 ==
, <=
, startsWith
와 같은 간단한 필터는 전체 테이블 스캔을 방지하기 위해 서버 측에 적용됩니다. 복합 row key의 필터나 row key 열의 LIKE
필터와 같은 복합 필터는 클라이언트 측에서 적용됩니다.
큰 테이블을 읽는 경우 전체 테이블 스캔을 수행하지 않도록 간단한 row key 필터를 사용하는 것이 좋습니다. 다음 샘플 문에서는 간단한 필터를 사용하여 읽는 방법을 보여줍니다. Spark 필터에서 row key로 변환되는 DataFrame 열의 이름을 사용해야 합니다.
dataframe.filter("id == 'some_id'").show()
필터를 적용할 때는 Bigtable 테이블 열 이름 대신 DataFrame 열 이름을 사용합니다.
Apache Avro를 사용하여 복잡한 데이터 유형 직렬화
Bigtable Spark 커넥터는 Apache Avro를 사용하여 ArrayType
, MapType
또는 StructType
과 같은 복잡한 Spark SQL 유형을 직렬화할 수 있도록 지원합니다. Apache Avro는 복잡한 데이터 구조를 처리하고 저장하는 데 일반적으로 사용되는 레코드 데이터의 데이터 직렬화를 제공합니다.
"avro":"avroSchema"
와 같은 구문을 사용하여 Bigtable의 열이 Avro로 인코딩되도록 지정합니다. 그런 다음 Bigtable에서 읽거나 쓸 때 .option("avroSchema", avroSchemaString)
을 사용하여 해당 열에 해당하는 Avro 스키마를 문자열 형식으로 지정할 수 있습니다. 예를 들어 "anotherAvroSchema"
와 같은 다양한 옵션 이름을 사용하고 여러 열에 대한 Avro 스키마를 전달할 수 있습니다.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
클라이언트 측 측정항목 사용
Bigtable Spark 커넥터는 Java용 Bigtable 클라이언트를 기반으로 하므로 클라이언트 측 측정항목은 커넥터 내에서 기본적으로 사용 설정됩니다. 이러한 측정항목 액세스 및 해석에 대한 자세한 내용은 클라이언트 측 측정항목 문서를 참조하세요.
하위 수준 RDD 함수와 함께 Java용 Bigtable 클라이언트 사용
Bigtable Spark 커넥터는 Java용 Bigtable 클라이언트를 기반으로 하므로 에서 Spark 애플리케이션에서 직접 클라이언트를 사용하고 mapPartitions
및 foreachPartition
과 같은 하위 수준 RDD 함수 내에서 분산 읽기 또는 쓰기 요청을 수행할 수 있습니다.
Java 클래스용 Bigtable 클라이언트를 사용하려면 패키지 이름에 com.google.cloud.spark.bigtable.repackaged
프리픽스를 추가합니다. 예를 들어 클래스 이름을 com.google.cloud.bigtable.data.v2.BigtableDataClient
대신 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
를 사용합니다.
Java용 Bigtable 클라이언트에 대한 자세한 내용은 Java용 Bigtable 클라이언트를 참조하세요.
다음 단계
- Dataproc에서 Spark 작업을 미세 조정하는 방법 알아보기
- Bigtable Spark 커넥터와 함께 Java용 Bigtable 클라이언트의 클래스 사용하기