Bigtable을 사용한 Hadoop 맵리듀스 작업

이 예시에서는 Hadoop을 사용하여 단어가 텍스트 파일에 표시되는 횟수를 계산하는 간단한 맵리듀스 작업을 수행합니다. 맵리듀스 작업은 Bigtable을 사용하여 매핑 작업 결과를 저장합니다. 이 예의 코드는 GitHub 저장소 GoogleCloudPlatform/cloud-bigtable-examplesjava/dataproc-wordcount 디렉터리에 있습니다.

인증 설정

로컬 개발 환경에서 이 페이지의 Java 샘플을 사용하려면 gcloud CLI를 설치하고 초기화한 후 사용자 인증 정보로 애플리케이션 기본 사용자 인증 정보를 설정합니다.

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

자세한 내용은 다음을 참조하세요: Set up authentication for a local development environment.

코드 샘플 개요

이 코드 샘플은 텍스트 파일 하나 이상과 테이블 이름을 입력으로 사용하는 간단한 명령줄 인터페이스를 제공하고, 파일에 표시되는 모든 단어를 찾고, 각 단어가 표시된 횟수를 계산합니다. 맵리듀스 논리는 WordCountHBase 클래스에 표시됩니다.

먼저 매퍼가 텍스트 파일의 콘텐츠를 토큰화하고 키-값 쌍을 생성합니다. 여기서 키는 텍스트 파일의 단어이고 값은 1입니다.

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

그런 다음 감소기가 각 키의 값을 합산하고 사용자가 지정한 Bigtable 테이블에 결과를 기록합니다. 각 row key는 텍스트 파일에서 가져온 단어입니다. 각 행에는 텍스트 파일에 row key가 표시되는 횟수를 포함하는 cf:count 열이 포함됩니다.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}