Cloud Storage에서 Dataflow에 읽기

Cloud Storage에서 Dataflow로 데이터를 읽으려면 Apache Beam TextIO 또는 AvroIO I/O 커넥터를 사용합니다.

Google Cloud 라이브러리 종속 항목 포함

Cloud Storage에 TextIO 또는 AvroIO 커넥터를 사용하려면 다음 종속 항목을 포함합니다. 이 라이브러리는 "gs://" 파일 이름에 대한 스키마 핸들러를 제공합니다.

자바

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

자세한 내용은 Apache Beam SDK 설치를 참조하세요.

Dataflow의 Apache Beam I/O 커넥터에서 gRPC 사용 설정

Dataflow의 Apache Beam I/O 커넥터를 통해 gRPC를 사용하여 Cloud Storage에 연결할 수 있습니다. gRPC는 Google에서 개발한 고성능 오픈소스 원격 프로시저 호출 (RPC) 프레임워크로, Cloud Storage와 상호작용하는 데 사용할 수 있습니다.

Dataflow 작업의 Cloud Storage 읽기 요청 속도를 높이려면 Dataflow에서 Apache Beam I/O 커넥터를 사용 설정하여 gRPC를 사용하면 됩니다.

명령줄

  1. Apache Beam SDK 버전 2.55.0 이상을 사용해야 합니다.
  2. Dataflow 작업을 실행하려면 --additional-experiments=use_grpc_for_gcs 파이프라인 옵션을 사용하세요. 다양한 파이프라인 옵션에 대한 자세한 내용은 선택적 플래그를 참고하세요.

Apache Beam SDK

  1. Apache Beam SDK 버전 2.55.0 이상을 사용해야 합니다.
  2. Dataflow 작업을 실행하려면 --experiments=use_grpc_for_gcs 파이프라인 옵션을 사용하세요. 다양한 파이프라인 옵션에 대한 자세한 내용은 기본 옵션을 참고하세요.

Dataflow에서 Apache Beam I/O 커넥터를 구성하여 Cloud Monitoring에서 gRPC 관련 측정항목을 생성할 수 있습니다. gRPC 관련 측정항목을 사용하면 다음을 수행할 수 있습니다.

  • Cloud Storage에 대한 gRPC 요청의 성능을 모니터링하고 최적화합니다.
  • 문제 해결 및 디버깅
  • 애플리케이션의 사용 및 동작에 대한 유용한 정보를 파악합니다.

Dataflow에서 Apache Beam I/O 커넥터를 구성하여 gRPC 관련 측정항목을 생성하는 방법에 관한 자세한 내용은 클라이언트 측 측정항목 사용을 참고하세요. 사용 사례에 측정항목 수집이 필요하지 않은 경우 측정항목 수집을 선택 해제할 수 있습니다. 자세한 내용은 클라이언트 측 측정항목 선택 해제를 참고하세요.

동시 로드

TextIOAvroIO 커넥터는 두 가지 수준의 동시 로드를 지원합니다.

  • 여러 작업자가 읽을 수 있도록 개별 파일에 별도로 키가 지정됩니다.
  • 파일이 압축되지 않은 경우 커넥터가 각 파일의 하위 범위를 개별적으로 읽을 수 있으므로 매우 높은 수준의 동시 로드가 발생합니다. 이러한 분할은 파일의 각 줄이 의미 있는 레코드인 경우에만 가능합니다. 예를 들어 JSON 파일에는 기본적으로 사용할 수 없습니다.

성능

다음 표에는 Cloud Storage에서 읽기의 성능 측정항목이 나와 있습니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2 작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.

레코드 1억 건 | 1KB | 열 1개 처리량(바이트) 처리량(요소)
Read 320MBps 초당 요소 320,000개

이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.

권장사항

  • Cloud Storage에서 watchForNewFiles를 사용하지 마세요. 이 접근 방식은 커넥터가 본 파일 목록을 메모리에 유지해야 하므로 대규모 프로덕션 파이프라인에는 확장성이 떨어집니다. 목록을 메모리에서 플러시할 수 없으므로 시간이 지남에 따라 작업자의 작업 메모리가 줄어듭니다. 대신 Cloud Storage용 Pub/Sub 알림을 사용하는 것이 좋습니다. 자세한 내용은 파일 처리 패턴을 참고하세요.

  • 파일 이름과 파일 콘텐츠가 모두 유용한 데이터인 경우 FileIO 클래스를 사용하여 파일 이름을 읽습니다. 예를 들어 파일 이름에 파일의 데이터를 처리할 때 유용한 메타데이터가 포함되어 있을 수 있습니다. 자세한 내용은 파일 이름 액세스를 참고하세요. FileIO 문서에도 이 패턴의 예시가 나와 있습니다.

다음 예는 Cloud Storage에서 읽는 방법을 보여줍니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

다음 단계