기존 파이프라인 업데이트

Apache Beam SDK에서는 새 파이프라인 코드를 사용하여 Cloud Dataflow 관리형 서비스에서 진행 중인 스트리밍 작업을 업데이트할 수 있습니다.

기존 Cloud Dataflow 작업을 업데이트하려는 이유는 다양합니다.

  • 파이프라인 코드를 향상 또는 개선하려는 경우
  • 파이프라인 코드에서 버그를 수정하려는 경우
  • 데이터 형식의 변경을 처리하거나 데이터 소스의 버전 또는 기타 변경 사항을 설명하기 위해 파이프라인을 업데이트하려는 경우

작업을 업데이트하면 Cloud Dataflow 서비스는 현재 실행 중인 작업과 잠재적인 교체 작업 사이의 호환성 검사를 수행합니다. 호환성 검사를 통해 중간 상태 정보 및 버퍼링된 데이터와 같은 항목을 이전 작업에서 교체 작업으로 전송할 수 있는지 확인할 수 있습니다.

업데이트 프로세스 및 효과

Cloud Dataflow 서비스에서 작업을 업데이트하는 경우, 기존 작업을 업데이트된 파이프라인 코드를 실행하는 새 작업으로 교체합니다. Cloud Dataflow 서비스는 작업 이름을 유지하지만 업데이트된 작업 ID로 교체 작업을 실행합니다.

교체 작업은 이전 작업의 중간 상태 데이터뿐만 아니라 이전 작업에서 현재 '진행 중인' 버퍼링된 데이터 레코드 또는 메타데이터도 보존합니다. 예를 들어, 파이프라인의 일부 레코드는 윈도우의 확인을 기다리는 동안 버퍼링될 수 있습니다.

진행 중인 데이터

'진행 중인' 데이터는 새 파이프라인의 변환에 의해 계속 처리됩니다. 하지만 교체 파이프라인 코드에 추가되는 추가 변환은 레코드가 버퍼링되는 위치에 따라 적용되거나 적용되지 않을 수 있습니다. 예를 들어, 기존 파이프라인에 다음과 같은 변환이 있다고 가정합니다.

자바: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

자바: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(FormatStrings().named("Format"));

다음과 같이 작업을 새 파이프라인 코드로 바꿀 수 있습니다.

자바: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove', RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

자바: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(RemoveStringsStartingWithA().named("Remove"))
   .apply(FormatStrings().named("Format"));

문자 'A'로 시작되는 문자열을 필터링하도록 변환을 추가한 경우에도 다음 변환(FormatStrings) 시 이전 작업에서 전송되고 'A'로 시작되는 버퍼링된 문자열 또는 진행 중인 문자열이 계속 표시될 수 있습니다.

윈도우 변경

교체 파이프라인에서 PCollection기간 설정트리거 전략을 변경할 수 있지만 주의해야 합니다. 윈도우 또는 트리거 전략을 변경해도 이미 버퍼링되었거나 진행 중인 데이터는 영향을 받지 않습니다.

고정 기간 또는 슬라이딩 기간 창과 같이 파이프라인의 윈도우에 더 사소한 변경만 시도하는 것이 좋습니다. 윈도우 알고리즘 변경과 같이 윈도우나 트리거에 대대적인 변경을 수행하면 파이프라인 출력에 예측할 수 없는 결과가 발생할 수 있습니다.

교체 작업 실행

작업을 업데이트하려면 진행 중인 작업을 교체할 새 작업을 실행해야 합니다. 교체 작업을 실행할 때 작업의 일반 옵션 이외에도 업데이트 프로세스를 수행하기 위한 다음 파이프라인 옵션을 설정해야 합니다.

자바

  • --update 옵션을 전달합니다.
  • PipelineOptions의 --jobName 옵션을 업데이트할 작업과 동일한 이름으로 설정합니다.
  • 파이프라인의 변환 이름이 변경되면 변환 매핑을 제공하고 --transformNameMapping 옵션을 사용하여 전달해야 합니다.

Python

  • --update 옵션을 전달합니다.
  • PipelineOptions의 --job_name 옵션을 업데이트할 작업과 동일한 이름으로 설정합니다.
  • 파이프라인의 변환 이름이 변경되면 변환 매핑을 제공하고 --transform_name_mapping 옵션을 사용하여 전달해야 합니다.

대체 작업 이름 지정

