Bigtable Spark 커넥터 사용

Bigtable Spark 커넥터를 사용하면 Bigtable에서 데이터를 읽고 쓸 수 있습니다. Spark SQL 및 DataFrame을 사용하여 Spark 애플리케이션에서 데이터를 읽을 수 있습니다. Bigtable Spark 커넥터를 사용하여 지원되는 Bigtable 작업은 다음과 같습니다.

  • 데이터 쓰기
  • 데이터 읽기
  • 새 테이블 만들기

이 문서에서는 Spark SQL DataFrame 테이블을 Bigtable 테이블로 변환한 다음 JAR 파일을 컴파일하고 만들어 Spark 작업을 제출하는 방법을 보여줍니다.

Spark 및 Scala 지원 상태

Bigtable Spark 커넥터는 Scala 2.12 버전과 다음 Spark 버전만 지원합니다.

Bigtable Spark 커넥터는 다음과 같은 Dataproc 버전을 지원합니다.

비용 계산

다음 Google Cloud의 청구 가능한 구성요소를 사용하면 사용하는 리소스에 대한 요금이 청구됩니다.

  • Bigtable(Bigtable 에뮬레이터 사용 요금은 부과되지 않음)
  • Dataproc
  • Cloud Storage

Dataproc 가격 책정은 Compute Engine 클러스터의 Dataproc 사용에 적용됩니다. Dataproc Serverless 가격 책정은 Spark를 위한 서버리스 Dataproc에서 실행되는 워크로드 및 세션에 적용됩니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.

시작하기 전에

Bigtable Spark 커넥터를 사용하기 전에 다음 기본 요건을 완료하세요.

필요한 역할

Bigtable Spark 커넥터를 사용하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

  • Bigtable 관리자(roles/bigtable.admin)(선택사항): 데이터를 읽거나 쓰고 새 테이블을 만들 수 있습니다.
  • Bigtable 사용자(roles/bigtable.user): 데이터를 읽거나 쓸 수 있지만 새 테이블을 만들 수는 없습니다.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

Dataproc 또는 Cloud Storage를 사용하는 경우 추가 권한이 필요할 수 있습니다. 자세한 내용은 Dataproc 권한Cloud Storage 권한을 참고하세요.

Spark 설정

Bigtable 인스턴스를 만드는 것 외에도 Spark 인스턴스를 설정해야 합니다. 로컬에서 수행하거나 다음 옵션 중 하나를 선택하여 Dataproc에서 Spark를 사용할 수 있습니다.

  • Dataproc 클러스터
  • Dataproc Serverless

Dataproc 클러스터 또는 서버리스 옵션 중에서 선택하는 방법에 대한 자세한 내용은 Spark용 Dataproc Serverless와 Compute Engine 기반 Dataproc 비교 문서를 참조하세요.

커넥터 JAR 파일 다운로드

Bigtable Spark 커넥터 GitHub 저장소에서 예시와 함께 Bigtable Spark 커넥터 소스 코드를 확인할 수 있습니다.

Spark 설정에 따라 다음과 같이 JAR 파일에 액세스할 수 있습니다.

  • PySpark를 로컬에서 실행하는 경우 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage 위치에서 커넥터의 JAR 파일을 다운로드해야 합니다.

    SCALA_VERSION을 Scala 버전으로 바꾸고, 2.12를 지원되는 유일한 버전으로 설정하고, CONNECTOR_VERSION을 사용할 커넥터 버전으로 바꿉니다.

  • Dataproc 클러스터 또는 서버리스 옵션의 경우 최신 JAR 파일을 Scala 또는 Java Spark 애플리케이션에 추가할 수 있는 아티팩트로 사용합니다. JAR 파일을 아티팩트로 사용하는 방법에 관한 자세한 내용은 종속 항목 관리를 참고하세요.

  • Dataproc에 PySpark 작업을 제출하는 경우 gcloud dataproc jobs submit pyspark --jars 플래그를 사용하여 URI를 Cloud Storage의 JAR 파일 위치(예: gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar)로 설정합니다.

컴퓨팅 유형 확인

읽기 전용 작업의 경우 Data Boost(미리보기) 서버리스 컴퓨팅을 사용하면 애플리케이션 제공 클러스터에 영향을 주지 않을 수 있습니다. Spark 애플리케이션에서 Data Boost를 사용하려면 Spark 커넥터 버전 1.1.0 이상을 사용해야 합니다.

