자바용 Dataflow SDK 1.x에서 이전

이 문서에서는 자바 1.x 및 2.x용 Dataflow SDK의 주요 변경 사항을 중점적으로 설명합니다.

Dataflow SDK 지원 중단 알림: Dataflow SDK 2.5.0은 Apache Beam SDK 릴리스와 별도로 제공되는 마지막 Dataflow SDK 릴리스입니다. Dataflow 서비스는 공식 Apache Beam SDK 릴리스를 완벽하게 지원합니다. 또한 Dataflow 서비스는 2.0.0 이상 버전에서 이전에 출시 된 Apache Beam SDK를 지원합니다. 각 SDK에 대한 지원 여부는 Dataflow 지원 페이지를 참조하세요. Apache Beam 다운로드 페이지에는 Apache Beam SDK 출시 버전별 출시 노트가 포함되어 있습니다.

1.x에서 2.x로 이전

자바용 Apache Beam SDK 2.x를 설치 및 사용하려면 Apache Beam SDK 설치 가이드를 참조하세요.

1.x에서 2.x로의 주요 변경사항

참고: 2.x 버전으로 업그레이드하려는 모든 사용자는 이러한 변경사항을 알고 있어야 합니다.

패키지 이름 변경 및 구조 조정

Google Cloud Platform 이외의 환경에서도 문제없이 작동하도록 Apache Beam을 범용화 하려는 노력의 일환으로 SDK 코드 이름이 바뀌고 구조가 조정되었습니다.

이름이 com.google.cloud.dataflow에서 org.apache.beam(으)로 변경되었습니다.

영향을 받는 사용자: 모두 | 영향 : 컴파일 오류 | JIRA 문제: BEAM-78

이제 SDK는 org.apache.beam이 아닌 com.google.cloud.dataflow 패키지에 선언됩니다. 모든 가져오기 문을 이 변경에 맞게 업데이트해야 합니다.

새 하위 패키지: runners.dataflow, runners.direct, io.gcp

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-77

실행기가 고유 패키지로 재구성되었으므로 com.google.cloud.dataflow.sdk.runnersorg.apache.beam.runners.direct 또는 org.apache.beam.runners.dataflow 중 하나로 이동했습니다.

Dataflow 서비스를 실행하는 데 사용되는 파이프 라인 옵션은 com.google.cloud.dataflow.sdk.options에서 org.apache.beam.runners.dataflow.options로 이동했습니다.

Google Cloud Platform 서비스에 대한 대부분의 I/O 커넥터들이 하위 패키지로 이동했습니다. 예를 들어 BigQueryIO는 com.google.cloud.dataflow.sdk.io 에서 org.apache.beam.sdk.io.gcp.bigquery로 이동했습니다.

대부분의 IDE가 새로운 위치를 찾는 데 유용할 수 있습니다. 특정 파일의 새 위치를 확인하려면 t GitHub에서 코드를 검색합니다. 자바 용 Dataflow SDK 1.x 릴리스는 GoogleCloudPlatform/DataflowJavaSDK 저장소(master-1.x 브랜치)에서 빌드됩니다. 자바 용 Dataflow SDK 2.x 릴리스는 apache/beam 저장소의 코드에 해당합니다.

실행기

실행기 이름에서 Pipeline 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1185

이름에서 PipelinePipeline이 삭제되어 모든 실행기 이름이 짧아졌습니다. 예를 들어 DirectPipelineRunnerDirectRunner, DataflowPipelineRunner는 이제 DataflowRunner입니다.

Google Cloud Storage 경로에 --tempLocation 설정 필요

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-430

다음 중 하나만 지정하도록 허용하는 대신 --stagingLocation 또는 --tempLocation Dataflow가 다른 하나를 유추하면 Dataflow 서비스는 --gcpTempLocation Google Cloud Storage 경로로 설정 될 수 있지만 더 일반적인 경로에서 유추 할 수 있습니다. --tempLocation. 재정의되지 않으면 --stagingLocation에서도 사용됩니다.

