Dataflow에서 Cloud Storage로 쓰기(자바)

이 문서에서는 자바에서 TextIO I/O 커넥터를 사용하여 Dataflow에서 Cloud Storage로 텍스트 데이터를 쓰는 방법을 설명합니다.

Google Cloud 라이브러리 종속 항목 포함

Cloud Storage에 TextIO 커넥터를 사용하려면 다음 종속 항목을 포함합니다. 이 라이브러리는 "gs://" 파일 이름에 대한 스키마 핸들러를 제공합니다.

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
   <version>${beam.version}</version>
</dependency>

자세한 내용은 Apache Beam SDK 설치를 참조하세요.

Cloud Storage에 텍스트 파일 쓰기

다음 예시에서는 GZIP 압축을 사용하여 텍스트 파일을 쓰는 일괄 파이프라인을 만듭니다.

Java

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

출력 파일 경로의 경우 버킷 이름과 파일 이름 프리픽스가 포함된 Cloud Storage 경로를 지정합니다. 예를 들어 gs://my_bucket/output/file을 지정하면 TextIO 커넥터는 my_bucket이라는 Cloud Storage 버킷을 대상으로 쓰고 출력 파일에는 output/file* 프리픽스가 포함됩니다.

기본적으로 TextIO 커넥터는 <file-prefix>-00000-of-00001과 같은 이름 지정 규칙을 사용하여 출력 파일을 샤딩합니다. 원하는 경우 예시와 같이 파일 이름 서픽스와 압축 스키마를 지정할 수 있습니다.

PCollection 입력이 제한되지 않은 경우 컬렉션에서 윈도우 또는 트리거를 정의한 후 TextIO.Write.withWindowedWrites를 호출하여 기간이 설정된 쓰기를 지정해야 합니다.

다음 단계