Data Boost를 사용하려면 Data Boost 앱 프로필을 만든 다음 Spark 애플리케이션에 Bigtable 구성을 추가할 때 spark.bigtable.app_profile.id Spark 옵션에 대한 앱 프로필 ID를 제공해야 합니다. 이미 Spark 읽기 작업에 사용할 앱 프로필을 만들었고 애플리케이션 코드를 변경하지 않고 계속 사용하려면 앱 프로필을 Data Boost 앱 프로필로 변환하면 됩니다. 자세한 내용은 앱 프로필 변환을 참고하세요.

자세한 내용은 Bigtable Data Boost 개요를 참고하세요.

읽기 및 쓰기가 포함된 작업의 경우 요청에 표준 앱 프로필을 지정하여 인스턴스의 클러스터 노드를 컴퓨팅에 사용할 수 있습니다.

사용할 앱 프로필 식별 또는 만들기

앱 프로필 ID를 지정하지 않으면 커넥터는 기본 앱 프로필을 사용합니다.

Spark 애플리케이션을 비롯하여 실행하는 각 애플리케이션마다 고유한 앱 프로필을 사용하는 것이 좋습니다. 앱 프로필 유형 및 설정에 관한 자세한 내용은 앱 프로필 개요를 참고하세요. 자세한 내용은 앱 프로필 만들기 및 구성을 참고하세요.

Spark 애플리케이션에 Bigtable 구성 추가

Spark 애플리케이션에서 Bigtable과 상호작용할 수 있는 Spark 옵션을 추가합니다.

지원되는 Spark 옵션

com.google.cloud.spark.bigtable 패키지의 일부로 제공되는 Spark 옵션을 사용합니다.

옵션 이름 필수 기본값 의미
spark.bigtable.project.id 해당 사항 없음 Bigtable 프로젝트 ID를 설정합니다.
spark.bigtable.instance.id 해당 사항 없음 Bigtable 인스턴스 ID를 설정합니다.
catalog 해당 사항 없음 DataFrame의 SQL과 유사한 스키마와 Bigtable 테이블의 스키마 간의 변환 형식을 지정하는 JSON 형식을 설정합니다.

자세한 내용은 JSON 형식의 테이블 메타데이터 만들기를 참고하세요.
spark.bigtable.app_profile.id 아니요 default Bigtable 앱 프로필 ID를 설정합니다.
spark.bigtable.write.timestamp.milliseconds 아니요 현재 시스템 시간 Bigtable에 DataFrame을 쓸 때 사용할 밀리초 단위의 타임스탬프를 설정합니다.

DataFrame의 모든 행은 동일한 타임스탬프를 사용하므로 DataFrame에서 동일한 row key 열을 가진 행은 동일한 타임스탬프를 공유할 때 Bigtable에서 단일 버전으로 유지됩니다.
spark.bigtable.create.new.table 아니요 false Bigtable에 쓰기 전에 새 테이블을 만들려면 true로 설정합니다.
spark.bigtable.read.timerange.start.milliseconds 또는 spark.bigtable.read.timerange.end.milliseconds 아니요 해당 사항 없음 타임스탬프(에포크 시간 이후의 밀리초 단위)를 설정하여 각각 특정 시작일과 종료일이 있는 셀을 필터링합니다.
spark.bigtable.push.down.row.key.filters 아니요 true 서버 측에서 간단한 row key 필터링을 허용하려면 true로 설정합니다. 복합 row key에 대한 필터링은 클라이언트 측에서 구현됩니다.

자세한 내용은 필터를 사용하여 특정 DataFrame 행 읽기를 참고하세요.
spark.bigtable.read.rows.attempt.timeout.milliseconds 아니요 30분 Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 읽기 행 시도의 제한 시간을 설정합니다.
spark.bigtable.read.rows.total.timeout.milliseconds 아니요 12시간 Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 읽기 행 시도의 제한 시간을 설정합니다.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds 아니요 1분 Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 변형 행 시도의 제한 시간을 설정합니다.
spark.bigtable.mutate.rows.total.timeout.milliseconds 아니요 10분 Java용 Bigtable 클라이언트에서 하나의 DataFrame 파티션에 해당하는 변형 행 시도의 제한 시간을 설정합니다.
spark.bigtable.batch.mutate.size 아니요 100 각 일괄 처리의 변형 수로 설정합니다. 설정할 수 있는 최대값은 100000입니다.
spark.bigtable.enable.batch_mutate.flow_control 아니요 false 배치 변형에 흐름 제어를 사용 설정하려면 true로 설정합니다.

