Cloud Storage에서 Dataflow에 읽기

Cloud Storage에서 Dataflow로 데이터를 읽으려면 Apache Beam TextIO 또는 AvroIO I/O 커넥터를 사용합니다.

Google Cloud 라이브러리 종속 항목 포함

Cloud Storage에 TextIO 또는 AvroIO 커넥터를 사용하려면 다음 종속 항목을 포함합니다. 이 라이브러리는 "gs://" 파일 이름에 대한 스키마 핸들러를 제공합니다.

자바

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

자세한 내용은 Apache Beam SDK 설치를 참조하세요.

Dataflow의 Apache Beam I/O 커넥터에서 gRPC 사용 설정

Dataflow의 Apache Beam I/O 커넥터를 통해 gRPC를 사용하여 Cloud Storage에 연결할 수 있습니다. gRPC는 Google에서 개발한 고성능 오픈소스 원격 프로시저 호출 (RPC) 프레임워크로, Cloud Storage와 상호작용하는 데 사용할 수 있습니다.

Dataflow 작업의 Cloud Storage 읽기 요청 속도를 높이려면 Dataflow에서 Apache Beam I/O 커넥터를 사용 설정하여 gRPC를 사용하면 됩니다.

명령줄

  1. Apache Beam SDK 버전 2.55.0 이상을 사용해야 합니다.
  2. Dataflow 작업을 실행하려면 --additional-experiments=use_grpc_for_gcs 파이프라인 옵션을 사용하세요. 다양한 파이프라인 옵션에 대한 자세한 내용은 선택적 플래그를 참고하세요.

Apache Beam SDK

  1. Apache Beam SDK 버전 2.55.0 이상을 사용해야 합니다.
  2. Dataflow 작업을 실행하려면 --experiments=use_grpc_for_gcs 파이프라인 옵션을 사용하세요. 다양한 파이프라인 옵션에 대한 자세한 내용은 기본 옵션을 참고하세요.

Dataflow에서 Apache Beam I/O 커넥터를 구성하여 Cloud Monitoring에서 gRPC 관련 측정항목을 생성할 수 있습니다. gRPC 관련 측정항목을 사용하면 다음을 수행할 수 있습니다.

  • Cloud Storage에 대한 gRPC 요청의 성능을 모니터링하고 최적화합니다.
  • 문제 해결 및 디버깅
  • 애플리케이션의 사용 및 동작에 대한 유용한 정보를 파악합니다.

Dataflow에서 Apache Beam I/O 커넥터를 구성하여 gRPC 관련 측정항목을 생성하는 방법에 관한 자세한 내용은 클라이언트 측 측정항목 사용을 참고하세요. 사용 사례에 측정항목 수집이 필요하지 않은 경우 측정항목 수집을 선택 해제할 수 있습니다. 자세한 내용은 클라이언트 측 측정항목 선택 해제를 참고하세요.

동시 로드

TextIOAvroIO 커넥터는 두 가지 수준의 동시 로드를 지원합니다.

  • 여러 작업자가 읽을 수 있도록 개별 파일에 별도로 키가 지정됩니다.
  • 파일이 압축되지 않은 경우 커넥터가 각 파일의 하위 범위를 개별적으로 읽을 수 있으므로 매우 높은 수준의 동시 로드가 발생합니다. 이러한 분할은 파일의 각 줄이 의미 있는 레코드인 경우에만 가능합니다. 예를 들어 JSON 파일에는 기본적으로 사용할 수 없습니다.

성능

다음 표에는 Cloud Storage에서 읽기의 성능 측정항목이 나와 있습니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2 작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.

레코드 1억 건 | 1KB | 열 1개 처리량(바이트) 처리량(요소)
Read 320MBps 초당 요소 320,000개

이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.

권장사항

  • Cloud Storage에서 watchForNewFiles를 사용하지 마세요. 이 접근 방식은 커넥터가 본 파일 목록을 메모리에 유지해야 하므로 대규모 프로덕션 파이프라인에는 확장성이 떨어집니다. 목록을 메모리에서 플러시할 수 없으므로 시간이 지남에 따라 작업자의 작업 메모리가 줄어듭니다. 대신 Cloud Storage용 Pub/Sub 알림을 사용하는 것이 좋습니다. 자세한 내용은 파일 처리 패턴을 참고하세요.

  • 파일 이름과 파일 콘텐츠가 모두 유용한 데이터인 경우 FileIO 클래스를 사용하여 파일 이름을 읽습니다. 예를 들어 파일 이름에 파일의 데이터를 처리할 때 유용한 메타데이터가 포함되어 있을 수 있습니다. 자세한 내용은 파일 이름 액세스를 참고하세요. FileIO 문서에도 이 패턴의 예시가 나와 있습니다.

예시

다음 예는 Cloud Storage에서 읽는 방법을 보여줍니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

다음 단계