InProcessPipelineRunner 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-243

DirectRunner 사용자의 로컬 머신에서 계속 실행되지만 다중 스레드 실행, 제한되지 않은 PCollection 및 추측 및 지연 출력에 대한 트리거를 추가적으로 지원합니다. 문서화된 Beam 모델과 더 긴밀하게 조율되었으며 추가 단위 테스트 실패가 (올바르게) 발생할 수 있습니다.

이 기능은 현재 DirectRunner, InProcessPipelineRunner 자바 용 Dataflow SDK 1.6+가 삭제되었습니다.

'BlockingDataflowPipelineRunner'을(를) 'PipelineResult.waitToFinish()'(으)로 바꿨습니다

영향을 받는 사용자: 모두 | 영향: 컴파일 오류

이제 BlockingDataflowPipelineRunner이 삭제되었습니다. 코드가 프로그래매틱 방식으로 파이프라인 실행을 예상하고 종료될 때까지 대기하는 경우, DataflowRunner를 사용하고 pipeline.run().waitToFinish()를 명시적으로 호출해야 합니다.

명령줄에서 --runner BlockingDataflowPipelineRunner를 사용하여 파이프라인이 종료될 때까지 기본 프로그램을 차단하도록 대화식으로 유도한 경우, waitToFinish()호출을 유도하는 --blockOnRun와 같은 옵션을 제공해야합니다. .

'TemplatingDataflowPipelineRunner'을(를) '--templateLocation'(으)로 바꿨습니다

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-551

TemplatingDataflowPipelineRunner의 기능은 (자바 용 Dataflow SDK 1.9 이상) --templateLocation 을 사용하여 DataflowRunner으로 대체되었습니다.

ParDo 및 DoFn

DoFn 메서드 오버라이드 대신 주석 사용

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-37

더 많은 유연성과 맞춤설정을 위해 DoFn는 이제 메서드 주석을 사용하여 사용자가 특정 메서드를 재정의하는 대신 처리를 맞춤설정합니다.

새 광고 항목과 DoFn 및 이전 코드는 다음 코드 샘플에서 설명합니다. 이전에는 자바용 Dataflow SDK 1.x를 사용하는 경우 코드는 다음과 같았습니다.

new DoFn<Foo, Baz>() {
      @Override
      public void processElement(ProcessContext c) { … }
    }
    

이제(자바용 Apache Beam SDK 2.x를 사용하는 경우) 코드는 다음과 같습니다.

new DoFn<Foo, Baz>() {
      @ProcessElement   // <-- This is the only difference
      public void processElement(ProcessContext c) { … }
    }
    

DoFnProcessContext#window()에 액세스 한 경우, 추가 변경 사항이 있습니다. 코드가 다음과 같이 바뀝니다.

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) {
        … (MyWindowType) c.window() …
      }
    }
    

사용자가 작성하는 코드는 다음과 같습니다.

