자바용 Cloud Dataflow SDK 1.x에서 이전

이 문서에서는 자바용 Cloud Dataflow SDK 1.x 및 2.x 출시 버전의 주요 변경사항을 중점적으로 설명합니다.

Cloud Dataflow SDK 지원 중단 알림: Cloud Dataflow SDK 2.5.0은 Apache Beam SDK 출시 버전과 별개로 출시되는 마지막 Cloud Dataflow SDK 버전입니다. Cloud Dataflow 서비스는 Apache Beam SDK 공식 출시 버전을 완벽하게 지원합니다. 또한 Cloud Dataflow 서비스는 이전에 출시된 Apache Beam SDK 버전 2.0.0 이상을 지원합니다. 각 SDK에 대한 지원 여부에 대해서는 Cloud Dataflow 지원 페이지를 참조하세요. Apache Beam 다운로드 페이지에는 Apache Beam SDK 출시 버전별 출시 노트가 포함되어 있습니다.

1.x에서 2.x로 이전

자바용 Apache Beam SDK 2.x를 설치 및 사용하려면 Apache Beam SDK 설치 가이드를 참조하세요.

1.x에서 2.x로의 주요 변경사항

참고: 2.x 버전으로 업그레이드하려는 모든 사용자가 이러한 변경사항을 알고 있어야 합니다.

패키지 이름 변경 및 구조 조정

Google Cloud Platform 이외의 환경에서도 문제없이 작동하도록 Apache Beam을 범용화하려는 노력의 일환으로 SDK 코드 이름이 바뀌고 구조가 조정되었습니다.

com.google.cloud.dataflow 이름이 org.apache.beam으로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-78

이제 SDK는 com.google.cloud.dataflow가 아니라 org.apache.beam 패키지에서 선언됩니다. 모든 가져오기 문을 이 변경에 맞게 업데이트해야 합니다.

새 하위 패키지: runners.dataflow, runners.direct, io.gcp

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-77

실행기가 고유 패키지로 재편성되면서 com.google.cloud.dataflow.sdk.runners의 많은 내용들이 org.apache.beam.runners.direct 또는 org.apache.beam.runners.dataflow로 이동했습니다.

Cloud Dataflow 서비스에서 실행에 관련된 파이프라인 옵션들이 com.google.cloud.dataflow.sdk.options에서 org.apache.beam.runners.dataflow.options로 이동했습니다.

Google Cloud Platform 서비스에 대한 대부분의 I/O 커넥터들이 하위 패키지로 이동했습니다. 예를 들어, BigQueryIO는 com.google.cloud.dataflow.sdk.io에서 org.apache.beam.sdk.io.gcp.bigquery로 이동했습니다.

대부분의 IDE가 새로운 위치를 찾는 데 유용할 수 있습니다. 특정 파일의 새 위치를 확인하려면 t를 사용하여 GitHub에서 코드를 검색할 수 있습니다. 자바용 Cloud Dataflow SDK 1.x 출시 버전은 GoogleCloudPlatform/DataflowJavaSDK 저장소(master-1.x 브랜치)에서 빌드됩니다. 자바용 Cloud Dataflow SDK 2.x 버전은 apache/beam 저장소의 코드에 대응합니다.

실행기

실행기 이름에서 Pipeline이 제거됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1185

이름에서 Pipeline이 삭제되어 모든 실행기 이름이 짧아졌습니다. 예를 들어, DirectPipelineRunner는 이제 DirectRunner이고 DataflowPipelineRunner는 이제 DataflowRunner입니다.

--tempLocation을 Google Cloud Storage 경로로 설정해야 함

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-430

--stagingLocation 또는 --tempLocation 중 하나만 지정하고 다른 하나는 Cloud Dataflow가 유추하도록 하는 대신 Cloud Dataflow Service에서 이제 --gcpTempLocation을 Google Cloud Storage 경로로 설정해야 합니다. 이 경우, 더 일반적인 --tempLocation에서 위치를 유추할 수 있습니다. 재정의하지 않으면 --stagingLocation에도 동일한 내용이 적용됩니다.

InProcessPipelineRunner가 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-243

DirectRunner는 계속 사용자의 로컬 머신에서 실행되지만 이제 다중 스레드 실행, 제한되지 않은 PCollections, 추측 및 지연 출력에 대한 트리거가 추가적으로 지원됩니다. 문서화된 Beam 모델과 더 긴밀하게 조율되었으며 추가 단위 테스트 실패가 (올바르게) 발생할 수 있습니다.

이제 이 기능은 DirectRunner에 포함되었으므로, InProcessPipelineRunner(자바용 Cloud Dataflow SDK 1.6+)는 삭제되었습니다.

