이 문서에서는 자바용 Dataflow SDK 1.x 및 2.x 출시 버전의 주요 변경사항을 중점적으로 설명합니다.
Dataflow SDK 지원 중단 알림: Dataflow SDK 2.5.0은 Apache Beam SDK 출시 버전과 별도로 제공되는 마지막 Dataflow SDK 출시 버전입니다. Dataflow 서비스는 공식 Apache Beam SDK 출시 버전을 완벽하게 지원합니다. 또한 Dataflow 서비스는 2.0.0 이상 버전에서 이전에 출시된 Apache Beam SDK를 지원합니다. 다양한 SDK의 지원 여부는 Dataflow 지원 페이지를 참조하세요. Apache Beam 다운로드 페이지에는 Apache Beam SDK 출시 버전별 출시 노트가 포함되어 있습니다.
1.x에서 2.x로 이전
자바용 Apache Beam SDK 2.x를 설치 및 사용하려면 Apache Beam SDK 설치 가이드를 참조하세요.
1.x에서 2.x로의 주요 변경사항
참고: 2.x 버전으로 업그레이드하려는 모든 사용자가 이러한 변경사항을 알고 있어야 합니다.
패키지 이름 변경 및 구조 조정
Google Cloud Platform 이외의 환경에서도 문제없이 작동하도록 Apache Beam을 범용화하려는 노력의 일환으로 SDK 코드 이름이 바뀌고 구조가 조정되었습니다.
이름이 com.google.cloud.dataflow
에서 org.apache.beam
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-78
이제 SDK는 com.google.cloud.dataflow
가 아닌 org.apache.beam
패키지에 선언됩니다. 모든 가져오기 문을 이 변경에 맞게 업데이트해야 합니다.
새 하위 패키지: runners.dataflow
, runners.direct
, io.gcp
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-77
실행기가 고유 패키지로 재구성되었으므로 com.google.cloud.dataflow.sdk.runners
가 org.apache.beam.runners.direct
또는 org.apache.beam.runners.dataflow
중 하나로 이동했습니다.
Dataflow 서비스를 실행하는 데 사용되는 파이프라인 옵션이 com.google.cloud.dataflow.sdk.options
에서 org.apache.beam.runners.dataflow.options
로 이동했습니다.
Google Cloud Platform 서비스에 대한 대부분의 I/O 커넥터들이 하위 패키지로 이동했습니다. 예를 들어 BigQueryIO는 com.google.cloud.dataflow.sdk.io
에서 org.apache.beam.sdk.io.gcp.bigquery
로 이동했습니다.
대부분의 IDE가 새로운 위치를 찾는 데 유용할 수 있습니다. 특정 파일의 새 위치를 확인하려면 t
를 사용하여 GitHub에서 코드를 검색할 수 있습니 자바용 Cloud Dataflow SDK 1.x 출시는 GoogleCloudPlatform/DataflowJavaSDK 저장소(master-1.x
분기)에서 빌드됩니다. 자바용 Dataflow SDK 2.x 출시는 apache/beam 저장소의 코드에 대응합니다.
실행기
실행기 이름에서 Pipeline
삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1185
이름에서 Pipeline
이 삭제되어 모든 실행기 이름이 짧아졌습니다. 예를 들어 DirectPipelineRunner
는 현재 DirectRunner
이고, DataflowPipelineRunner
는 현재 DataflowRunner
입니다.
--tempLocation
을 Google Cloud Storage 경로로 설정해야 함
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-430
--stagingLocation
또는 --tempLocation
중 하나만 지정하고 다른 하나는 Dataflow가 유추하도록 하는 대신, Dataflow 서비스에서 --gcpTempLocation
을 Google Cloud Storage 경로로 설정해야 합니다. 이 경우 더 일반적인 --tempLocation
에서 위치를 유추할 수 있습니다.
재정의되지 않으면 --stagingLocation
에서도 사용됩니다.
InProcessPipelineRunner
삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-243
DirectRunner
는 사용자의 로컬 머신에서 계속 실행되지만 이제 다중 스레드 실행, unbounded PCollections를 비롯하여 추측 및 지연 출력에 대한 트리거가 추가적으로 지원됩니다. 문서화된 Beam 모델과 더 긴밀하게 조율되었으며 추가 단위 테스트 실패가 (올바르게) 발생할 수 있습니다.
이제 이 기능은 DirectRunner
에 포함되었으므로, InProcessPipelineRunner
(자바용 Dataflow SDK 1.6+)는 삭제되었습니다.
BlockingDataflowPipelineRunner
가 PipelineResult.waitToFinish()
로 바뀜
영향을 받는 사용자: 모두 | 영향: 컴파일 오류
이제 BlockingDataflowPipelineRunner
가 삭제되었습니다. 코드가 프로그래매틱 방식으로 파이프라인 실행을 예상하고 종료될 때까지 대기하는 경우, DataflowRunner
를 사용하고 pipeline.run().waitToFinish()
를 명시적으로 호출해야 합니다.
명령줄에서 --runner BlockingDataflowPipelineRunner
를 사용하여 파이프라인이 종료될 때까지 대화식으로 기본 프로그램을 차단하도록 유도한 경우, 이는 기본 프로그램의 우려사항입니다. waitToFinish()
를 호출하도록 유도하는 --blockOnRun
과 같은 옵션이 제공되어야 합니다.
TemplatingDataflowPipelineRunner
가 --templateLocation
로 바뀜
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-551
TemplatingDataflowPipelineRunner
의 기능이 (자바용 Dataflow SDK 1.9 이상) --templateLocation
을 사용하여 DataflowRunner
으로 대체되었습니다.
ParDo 및 DoFn
DoFn
는 메서드 재정의 대신 주석을 사용함
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-37
유연성과 맞춤설정을 높이기 위해 이제 DoFn
은 사용자에게 특정 메서드 재정의를 요구하는 대신 메서드 주석을 사용하여 처리를 맞춤설정합니다.
새 DoFn
과 이전 버전 간의 차이점은 다음 코드 샘플에 나와 있습니다. 이전에는(자바용 Dataflow SDK 1.x를 사용하는 경우) 코드가 다음과 같았습니다.
new DoFn<Foo, Baz>() {
@Override
public void processElement(ProcessContext c) { … }
}
이제(자바용 Apache Beam SDK 2.x를 사용하는 경우) 코드는 다음과 같습니다.
new DoFn<Foo, Baz>() {
@ProcessElement // <-- This is the only difference
public void processElement(ProcessContext c) { … }
}
DoFn
이 ProcessContext#window()
에 액세스한 경우, 추가 변경이 있습니다. 코드가 다음과 같이 바뀝니다.
public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) {
… (MyWindowType) c.window() …
}
}
사용자가 작성하는 코드는 다음과 같습니다.
public class MyDoFn extends DoFn<Foo, Baz> {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
또는
return new DoFn<Foo, Baz>() {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
런타임이 DoFn
에 자동으로 창을 제공합니다.
DoFn
은 여러 번들에서 재사용됨
영향을 받는 사용자: 모두 | 영향: 예기치 않은 결과가 자동으로 발생 가능 | JIRA 문제: BEAM-38
성능 개선을 위해 번들당 새 인스턴스를 보장하는 대신 이제 여러 요소 번들을 처리하는 데 동일한 DoFn
이 재사용될 수 있습니다. 다음 번들이 새 복사본 대신 상태로 시작되므로 번들 끝 부분에서 로컬 상태(예: 인스턴스 변수)를 유지하는 모든 DoFn
의 동작이 변경될 수 있습니다.
수명주기를 관리하기 위해 새 @Setup
및 @Teardown
메서드가 추가되었습니다.
전체 수명주기는 다음과 같습니다(시점에 관계없이 장애가 발생하면 수명주기가 끊어짐).
@Setup
: 재사용 가능한 연결을 여는 등DoFn
을 인스턴스별로 초기화- 시퀀스 다수:
@StartBundle
:DoFn
의 상태를 재설정하는 등의 번들별 초기화@ProcessElement
: 일반적인 요소 처리@FinishBundle
: 부작용을 삭제하는 등의 번들별 종결 단계
@Teardown
: 재사용 가능한 연결을 닫는 등DoFn
이 보유하고 있는 리소스를 인스턴스별로 해제
참고: 실제 사용 시 이 변경의 영향이 제한적일 것으로 예상됩니다. 그러나 컴파일 시간 오류가 생성되지 않으며 예상치 않은 결과가 자동으로 발생할 수 있습니다.
부가 입력 또는 출력 지정 시 매개변수 순서가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1422
이제 `ParDo 적용 시 DoFn
이 우선적으로 항상 지정되어야 합니다.
foos.apply(ParDo
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag)
.of(new MyDoFn()))
사용자가 작성하는 코드는 다음과 같습니다.
foos.apply(ParDo
.of(new MyDoFn())
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag))
PTransforms
.named()
삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-370
PTransforms 및 하위 클래스에서 .named()
메서드가 삭제되었습니다. 대신 PCollection.apply(“name”, PTransform)
를 사용합니다.
이름이 PTransform.apply()
에서 PTransform.expand()
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-438
PCollection.apply()
와의 혼동을 방지하기 위해 PTransform.apply()
이름이 PTransform.expand()
로 변경되었습니다. 모든 사용자 작성 복합 변환은 재정의된 apply()
메서드 이름을 expand()
로 변경해야 합니다. 파이프라인이 구성되는 방식에는 변화가 없습니다.
다른 주요 변경사항
다음은 기타 주요 변경 사항과 예정된 변경 사항 목록입니다.
개별 API 변경
--credentialDir
, --tokenServerUrl
, 관련 옵션이 삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-725
GcpOptions
(TokenServerUrl
, CredentialDir
, CredentialId
, SecretsFile
, ServiceAccountName
, ServiceAccountKeyFile
)가 삭제되었습니다.
GoogleCredentials.fromStream(InputStream for credential)
을 사용합니다. 스트림에 Google Developers Console에서 가져온 JSON 형식의 서비스 계정 키 파일 또는 Cloud SDK에서 지원되는 형식을 사용하여 저장된 사용자 인증 정보가 포함될 수 있습니다.
--enableProfilingAgent
가 --saveProfilesToGcs
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1122
--update
가 DataflowPipelineOptions
로 이동됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-81
--update PipelineOption
가 DataflowPipelineDebugOptions
에서 DataflowPipelineOptions
로 이동됨
BoundedSource.producesSortedKeys()
삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1201
BoundedSource
에서 producesSortedKeys()
가 삭제됨
PubsubIO
API가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-974, BEAM-1415
2.0.0-beta2부터 PubsubIO.Read.topic(String)
과 같은 정적 팩토리 메서드 대신 PubsubIO.<T>read()
및 PubsubIO.<T>write()
를 사용하여 PubsubIO.Read
및 PubsubIO.Write
를 인스턴스화해야 합니다.
PubsubIO
을 구성하기 위한 메서드 이름이 변경되었습니다. 예를 들어 PubsubIO.read().topic(String)
이름이 PubsubIO.read().fromTopic()
으로 변경되었습니다. 마찬가지로 subscription()
이 fromSubscription()
으로, timestampLabel
및 idLabel
이 각각 withTimestampAttribute
및 withIdAttribute
로, PubsubIO.write().topic()
이 PubsubIO.write().to()
로 변경되었습니다.
메시지 페이로드를 파싱하기 위한 Coder
를 지정하는 대신 PubsubIO
은 문자열, Avro 메시지, Protobuf 메시지(예: PubsubIO.readStrings()
, PubsubIO.writeAvros()
)를 읽고 쓰기 위한 함수를 노출시킵니다. 커스텀 유형을 읽고 쓰기 위해서는 PubsubIO.read/writeMessages()
(및 메시지 속성을 포함시켜야 하는 경우, PubsubIO.readMessagesWithAttributes
)를 사용하고 ParDo
또는 MapElements
를 사용하여 커스텀 형식 또는 PubsubMessage
로 변환합니다.
지원되지 않는 v1beta2 API에 대한 DatastoreIO 지원이 삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-354
DatastoreIO는 이제 Cloud Datastore API v1을 기반으로 합니다.
DisplayData.Builder
가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-745
DisplayData.Builder.include(..)
에 하위 구성요소 표시 데이터를 등록하기 위한 새로운 필수 경로 매개변수가 있습니다. 이제 Builder API가 DisplayData.Item
대신 DisplayData.ItemSpec<>
을 반환합니다.
FileBasedSink.getWriterResultCoder()
가 필요함
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-887
FileBasedSink.getWriterResultCoder
가 필수적으로 제공되어야 하는 추상 메서드로 변환되었습니다.
이름이 Filter.byPredicate()
에서 Filter.by()
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-342
IntraBundleParallelization
삭제됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-414
이름이 RemoveDuplicates
에서 Distinct
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-239
TextIO
가 다른 구문을 사용하고 문자열에 대해서만 작동하도록 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1354
TextIO.Read.from()
은 TextIO.read().from()
으로 변경되고, 마찬가지로 TextIO.Write.to()
는 TextIO.write().to()
로 변경됩니다.
이제 TextIO.Read
가 항상 PCollection<String>
을 반환하고 문자열 파싱하기 위해 .withCoder()
를 취하지 않습니다. 대신 컬렉션에 ParDo
또는 MapElements
를 적용하여 문자열을 파싱합니다.
마찬가지로 이제 TextIO.Write
는 항상 PCollection<String>
을 취하고, TextIO
에 다른 내용을 쓰기 위해 ParDo
또는 MapElements
를 사용하여 이를 String
으로 변환합니다.
AvroIO
가 다른 구문을 사용하도록 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1402
Avro 생성 유형을 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Foo.class)
가 AvroIO.read(Foo.class).from()
으로 변경되고, 마찬가지로 AvroIO.Write
도 변경되었습니다.
지정된 스키마를 사용하여 Avro 일반 레코드를 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Schema or String)
가 AvroIO.readGenericRecords().from()
으로 변경되고, 마찬가지로 AvroIO.Write
도 변경되었습니다.
KafkaIO
가 유형 매개변수를 명시적으로 지정하고 Kafka 직렬 변환기/역직렬 변환기를 사용하도록 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1573, BEAM-2221
KafkaIO에서 이제 키와 값 유형 매개변수를 명시적으로 지정해야 합니다(예: KafkaIO.<Foo, Bar>read()
, KafkaIO.<Foo, Bar>write()
).
키와 값 바이트를 해석하는 데 Coder
를 사용하는 대신 표준 Kafka Serializer
및 Deserializer
클래스를 사용합니다. 예를 들어 KafkaIO.read().withKeyCoder(StringUtf8Coder.of())
대신 KafkaIO.read().withKeyDeserializer(StringDeserializer.class)
를 사용하고, KafkaIO.write()
에 대해서도 마찬가지입니다.
BigQueryIO
구문이 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1427
BigQueryIO.Read.from()
및 BigQueryIO.Write.to()
대신 BigQueryIO.read().from()
및 BigQueryIO.write().to()
를 사용합니다.
KinesisIO.Read
구문이 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1428
KinesisIO.Read.from().using()
대신 KinesisIO.read().from().withClientProvider()
를 사용합니다.
TFRecordIO
구문이 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1913
TFRecordIO.Read.from()
및 TFRecordIO.Write.to()
대신 TFRecordIO.read().from()
및 TFRecordIO.write().to()
를 사용합니다.
XmlSource
및 XmlSink
가 XmlIO
에 통합됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1914
XmlSource
및 XmlSink
를 직접 사용하는 대신 XmlIO
를 사용합니다.
예를 들어 Read.from(XmlSource.from())
대신 XmlIO.read().from()
을 사용하고 Write.to(XmlSink.writeOf())
대신 XmlIO.write().to()
를 사용합니다.
CountingInput
이름이 GenerateSequence
로 변경되고 일반화됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1414
CountingInput.unbounded()
대신 GenerateSequence.from(0)
을 사용합니다. CountingInput.upTo(n)
대신 GenerateSequence.from(0).to(n)
를 사용합니다.
Count
, Latest
, Sample
이 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1417, BEAM-1421, BEAM-1423
클래스 Count.PerElement
, Count.PerKey
, Count.Globally
가 이제 비공개이므로, Count.perElement()
와 같은 팩토리 함수를 사용해야 합니다(이전에는 new Count.PerElement()
를 사용할 수 있었음). 예를 들어 변환 결과에 대해 .withHotKeyFanout()
을 사용하려는 경우 .apply(Count.perElement())
의 결과로 직접 수행할 수는 없습니다. 대신 Count
가 해당 결합 함수를 Count.combineFn()
으로 노출하고 사용자가 직접 Combine.globally(Count.combineFn())
를 적용해야 합니다.
Latest
및 Sample
변환에도 이와 비슷한 변경이 적용됩니다.
MapElements
및 FlatMapElements
에 대한 매개변수의 순서가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1418
MapElements
및 FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor)
을 사용하는 경우, 이제 먼저 설명어를 지정해야 합니다(예: FlatMapElements.into(descriptor).via(fn)
).
추가 매개변수 구성 시 Window
가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1425
Window
를 사용하여 WindowFn
자체 이외의 내용을 구성하는 경우(Window.into()
), Window.configure()
이 사용됩니다. 예를 들어 Window.triggering(...)
대신 Window.configure().triggering(...)
을 사용합니다.
이름이 Write.Bound
에서 Write
로 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1416
이제 Write.Bound
클래스가 Write
입니다. 이는 Write.to(Sink)
의 적용을 변수로 추출한 경우에만 사용되며, 해당 유형이 Write.Bound<...>
였으나 이제는 Write<...>
가 됩니다.
Flatten
변환 클래스 이름이 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1419
클래스 Flatten.FlattenIterables
및 Flatten.FlattenPCollectionList
의 이름이 각각 Flatten.Iterables
및 Flatten.PCollections
로 변경되었습니다.
GroupByKey.create(boolean)
가 메서드 두 개로 분할됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1420
GroupByKey.create(boolean fewKeys)
가 이제 GroupByKey.create()
및 GroupByKey.createWithFewKeys()
입니다.
SortValues
가 변경됨
영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1426
BufferedExternalSorter.Options
setter 메서드의 이름이 setSomeProperty
에서 withSomeProperty
로 변경되었습니다.
추가 Google API 종속성
SDK 버전 2.0.0부터 Cloud Resource Manager API도 사용해야 합니다.
종속성 업그레이드
2.x 출시 버전은 Avro, protobuf, gRPC를 포함하여 대부분의 고정된 종속 항목 버전을 업그레이드합니다. 이러한 종속 항목 중 일부는 자체적으로 크게 변경되었을 수 있습니다. 이로 인해 코드가 종속 항목에 직접적으로 의존하면 문제가 발생할 수 있습니다. 2.0.0에서 사용된 버전을 pom.xml
에서 확인하거나 mvn dependency:tree
를 사용하여 확인할 수 있습니다.
내부 리팩터링
SDK의 내부 구조가 크게 변경되었습니다. 공개 API(Internal
로 끝나거나 util
패키지에 있는 클래스 또는 메서드 등) 이상의 요소를 사용하고 있던 사용자는 크게 변경된 부분을 확인할 수 있습니다.
StateInternals
또는 TimerInternals
를 사용했던 경우: 해당 내부 API가 삭제되었습니다. 이제 State
에 대해 실험용 DoFn
및 Timer
API를 사용할 수 있습니다.