public class MyDoFn extends DoFn<Foo, Baz> {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

또는

return new DoFn<Foo, Baz>() {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

런타임이 DoFn에 자동으로 창을 제공합니다.

DoFn는 여러 번들에서 재사용됩니다.

영향을 받는 사용자: 모두 | 영향: 예기치 않은 결과가 자동으로 발생 가능 | JIRA 문제: BEAM-38

성능 개선을 위해 Bundle 당 새 인스턴스를 보장하는 대신 동일한 DoFn를 사용하여 요소의 여러 번들을 처리 할 수 있습니다. 모두 DoFn 번들 끝 부분에서 로컬 상태 (예 : 인스턴스 변수)를 유지하는 경우, 다음 번들이 새 복사본 대신 상태로 시작되므로 동작 변경 사항이 발생할 수 있습니다.

수명주기를 관리하기 위해 @Setup 그리고 @Teardown 메소드가 추가되었습니다. 전체 수명주기는 다음과 같습니다(시점에 관계없이 장애가 발생하면 수명주기가 끊어짐).

  • @Setup: 재사용 가능한 연결을 여는 등 DoFn를 인스턴스별로 초기화
  • 시퀀스 다수:
    • @StartBundle: DoFn의 상태를 재설정하는 등의 번들별 초기화
    • @ProcessElement: 일반적인 요소 처리
    • @FinishBundle: 부작용을 삭제하는 등의 번들별 종결 단계
  • @Teardown: 재사용 가능한 연결을 닫는 등 DoFn이 보유하고 있는 리소스를 인스턴스별로 해제

참고: 실제 사용 시 이 변경의 영향이 제한적일 것으로 예상됩니다. 그러나 컴파일 시간 오류가 생성되지 않으며 예상치 않은 결과가 자동으로 발생할 수 있습니다.

부가 입력 또는 출력 지정 시 매개변수 순서가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1422

이제 `ParDo 적용 시 DoFn이 우선적으로 항상 지정되어야 합니다.

foos.apply(ParDo
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag)
        .of(new MyDoFn()))
    

사용자가 작성하는 코드는 다음과 같습니다.

foos.apply(ParDo
        .of(new MyDoFn())
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag))
    

PTransforms

.named() 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-370

삭제 .named() 메소드를 호출합니다. 대신 PCollection.apply(“name”, PTransform)를 사용합니다.

이름이 PTransform.apply()에서 PTransform.expand()(으)로 변경되었습니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-438

PCollection.apply()와의 혼동을 방지하기 위해 PTransform.apply() 이름이 PTransform.expand()로 변경되었습니다. 모든 사용자 작성 복합 변환은 재정의된 apply() 메서드 이름을 expand()로 변경해야 합니다. 파이프라인이 구성되는 방식에는 변화가 없습니다.

다른 주요 변경사항

다음은 기타 주요 변경 사항과 예정된 변경 사항 목록입니다.

개별 API 변경

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-725

GcpOptions다음을 삭제함: TokenServerUrl, CredentialDir , CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

GoogleCredentials.fromStream(InputStream for credential)을(를) 사용합니다. 스트림에 Google Developers Console에서 가져온 JSON 형식의 서비스 계정 키 파일 또는 Cloud SDK에서 지원되는 형식을 사용하여 저장된 사용자 인증 정보가 포함될 수 있습니다.

--enableProfilingAgent--saveProfilesToGcs(으)로 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1122

--updateDataflowPipelineOptions 폴더로 이동됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-81

DataflowPipelineDebugOptions에서 --update PipelineOption, DataflowPipelineOptions로 이동.

BoundedSource.producesSortedKeys() 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1201

BoundedSource에서 producesSortedKeys() 삭제

PubsubIO API 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-974, BEAM-1415

2.0.0-beta2부터 PubsubIO.Read.topic(String)과 같은 정적 팩토리 메서드 대신 PubsubIO.ReadPubsubIO.Write를 사용하여 PubsubIO.<T>read()PubsubIO.<T>write()를 인스턴스화해야 합니다.

PubsubIO을 구성하기 위한 메소드 이름이 변경되었습니다. 예 : PubsubIO.read().topic(String)이름이 PubsubIO.read().fromTopic()으로 변경되었습니다. 마찬가지로: subscription(), fromSubscription(), timestampLabel, idLabel 이 각각 withTimestampAttribute, withIdAttribute, PubsubIO.write().topic(), PubsubIO.write().to()로 변경되었습니다.

메시지 페이로드를 파싱하기 위해Coder을 지정하는 대신 PubsubIO는 Avro 메시지 및 Protobuf 메시지(예 : PubsubIO.readStrings(), PubsubIO.writeAvros())를 읽고 쓰기 위한 함수를 노출시킵니다. 커스텀 유형을 읽고 쓰려면 PubsubIO.read/writeMessages() (메시지 속성을 포함해야하는 경우, PubsubIO.readMessagesWithAttributes)을 사용하고 ParDo 또는 MapElements을 사용하여 커스텀 유형과 PubsubMessage 간에 변환합니다.

지원되지 않는 v1beta2 API에 대한 DatastoreIO 지원이 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-354

DatastoreIO는 이제 Cloud Datastore API v1을 기반으로 합니다.

DisplayData.Builder 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-745

DisplayData.Builder.include(..)에 하위 요소 표시 데이터를 등록하기 위한 새 필수 경로 매개 변수가 있습니다. 이제 빌더 API가 DisplayData.Item 대신 DisplayData.ItemSpec<>를 반환합니다.

필수 FileBasedSink.getWriterResultCoder().

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-887

FileBasedSink.getWriterResultCoder을 제공해야 하는 추상 메서드로 변환했습니다.

이름이 Filter.byPredicate()에서 Filter.by()로 변경되었습니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-342

IntraBundleParallelization 삭제됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-414

이름이 RemoveDuplicates에서 Distinct(으)로 변경되었습니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-239

TextIO가 변경되어 다른 구문을 사용하고 문자열에서만 작동합니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1354

TextIO.Read.from()TextIO.read().from()로 변경되고, 마찬가지로 TextIO.Write.to()TextIO.write().to()변경됩니다.

TextIO.Read은 이제 항상 PCollection<String> 를 반환하고 문자열을 파싱하기 위해 .withCoder()취하지 않습니다. 대신 컬렉션에 ParDo 또는 MapElements을 적용하여 문자열을 파싱합니다. 마찬가지로 TextIO.Write은 이제 항상 PCollection<String>을 반환하며, TextIO에 다른 항목을 쓰기 위해 ParDo 또는 MapElements을 사용하여 이를 String로 변환합니다.

다른 구문을 사용하기 위해 AvroIO변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1402

Avro에서 생성된 유형을 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Foo.class)AvroIO.read(Foo.class).from()로 변경되고, 마찬가지로 AvroIO.Write도 변경되었습니다.

지정된 스키마를 사용하여 Avro 일반 레코드를 읽고 쓰기 위해 AvroIO.Read.from().withSchema(Schema or String)AvroIO.readGenericRecords().from()로 변경되고, 마찬가지로 AvroIO.Write도 변경되었습니다.

유형 매개변수를 명시적으로 지정하도록 KafkaIO가 변경되었고 Kafka serializers/deserializers 을 사용합니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1573, BEAM-2221

KafkaIO에서 키와 값 유형 매개 변수를 명시적으로 지정해야 합니다. 예: KafkaIO.<Foo, Bar>read(), KafkaIO.<Foo, Bar>write().

Coder를 사용하는 대신 키와 값 바이트를 해석하기 위해 표준 Kafka SerializerDeserializer 클래스를 사용합니다. 예를 들어 KafkaIO.read().withKeyCoder(StringUtf8Coder.of()) 대신 KafkaIO.read().withKeyDeserializer(StringDeserializer.class),KafkaIO.write()를 사용합니다.

BigQueryIO 구문 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1427

BigQueryIO.Read.from(), BigQueryIO.Write.to()대신, BigQueryIO.read().from(), BigQueryIO.write().to()를 사용합니다.

KinesisIO.Read 구문 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1428

KinesisIO.Read.from().using() 대신 KinesisIO.read().from().withClientProvider()를 사용합니다.

TFRecordIO 구문 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1913

TFRecordIO.Read.from(), TFRecordIO.Write.to()대신, TFRecordIO.read().from(), TFRecordIO.write().to()를 사용합니다.

XmlIO아래에 XmlSourceXmlSink가 연결됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1914

XmlSource, XmlSink를 직접 사용하는 대신 XmlIO을 사용합니다.

예: Read.from(XmlSource.from()) 대신 XmlIO.read().from()를 사용하고 Write.to(XmlSink.writeOf())대신 XmlIO.write().to()를 사용합니다.

CountingInput이름이 GenerateSequence로 변경되어 일반화되었습니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1414

CountingInput.unbounded() 대신 GenerateSequence.from(0)를 사용합니다. CountingInput.upTo(n) 대신 GenerateSequence.from(0).to(n)를 사용합니다.

Count, Latest, Sample가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1417, BEAM-1421, BEAM-1423

클래스 Count.PerElement, Count.PerKey, Count.Globally은 이제 비공개이므로, Count.perElement()(이전에는 new Count.PerElement())와 같은 팩토리 함수를 사용해야합니다. 예를 들어 변환 결과에 대해 .withHotKeyFanout()을 사용하고자 할 경우, 더 이상 .apply(Count.perElement())의 결과로 직접 수행할 수는 없습니다. 대신 Count은 결합 함수를 Count.combineFn()로 노출하고 Combine.globally(Count.combineFn())을 스스로 적용해야 합니다.

유사한 변경 사항이 Latest, Sample 변환에 적용됩니다.

MapElementsFlatMapElements 매개 변수의 순서가 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1418

MapElementsFlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor)를 사용하는 경우, 이제 설명자를 먼저 지정해야 합니다. 예: FlatMapElements.into(descriptor).via(fn)

추가 매개 변수를 구성할 경우 Window 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1425

Window을 사용하여 WindowFn 자체 이외의 내용을 구성하는 경우(Window.into() ), Window.configure()가 사용됩니다. 예: Window.triggering(...) 대신 Window.configure().triggering(...)를 사용합니다.

이름이 Write.Bound에서 Write(으)로 변경되었습니다.

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1416

이제 Write.Bound 클래스는 단순히 Write입니다. 이것은 Write.to(Sink)의 애플리케이션을 변수로 추출한 경우에만 사용되며 유형이 Write.Bound<...> 이었지만 이제는 Write<...>가 됩니다.

Flatten 변환 클래스 이름이 변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1419

클래스 Flatten.FlattenIterablesFlatten.FlattenPCollectionList 이름이 각각 Flatten.Iterables, Flatten.PCollections로 변경되었습니다.

두 메소드에서 GroupByKey.create(boolean) 분할

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1420

GroupByKey.create(boolean fewKeys)는 이제 GroupByKey.create()GroupByKey.createWithFewKeys()입니다.

SortValues변경됨

영향을 받는 사용자: 모두 | 영향: 컴파일 오류 | JIRA 문제: BEAM-1426

BufferedExternalSorter.Options setter 메소드의 이름이 setSomeProperty 에서 withSomeProperty로 변경되었습니다.

추가 Google API 종속성

SDK 버전 2.0.0부터 Cloud Resource Manager API도 사용해야 합니다.

종속성 업그레이드

2.x 출시 버전은 Avro, protobuf, gRPC를 포함하여 대부분의 고정된 종속 항목 버전을 업그레이드합니다. 이러한 종속 항목 중 일부는 자체적으로 크게 변경되었을 수 있습니다. 이로 인해 코드가 종속 항목에 직접적으로 의존하면 문제가 발생할 수 있습니다. 2.0.0에서 사용된 버전은 pom.xml에서 또는 mvn dependency:tree를 사용하여 확인할 수 있습니다.

내부 리팩터링

SDK의 내부 구조가 크게 변경되었습니다. 공개 API(Internal로 끝나거나 util 패키지에 있는 클래스 또는 메소드 등) 이상의 요소를 사용하고 있던 사용자는 크게 변경된 부분을 확인할 수 있습니다.

사용중인 경우 StateInternals 또는 TimerInternals: 내부 API가 삭제되었습니다. 실험용 StateDoFnTimer API를 사용할 수 있습니다.