BlockingDataflowPipelineRunnerPipelineResult.waitToFinish()로 교체됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류

이제 BlockingDataflowPipelineRunner가 제거되었습니다. 코드가 프로그래매틱 방식으로 파이프라인 실행을 예상하고 종료될 때까지 대기하는 경우, DataflowRunner를 사용하고 pipeline.run().waitToFinish()를 명시적으로 호출해야 합니다.

명령줄에서 --runner BlockingDataflowPipelineRunner를 사용하여 파이프라인이 종료될 때까지 대화식으로 기본 프로그램을 차단하도록 유도한 경우, 이는 기본 프로그램의 우려 사항입니다. waitToFinish()를 호출하도록 유도하는 --blockOnRun과 같은 옵션이 제공되어야 합니다.

TemplatingDataflowPipelineRunner--templateLocation으로 교체됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-551

TemplatingDataflowPipelineRunner의 기능(자바용 Cloud Dataflow SDK 1.9+)이 DataflowRunner를 포함한 --templateLocation을 사용하는 것으로 교체되었습니다.

ParDo 및 DoFn

DoFn은 메소드 재정의 대신 주석을 사용함

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-37

유연성과 맞춤설정을 높이기 위해 이제 DoFn은 사용자에게 특정 메소드 재정의를 요구하는 대신 메소드 주석을 사용하여 처리를 맞춤설정합니다.

DoFn과 이전 버전 간의 차이점을 다음 코드 샘플에 나타내었습니다. 이전에는(자바용 Cloud Dataflow SDK 1.x를 사용하는 경우) 코드가 다음과 같았습니다.

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

이제(자바용 Apache Beam SDK 2.x를 사용하는 경우) 코드는 다음과 같습니다.

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

DoFnProcessContext#window()에 액세스한 경우, 추가 변경이 있습니다. 코드가 다음과 같이 바뀝니다.

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

사용자가 작성하는 코드는 다음과 같습니다.

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

또는:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

런타임이 DoFn에 자동으로 창을 제공합니다.

DoFn은 여러 번들에서 재사용됨

영향을 받는 사용자: 모두 | 영향: 예기치 않은 결과가 자동으로 발생 가능 | JIRA 문제: BEAM-38

성능 개선을 위해 번들당 새 인스턴스를 보장하는 대신 이제 여러 요소 번들을 처리하는 데 동일한 DoFn이 재사용될 수 있습니다. 번들의 끝을 넘어 로컬 상태(예: 인스턴스 변수)를 유지하는 DoFn 동작이 변경될 수 있습니다. 다음 번들이 새로운 복사본이 아니라 해당 상태로 시작하기 때문입니다.

수명주기를 관리하기 위해 새 @Setup@Teardown 메소드가 추가되었습니다. 전체 수명주기는 다음과 같습니다(시점에 관계없이 장애가 발생하면 수명주기가 끊어짐).

  • @Setup: 재사용 가능한 연결을 여는 등 DoFn을 인스턴스별로 초기화
  • 시퀀스 다수:
    • @StartBundle: DoFn의 상태를 재설정하는 등의 번들별 초기화
    • @ProcessElement: 일반적인 요소 처리
    • @FinishBundle: 부작용을 플러시하는 등의 번들별 종결 단계
  • @Teardown: 재사용 가능한 연결을 닫는 등 DoFn이 보유하고 있는 리소스를 인스턴스별로 해제

참고: 실제 사용 시 이 변경의 영향이 제한적일 것으로 예상됩니다. 그러나 컴파일 시간 오류가 생성되지 않으며 예상치 않은 결과가 자동으로 발생할 수 있습니다.

부가 입력 또는 출력 지정 시 매개변수 순서가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1422