JSON 형식의 테이블 메타데이터 만들기

Spark SQL DataFrame 테이블 형식은 JSON 형식의 문자열을 사용하여 Bigtable 테이블로 변환해야 합니다. 이 문자열 JSON 형식은 Bigtable과 호환되는 데이터 형식을 만듭니다. .option("catalog", catalog_json_string) 옵션을 사용하여 애플리케이션 코드에 JSON 형식을 전달할 수 있습니다.

예를 들어 다음 DataFrame 테이블과 해당 Bigtable 테이블을 살펴보겠습니다.

이 예시에서는 DataFrame의 name 열과 birthYear 열이 info column family 아래에 그룹화되고 이름이 각각 namebirth_year로 바뀝니다. 마찬가지로 address 열은 동일한 열 이름을 가진 location column family 아래에 저장됩니다. DataFrame의 id 열이 Bigtable row key로 변환됩니다.

Bigtable에서는 row key에 전용 열 이름이 없으며 이 예시에서 id_rowkey는 커넥터에 이 열이 row key 열임을 알리는 데만 사용됩니다. row key 열에는 어떤 이름이든 사용할 수 있으며, JSON 형식으로 "rowkey":"column_name" 필드를 선언할 때 동일한 이름을 사용해야 합니다.

DataFrame Bigtable 테이블 = t1
row key column family
info 위치
id name birthYear 주소 id_rowkey name birth_year 주소

카탈로그의 JSON 형식은 다음과 같습니다.

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 형식에 사용되는 키와 값은 다음과 같습니다.

카탈로그 키 카탈로그 값 JSON 형식
Bigtable 테이블의 이름입니다. "table":{"name":"t1"}

테이블이 없으면 .option("spark.bigtable.create.new.table", "true")을 사용하여 테이블을 만듭니다.
rowkey Bigtable row key로 사용될 열의 이름입니다. DataFrame 열의 열 이름이 row key로 사용되는지 확인합니다(예: id_rowkey).

복합 키도 row key로 허용됩니다. 예를 들면 "rowkey":"name:address"입니다. 이 접근 방식을 사용하면 모든 읽기 요청에 전체 테이블 스캔이 필요한 row key가 생성될 수 있습니다.
"rowkey":"id_rowkey",
각 DataFrame 열을 해당 Bigtable column family("cf") 및 열 이름("col")에 매핑합니다. 열 이름은 DataFrame 테이블의 열 이름과 다를 수 있습니다. 지원되는 데이터 유형에는 string, long, binary가 있습니다. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

이 예시에서 id_rowkey는 row key이고 infolocation은 column family입니다.

지원되는 데이터 유형

커넥터는 카탈로그에서 string, long, binary(바이트 배열) 유형 사용을 지원합니다. intfloat와 같은 다른 유형에 대한 지원이 추가되기 전까지, 커넥터를 사용하여 Bigtable에 쓰기 전에 이러한 데이터 유형을 바이트 배열(Spark SQL의 BinaryType)로 수동으로 변환할 수 있습니다.

또한 Avro를 사용하여 ArrayType과 같은 복합 유형을 직렬화할 수 있습니다. 자세한 내용은 Apache Avro를 사용하여 복잡한 데이터 유형 직렬화를 참고하세요.

Bigtable에 쓰기

.write() 함수와 지원되는 옵션을 사용하여 Bigtable에 데이터를 씁니다.

자바

GitHub 저장소의 다음 코드는 Java 및 Maven을 사용하여 Bigtable에 씁니다.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub 저장소의 다음 코드는 Python을 사용하여 Bigtable에 씁니다.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

Bigtable에서 읽기

.read() 함수를 사용하여 테이블을 Bigtable로 성공적으로 가져왔는지 확인합니다.

자바

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

프로젝트 컴파일

Dataproc 클러스터, Dataproc Serverless 또는 로컬 Spark 인스턴스에서 작업을 실행하는 데 사용되는 JAR 파일을 생성합니다. JAR 파일을 로컬에서 컴파일한 다음 이를 사용하여 작업을 제출할 수 있습니다. 컴파일된 JAR의 경로는 작업을 제출할 때 PATH_TO_COMPILED_JAR 환경 변수로 설정됩니다.

이 단계는 PySpark 애플리케이션에는 적용되지 않습니다.

종속 항목 관리

Bigtable Spark 커넥터는 다음과 같은 종속 항목 관리 도구를 지원합니다.

JAR 파일 컴파일