자바

교체 작업 실행 시 --jobName 옵션과 관련하여 전달하는 값은 교체할 작업 이름과 정확히 일치해야 합니다.

Python

교체 작업 실행 시 --job_name 옵션과 관련하여 전달하는 값은 교체할 작업 이름과 정확히 일치해야 합니다.

올바른 작업 이름 값을 찾으려면 Cloud Dataflow 모니터링 인터페이스에서 이전 작업을 선택하고 요약 탭에서 작업 이름 필드를 찾습니다.

그림 1: 작업 이름 필드가 강조 표시된 실행 중인 Cloud Dataflow 작업의 요약 탭입니다.

또는 Cloud Dataflow 명령줄 인터페이스를 사용하여 기존 작업 목록을 쿼리할 수 있습니다. gcloud alpha dataflow jobs list 명령어를 셸 또는 터미널 창에 입력하여 Google Cloud Platform 프로젝트의 Cloud Dataflow 작업 목록을 가져오고 교체할 작업의 NAME 필드를 찾습니다.

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running

변환 매핑 만들기

자바

교체 파이프라인이 이전 파이프라인의 변환 이름을 변경한 경우 Cloud Dataflow 서비스에 변환 매핑이 필요합니다. 변환 매핑은 이전 파이프라인 코드의 이름이 지정된 변환을 교체 파이프라인 코드의 이름에 매핑합니다 --transformNameMapping 명령줄 옵션을 사용하여 매핑을 전달할 때 다음의 일반적인 형식을 사용할 수 있습니다.

--transformNameMapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

이전 파이프라인과 교체 파이프라인 사이에 변경된 변환 이름에 대해서만 --transformNameMapping에 매핑 항목을 제공해야 합니다.

Python

교체 파이프라인이 이전 파이프라인의 변환 이름을 변경한 경우 Cloud Dataflow 서비스에 변환 매핑이 필요합니다. 변환 매핑은 이전 파이프라인 코드의 이름이 지정된 변환을 교체 파이프라인 코드의 이름에 매핑합니다 --transform_name_mapping 명령줄 옵션을 사용하여 매핑을 전달할 때 다음의 일반적인 형식을 사용할 수 있습니다.

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

이전 파이프라인과 교체 파이프라인 사이에 변경된 변환 이름에 대해서만 --transform_name_mapping에 매핑 항목을 제공해야 합니다.

변환 이름 결정

맵의 각 인스턴스에서 변환 이름은 파이프라인에서 변환 적용 시 제공한 이름입니다 예를 들면 다음과 같습니다.

자바: SDK 2.x

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Python

| 'FormatResults' >> beam.ParDo(MyDoFn())

자바: SDK 1.x