이제 `ParDo 적용 시 DoFn이 우선적으로 항상 지정되어야 합니다.

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

사용자가 작성하는 코드는 다음과 같습니다.

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

.named()가 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-370

PTransforms 및 하위 클래스에서 .named() 메소드가 삭제되었습니다. 대신 PCollection.apply(“name”, PTransform)가 사용됩니다.

PTransform.apply() 이름이 PTransform.expand()로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-438

PCollection.apply()와의 혼동을 방지하기 위해 PTransform.apply() 이름이 PTransform.expand()로 변경되었습니다. 모든 사용자 작성 복합 변환은 재정의된 apply() 메소드 이름을 expand()로 바꿔야 합니다. 파이프라인이 구성되는 방식에는 변화가 없습니다.

다른 주요 변경사항

다음은 기타 주요 변경 사항과 예정된 변경 사항 목록입니다.

개별 API 변경

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-725

다음 GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile이 삭제되었습니다.

GoogleCredentials.fromStream(InputStream for credential)을 사용하세요. 스트림에 Google Developers Console에서 가져온 JSON 형식의 서비스 계정 키 파일 또는 Cloud SDK에서 지원되는 형식을 사용하여 저장된 사용자 인증 정보가 포함될 수 있습니다.

--enableProfilingAgent--saveProfilesToGcs로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1122

--updateDataflowPipelineOptions로 이동함

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-81

--update PipelineOptionDataflowPipelineOptions에서 DataflowPipelineDebugOptions로 이동합니다.

BoundedSource.producesSortedKeys()가 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1201

BoundedSource에서 producesSortedKeys()가 삭제되었습니다.

PubsubIO API가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-974, BEAM-1415

2.0.0-beta2부터 PubsubIO.Read.topic(String)과 같은 정적 팩토리 메소드 대신 PubsubIO.<T>read()PubsubIO.<T>write()를 사용하여 PubsubIO.ReadPubsubIO.Write를 인스턴스화해야 합니다.

PubsubIO을 구성하기 위한 메소드 이름이 변경되었습니다. 예를 들어, PubsubIO.read().topic(String) 이름은 PubsubIO.read().fromTopic()으로 변경되었습니다. 마찬가지로 subscription()fromSubscription()으로, timestampLabelidLabel은 각각 withTimestampAttributewithIdAttribute로, PubsubIO.write().topic()PubsubIO.write().to()로 변경되었습니다.

메시지 페이로드를 파싱하기 위한 Coder를 지정하는 대신 PubsubIO은 문자열, Avro 메시지, Protobuf 메시지(예: PubsubIO.readStrings(), PubsubIO.writeAvros())를 읽고 쓰기 위한 함수를 노출시킵니다. 커스텀 형식을 읽고 쓰기 위해서는 PubsubIO.read/writeMessages()(및 메시지 속성을 포함시켜야 하는 경우 PubsubIO.readMessagesWithAttributes)를 사용하고 ParDo 또는 MapElements를 사용하여 커스텀 형식 또는 PubsubMessage로 변환합니다.

지원되지 않는 v1beta2 API에 대한 DatastoreIO 지원이 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-354

DatastoreIO는 이제 Cloud Datastore API v1을 기반으로 합니다.

DisplayData.가 변경됨Builder

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-745

DisplayData.Builder.include(..)에 하위 구성요소 표시 데이터를 등록하기 위한 새로운 필수 경로 매개변수가 있습니다. Builder API가 이제 DisplayData.ItemSpec<>을 반환합니다(DisplayData.Item 아님).

FileBasedSink.getWriterResultCoder()가 필요함

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-887

FileBasedSink.getWriterResultCoder가 필수적으로 제공되어야 하는 추상 메소드로 변환되었습니다.

Filter.byPredicate() 이름이 Filter.by()로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-342

IntraBundleParallelization이 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-414

RemoveDuplicates 이름이 Distinct로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-239

TextIO가 다른 구문을 사용하고 문자열에 대해서만 작동하도록 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1354

TextIO.Read.from()TextIO.read().from()으로 변경되고, 마찬가지로 TextIO.Write.to()TextIO.write().to()로 변경되었습니다.

이제 TextIO.Read가 항상 PCollection<String>을 반환하고 문자열 파싱을 위해 .withCoder()를 취하지 않습니다. 대신 컬렉션에 ParDo 또는 MapElements를 적용하여 문자열을 파싱합니다. 마찬가지로, 이제 TextIO.Write는 항상 PCollection<String>을 취하고 TextIO에 다른 내용을 쓰기 위해 ParDo 또는 MapElements를 사용하여 이를 String으로 변환합니다.

AvroIO가 다른 구문을 사용하도록 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1402

Avro 생성 유형을 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Foo.class)AvroIO.read(Foo.class).from()으로 변경되고, 마찬가지로 AvroIO.Write도 변경되었습니다.

지정된 스키마를 사용하여 Avro 일반 레코드를 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Schema or String)AvroIO.readGenericRecords().from()으로 변경되고, 마찬가지로 AvroIO.Write도 변경되었습니다.

KafkaIO가 유형 매개변수를 명시적으로 지정하고 Kafka 직렬 변환기/역직렬 변환기를 사용하도록 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1573, BEAM-2221

KafkaIO에서 이제 키와 값 유형 매개변수를 명시적으로 지정해야 합니다(예: KafkaIO.<Foo, Bar>read(), KafkaIO.<Foo, Bar>write()).

키와 값 바이트를 해석하는 데 Coder를 사용하는 대신 표준 Kafka SerializerDeserializer 클래스를 사용합니다. 예를 들어, KafkaIO.read().withKeyCoder(StringUtf8Coder.of()) 대신 KafkaIO.read().withKeyDeserializer(StringDeserializer.class)를 사용하고 KafkaIO.write()에 대해서도 마찬가지입니다.

BigQueryIO에 대한 구문이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1427

BigQueryIO.Read.from()BigQueryIO.Write.to() 대신 BigQueryIO.read().from()BigQueryIO.write().to()가 사용됩니다.

KinesisIO.Read에 대한 구문이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1428

KinesisIO.Read.from().using() 대신 KinesisIO.read().from().withClientProvider()가 사용됩니다.

TFRecordIO에 대한 구문이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1913

TFRecordIO.Read.from()TFRecordIO.Write.to() 대신 TFRecordIO.read().from()TFRecordIO.write().to()가 사용됩니다.

XmlSourceXmlSinkXmlIO에 통합됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1914

XmlSourceXmlSink를 직접 사용하는 대신 XmlIO가 사용됩니다.

예를 들어, Read.from(XmlSource.from()) 대신 XmlIO.read().from()을 사용하고 Write.to(XmlSink.writeOf()) 대신 XmlIO.write().to()를 사용합니다.

CountingInput 이름이 GenerateSequence로 변경되고 일반화됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1414

CountingInput.unbounded() 대신 GenerateSequence.from(0)이 사용됩니다. CountingInput.upTo(n) 대신 GenerateSequence.from(0).to(n)가 사용됩니다.

Count, Latest, Sample이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1417, BEAM-1421, BEAM-1423

클래스 Count.PerElement, Count.PerKey, Count.Globally가 이제 비공개이므로, Count.perElement()와 같은 팩토리 함수를 사용해야 합니다(하지만 이전에는 new Count.PerElement()를 사용할 수 있었음). 또한 예를 들어 변환 결과에 대해 .withHotKeyFanout()을 사용하려는 경우, .apply(Count.perElement()) 등의 결과에 대해 더 이상 직접적으로 이를 수행할 수 없습니다. 대신 Count가 해당 결합 함수를 Count.combineFn()으로 노출시키고 사용자가 직접 Combine.globally(Count.combineFn())을 적용해야 합니다.

LatestSample 변환에도 이와 비슷한 변경이 적용됩니다.

MapElementsFlatMapElements에 대한 매개변수 순서가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1418

MapElementsFlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor)를 사용하는 경우, 이제 우선 설명어를 지정해야 합니다(예: FlatMapElements.into(descriptor).via(fn)).

추가 매개변수 구성 시 Window가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1425

Window를 사용하여 WindowFn 자체 이외의 내용을 구성하는 경우(Window.into()), Window.configure()가 사용됩니다. 예를 들어, Window.triggering(...) 대신 Window.configure().triggering(...)이 사용됩니다.

Write.Bound 이름이 Write로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1416

클래스 Write.Bound가 이제 Write입니다. 이는 Write.to(Sink)의 적용을 변수로 추출하는 경우에만 관련이 있습니다. 해당 유형이 Write.Bound<...>였지만 이제 이제는 Write<...>입니다.

Flatten 변환 클래스 이름이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1419

클래스 Flatten.FlattenIterablesFlatten.FlattenPCollectionList 이름이 각각 Flatten.IterablesFlatten.PCollections로 변경되었습니다.

GroupByKey.create(boolean)가 메소드 두 개로 분할됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1420

이제 GroupByKey.create(boolean fewKeys)가 단순히 GroupByKey.create()GroupByKey.createWithFewKeys()입니다.

SortValues가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1426

BufferedExternalSorter.Options setter 메소드 이름이 setSomeProperty에서 withSomeProperty로 변경되었습니다.

추가 Google API 종속성

SDK 버전 2.0.0부터 Cloud Resource Manager API도 사용해야 합니다.

종속 항목 업그레이드

2.x 출시 버전은 Avro, protobuf, gRPC를 포함하여 대부분의 고정된 종속 항목 버전을 업그레이드합니다. 이러한 종속 항목 중 일부는 자체적으로 크게 변경되었을 수 있습니다. 이로 인해 코드가 종속 항목에 직접적으로 의존하면 문제가 발생할 수 있습니다. 2.0.0에서 사용된 버전을 pom.xml에서 확인하거나 mvn dependency:tree를 사용하여 확인할 수 있습니다.

내부 리팩터링

SDK의 내부 구조가 크게 변경되었습니다. 공개 API(Internal로 끝나거나 util 패키지에 있는 클래스 또는 메소드 등) 이상의 요소를 사용하고 있던 사용자는 크게 변경된 부분을 확인할 수 있습니다.

StateInternals 또는 TimerInternals를 사용했던 경우: 이들 내부 API가 삭제되었습니다. 이제 DoFn에 대해 실험적 StateTimer API를 사용할 수 있습니다.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.