Maven

  1. pom.xml 파일에 spark-bigtable 종속 항목을 추가합니다.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. pom.xml 파일에 Maven Shade 플러그인을 추가하여 uber JAR을 만듭니다.

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. mvn clean install 명령어를 실행하여 JAR 파일을 생성합니다.

sbt

  1. spark-bigtable 종속 항목을 build.sbt 파일에 추가합니다.

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. sbt-assembly 플러그인을 project/plugins.sbt 또는 project/assembly.sbt 파일에 추가하여 Uber JAR 파일을 만듭니다.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. sbt clean assembly 명령어를 실행하여 JAR 파일을 생성합니다.

Gradle

  1. spark-bigtable 종속 항목을 build.gradle 파일에 추가합니다.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. build.gradle 파일에 Shadow 플러그인을 추가하여 Uber JAR 파일을 만듭니다.

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. 구성 및 JAR 컴파일 정보에 관한 자세한 내용은 Shadow 플러그인 문서를 참고하세요.

작업 제출

Dataproc, Dataproc Serverless, 또는 로컬 Spark 인스턴스를 사용하여 Spark 작업을 제출하여 애플리케이션을 실행합니다.

런타임 환경 설정

다음 환경 변수를 설정합니다.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

다음을 바꿉니다.

  • PROJECT_ID: Bigtable 프로젝트의 영구 식별자입니다.
  • INSTANCE_ID: Bigtable 인스턴스의 영구 식별자입니다.
  • TABLE_NAME: 테이블의 영구 식별자입니다.
  • DATAPROC_CLUSTER: Dataproc 클러스터의 영구 식별자입니다.
  • DATAPROC_REGION: Dataproc 인스턴스의 클러스터 중 하나가 포함된 Dataproc 리전입니다(예: northamerica-northeast2).
  • DATAPROC_ZONE: Dataproc 클러스터가 실행되는 영역입니다.
  • SUBNET: 서브넷의 전체 리소스 경로입니다.
  • GCS_BUCKET_NAME: Spark 워크로드 종속 항목을 업로드할 Cloud Storage 버킷입니다.
  • PATH_TO_COMPILED_JAR: 컴파일된 JAR의 전체 경로 또는 상대 경로입니다(예: Maven의 경우 /path/to/project/root/target/<compiled_JAR_name>).
  • GCS_PATH_TO_CONNECTOR_JAR: spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar 파일이 있는 gs://spark-lib/bigtable Cloud Storage 버킷입니다.
  • PATH_TO_PYTHON_FILE: PySpark 애플리케이션의 경우 Bigtable에 데이터를 쓰고 Bigtable에서 데이터를 읽는 데 사용되는 Python 파일의 경로입니다.
  • LOCAL_PATH_TO_CONNECTOR_JAR: PySpark 애플리케이션의 경우 다운로드한 Bigtable Spark 커넥터 JAR 파일의 경로입니다.

Spark 작업 제출

Dataproc 인스턴스 또는 로컬 Spark 설정의 경우 Spark 작업을 실행하여 Bigtable에 데이터를 업로드합니다.

Dataproc 클러스터

컴파일된 JAR 파일을 사용하여 Bigtable에서 데이터를 읽고 쓰는 Dataproc 클러스터 작업을 만듭니다.

  1. Dataproc 클러스터를 만듭니다. 다음 예는 Debian 10, 워커 노드 2개, 기본 구성으로 Dataproc v2.0 클러스터를 만드는 샘플 명령어를 보여줍니다.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. 작업 제출

    Scala/Java

    다음 예는 DataFrame에서 테스트 테이블을 만들고, 테이블을 Bigtable에 작성한 다음, 테이블의 단어 수를 계산하는 로직이 포함된 spark.bigtable.example.WordCount 클래스를 보여줍니다.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

컴파일된 JAR 파일을 사용하고 Dataproc Serverless 인스턴스로 Bigtable에서 데이터를 읽고 쓰는 Dataproc 작업을 만듭니다.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

로컬 Spark

다운로드한 JAR 파일을 사용하여 로컬 Spark 인스턴스로 Bigtable에서 데이터를 읽고 쓰는 Spark 작업을 만듭니다. Bigtable 에뮬레이터를 사용하여 Spark 작업을 제출할 수도 있습니다.

Bigtable 에뮬레이터 사용