.apply(ParDo
  .named("FormatResults")
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Cloud Dataflow 모니터링 인터페이스에서 작업의 실행 그래프를 검토하여 이전 작업의 변환 이름을 가져올 수도 있습니다.

그림 2: Cloud Dataflow 모니터링 인터페이스에 표시된 WordCount 파이프라인의 실행 그래프

복합 변환에서 이름 지정

변환 이름은 계층적이며 파이프라인의 변환 계층구조를 기반으로 합니다. 파이프라인에 복합 변환이 있는 경우, 중첩된 변환은 포괄하는 변환의 관점에서 이름이 지정됩니다. 예를 들어, 파이프라인에 CountWidgets라는 복합 변환이 포함되어 있고 여기에 Parse라는 내부 변환이 포함되어 있다고 가정합니다. 내부 변환의 전체 이름은 CountWidgets/Parse가 되고 변환 매핑에 이 전체 이름을 지정해야 합니다.

새 파이프라인이 복합 변환을 다른 이름에 매핑하는 경우, 중첩된 모든 변환의 이름도 자동으로 바뀌므로, 변환 매핑의 내부 변환에 변경된 이름을 지정해야 합니다.

변환 계층 리팩토링

교체 파이프라인이 이전 파이프라인과 다른 변환 계층을 사용하는 경우(예: 복합 변환을 리팩토링했거나 파이프라인이 변경된 라이브러리의 복합 변환을 사용하는 경우), 매핑을 명시적으로 선언해야 합니다.

예를 들어 이전 파이프라인이 Parse라는 내부 변환을 포함하는 복합 변환 CountWidgets를 적용했다고 가정합니다. 그리고 이제 교체 파이프라인이 CountWidgets를 리팩토링하고 Scan이라는 다른 변환 안에 Parse를 중첩했다고 가정해 보겠습니다. 업데이트에 성공하려면 이전 파이프라인의 전체 변환 이름(CountWidgets/Parse)을 새 파이프라인의 변환 이름(CountWidgets/Scan/Parse)으로 명시적으로 매핑해야 합니다.

자바

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

교체 파이프라인에서 변환을 완전히 삭제하는 경우, null 매핑을 제공해야 합니다. 교체 파이프라인이 CountWidgets/Parse 변환을 완전히 제거한다고 가정합니다.

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

교체 파이프라인에서 변환을 완전히 삭제하는 경우, null 매핑을 제공해야 합니다. 교체 파이프라인이 CountWidgets/Parse 변환을 완전히 제거한다고 가정합니다.

--transform_name_mapping={"CountWidgets/Parse":""}

작업 호환성 검사

교체 작업을 실행하면 Cloud Dataflow 서비스는 교체 작업과 이전 작업 사이의 호환성 검사를 수행합니다. 호환성 검사에 통과하면 이전 작업이 중지되고, 교체 작업은 동일한 작업 이름을 유지한 상태로 Cloud Dataflow 서비스에서 실행됩니다. 호환성 검사에 실패하면 이전 작업이 Cloud Dataflow 서비스에서 계속 실행되며 교체 작업은 오류를 반환합니다.

자바

Python

호환성 검사를 통해 사용자가 제공하는 변환 매핑에 지정된 대로 Cloud Dataflow 서비스가 이전 작업의 단계에서 교체 작업으로 중간 상태 데이터를 전송할 수 있는지 확인할 수 있습니다. 또한 호환성 검사를 통해 파이프라인의 PCollection이 동일한 Coder를 사용 중인지 확인할 수 있습니다. Coder를 변경하면 진행 중인 데이터 또는 버퍼링된 레코드가 교체 파이프라인에서 제대로 직렬화되지 않을 수 있으므로, 호환성 검사가 실패할 수 있습니다.

호환성 중단 방지

이전 파이프라인과 교체 파이프라인 간의 특정 차이로 인해 호환성 검사가 실패할 수 있습니다. 이러한 차이점은 다음과 같습니다.

  • 매핑을 제공하지 않고 파이프라인 그래프 변경. 작업을 업데이트하면 Cloud Dataflow 서비스는 이전 작업의 변환을 교체 작업의 변환과 일치시켜 각 단계마다 중간 상태 데이터를 전송하려고 합니다. 단계 이름을 바꿨거나 단계를 제거한 경우, Cloud Dataflow가 이에 맞게 상태 데이터를 일치시킬 수 있도록 변환 매핑을 제공해야 합니다.
  • 단계의 부차 입력 변경. 교체 파이프라인의 변환에 부차 입력을 추가하거나 이 변환에서 부차 입력을 제거하면 호환성 검사가 실패할 수 있습니다.
  • 단계의 Coder 변경. 작업을 업데이트하면 Cloud Dataflow 서비스는 현재 버퍼링된 데이터 레코드를 보존(예: 윈도우를 확인하는 동안)하고 교체 작업에서 처리합니다. 교체 작업이 다르거나 호환되지 않는 데이터 인코딩을 사용하는 경우, Cloud Dataflow 서비스는 이러한 레코드를 직렬화하거나 역직렬화할 수 없습니다.
  • 파이프라인에서 '스테이트풀(Stateful)' 작업을 제거한 경우. 파이프라인에서 특정 스테이트풀(Stateful) 작업을 제거하면 교체 작업에서 Cloud Dataflow의 호환성 검사가 실패할 수 있습니다. Cloud Dataflow 서비스는 효율성을 위해 여러 단계를 함께 융합할 수 있습니다. 융합된 단계 내에서 상태 의존적인 작업을 제거하면 검사가 실패합니다. 스테이트풀(Stateful) 작업에는 다음이 포함됩니다.

    • 부차 입력을 생성하거나 소비하는 변환
    • I/O 읽기
    • 키가 지정된 상태를 사용하는 변환
    • 윈도우 병합이 있는 변환
  • 다른 영역에서 교체 작업을 실행하려는 경우. 이전 작업을 실행한 영역과 동일한 영역에서 교체 작업을 실행해야 합니다.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.