Bigtable 에뮬레이터를 사용하기로 결정한 경우 다음 단계를 따르세요.

  1. 다음 명령어를 실행하여 에뮬레이터를 시작합니다.

    gcloud beta emulators bigtable start
    

    기본적으로 에뮬레이터는 localhost:8086을 선택합니다.

  2. BIGTABLE_EMULATOR_HOST 환경 변수를 설정합니다.

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Spark 작업을 제출합니다.

Bigtable 에뮬레이터 사용에 관한 자세한 내용은 에뮬레이터를 사용하여 테스트를 참고하세요.

Spark 작업 제출

로컬 Bigtable 에뮬레이터를 사용 여부왕 관계없이 spark-submit 명령어를 사용하여 Spark 작업을 제출합니다.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

테이블 데이터 확인

다음 cbt CLI 명령어를 실행하여 데이터가 Bigtable에 쓰여졌는지 확인합니다. cbt CLI는 Google Cloud CLI의 구성요소입니다. 자세한 내용은 cbt CLI 개요를 참조하세요.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

추가 솔루션

복잡한 Spark SQL 유형 직렬화, 특정 행 읽기, 클라이언트 측 측정항목 생성 등 특정 솔루션에는 Bigtable Spark 커넥터를 사용하세요.

필터를 사용하여 특정 DataFrame 행 읽기

DataFrames를 사용하여 Bigtable에서 읽을 때 특정 행만 읽을 수 있도록 필터를 지정할 수 있습니다. 전체 테이블 스캔을 방지하기 위해 row key 열의 ==, <=, startsWith와 같은 간단한 필터가 서버 측에 적용됩니다. 복합 row key에 대한 필터 또는 row key 열의 LIKE 필터와 같은 복잡한 필터는 클라이언트 측에 적용됩니다.

큰 테이블을 읽는 경우 전체 테이블 검색을 실행하지 않도록 간단한 row key 필터를 사용하는 것이 좋습니다. 다음 샘플 문은 간단한 필터를 사용하여 읽는 방법을 보여줍니다. Spark 필터에서 row key로 변환된 DataFrame 열의 이름을 사용해야 합니다.

    dataframe.filter("id == 'some_id'").show()
  

필터를 적용할 때는 Bigtable 테이블 열 이름 대신 DataFrame 열 이름을 사용합니다.

Apache Avro를 사용하여 복잡한 데이터 유형 직렬화

Bigtable Spark 커넥터는 Apache Avro를 사용하여 ArrayType, MapType, StructType과 같은 복잡한 Spark SQL 유형을 직렬화하는 기능을 지원합니다. Apache Avro는 복잡한 데이터 구조를 처리하고 저장하는 데 일반적으로 사용되는 레코드 데이터에 대한 데이터 직렬화를 제공합니다.

"avro":"avroSchema"와 같은 구문을 사용하여 Bigtable의 열이 Avro로 인코딩되도록 지정합니다. 그런 다음 Bigtable에서 읽거나 쓸 때 .option("avroSchema", avroSchemaString)을 사용하여 해당 열에 해당하는 Avro 스키마를 문자열 형식으로 지정할 수 있습니다. 예를 들어 "anotherAvroSchema"와 같은 다양한 옵션 이름을 사용하고 여러 열에 대한 Avro 스키마를 전달할 수 있습니다.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

클라이언트 측 측정항목 사용

Bigtable Spark 커넥터는 Java용 Bigtable 클라이언트를 기반으로 하므로 클라이언트 측 측정항목은 커넥터 내에서 기본적으로 사용 설정됩니다. 이러한 측정항목에 액세스하고 해석하는 방법에 관한 자세한 내용은 클라이언트 측 측정항목 문서를 참고하세요.

하위 수준 RDD 함수와 함께 Java용 Bigtable 클라이언트 사용

Bigtable Spark 커넥터는 Java용 Bigtable 클라이언트를 기반으로 하므로 에서 Spark 애플리케이션에서 직접 클라이언트를 사용하고 mapPartitionsforeachPartition과 같은 하위 수준 RDD 함수 내에서 분산 읽기 또는 쓰기 요청을 수행할 수 있습니다.

Java용 Bigtable 클라이언트의 클래스를 사용하려면 패키지 이름에 com.google.cloud.spark.bigtable.repackaged 접두사를 추가합니다. 예를 들어 클래스 이름을 com.google.cloud.bigtable.data.v2.BigtableDataClient 대신 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient를 사용합니다.

Java용 Bigtable 클라이언트에 관한 자세한 내용은 Java용 Bigtable 클라이언트를 참고하세요.

다음 단계