Pub/Sub I/O

기본 제공되는 Cloud Pub/SubReadWrite 변환은 PubsubIO에 포함되어 있습니다. PubsubIO를 사용하여 Cloud Pub/Sub 주제 또는 구독에서 데이터를 읽거나 쓸 수 있습니다. 또한 PubsubIO커스텀 메시지 식별자 또는 Cloud Pub/Sub가 할당한 식별자를 기준으로 중복된 메시지를 삭제하므로, Cloud Pub/Sub 메시지 스트림을 정확하게 한 번으로 처리할 수 있습니다.

참고: 기본적으로 PubsubIO 변환은 제한되지 않은 PCollection을 만듭니다. PubsubIO.Read를 사용하여 PCollection을 만든 후에는 GroupByKey 또는 Combine과 같이 요소를 그룹화하는 변환을 적용하기 전에 해당 PCollection윈도우 전략을 적용해야 합니다.

Cloud Pub/Sub 주제 또는 구독 지정

PubsubIO를 사용하려면 Cloud Pub/Sub 주제의 이름 또는 주어진 주제에 대해 이미 생성한 구독 중 하나를 제공합니다. PubsubIO 변환을 주제 이름과 함께 사용하는 경우에는 Dataflow가 백그라운드에서 구독을 자동으로 생성하고 관리합니다.

Cloud Pub/Sub 주제 지정

PubsubIO를 사용할 때는 읽고 쓸 Cloud Pub/Sub 주제를 지정할 수 있습니다. 주제 이름을 제공하면 Dataflow가 해당 주제에 대한 구독을 자동으로 생성합니다. Dataflow는 파이프라인이 시작될 때부터 주제에서 읽기 시작하며, 실제 파이프라인이 시작되기 전에 주제에 게시된 데이터는 파이프라인에서 사용할 수 없습니다.

참고: Dataflow는 파이프라인 설정의 일환으로 필요한 Cloud Pub/Sub 구독을 생성합니다. 파이프라인 설정 시간은 Compute Engine 인스턴스 및 Cloud Platform에 할당된 다른 리소스의 수에 따라 달라질 수 있으므로 파이프라인이 Cloud Pub/Sub 주제에서 읽기 시작한 정확한 시기를 알기 어려울 수 있습니다. 데이터 읽기 및 쓰기 타이밍을 더욱 세부적으로 제어해야 할 경우에는 Cloud Pub/Sub 주제에 대한 구독을 직접 생성 및 관리하고 해당 구독을 PubsubIO에 전달할 수 있습니다. 자세한 내용은 Cloud Pub/Sub 구독 지정을 참조하세요.

제공하는 주제 이름은 projects/<Cloud Platform Project Name>/topics/<topic name> 형식을 따라야 합니다. 여기서 프로젝트 이름은 주제가 속한 프로젝트의 이름입니다. 또한 주제 이름은 다음과 같은 요구사항을 충족해야 합니다.

  • 주제 이름은 길이가 3자에서 255자 사이여야 합니다.
  • 주제 이름은 소문자, 숫자, 대시('-'), 밑줄('_'), 점('.')만 포함할 수 있습니다.
  • 주제 이름은 문자로 시작해야 합니다.
  • 주제 이름은 문자나 숫자로 끝나야 합니다.
  • 주제 이름은 'goog' 프리픽스로 시작할 수 없습니다.

Cloud Pub/Sub 구독 지정

PubsubIO 변환을 사용할 경우 특정 Cloud Pub/Sub 주제에서 읽거나 쓸 때 사용할 Cloud Pub/Sub 구독을 지정할 수 있습니다. 구독을 사용하려면 이미 구독을 직접 생성하고 관리했어야 합니다. Cloud Pub/Sub 구독을 생성하는 방법에 대한 자세한 내용은 Cloud Pub/Sub 구독자 문서를 참조하세요.

제공하는 구독 이름은 /projects/<Cloud Platform Project Name>/subscriptions/<subscription name> 형식을 따라야 합니다. 여기서 프로젝트 이름은 구독이 속한 프로젝트의 이름입니다. 또한 제공하는 구독 이름은 다음과 같은 요구사항을 충족해야 합니다.

  • 구독 이름은 길이가 3자에서 255자 사이여야 합니다.
  • 구독 이름은 소문자, 숫자, 대시('-'), 밑줄('_'), 점('.')만 포함할 수 있습니다.
  • 구독 이름은 문자로 시작해야 합니다.
  • 구독 이름은 문자나 숫자로 끝나야 합니다.
  • 구독 이름은 'goog' 프리픽스로 시작할 수 없습니다.

파이프라인이 Cloud Pub/Sub 주제에서 어떠한 데이터도 놓치지 않도록 하려면 구독을 사용해야 합니다. Cloud Pub/Sub 구독은 읽는 파이프라인이 없더라도 계속해서 데이터를 수집합니다. 파이프라인이 시작되면 파이프라인 시작 전에 도착한 데이터까지 포함하여 구독이 수집한 모든 데이터에 액세스할 수 있습니다. 또한 파이프라인을 중지하고 같은 구독에서 읽을 새 파이프라인을 생성하는 경우, 데이터가 전혀 손실되지 않습니다. 파이프라인이 없을 때도 구독이 계속해서 데이터를 수집하기 때문입니다.

PubsubIO로 읽기

PubsubIO.Read 변환은 지속적으로 Cloud Pub/Sub 스트림에서 읽고 스트림의 데이터를 나타내는 String제한되지 않는 PCollection을 반환합니다. 기본적으로, 결과 PCollection에 있는 각 요소는 UTF-8 문자열로 인코딩됩니다. PubsubIO.Read를 호출할 때 .withCoder를 사용하여 기본 인코딩을 재정의할 수 있습니다.

자바

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Cloud Pub/Sub로부터 제한된 레코드 집합 읽기

테스트 목적을 위해 InProcessPipelineRunner(자바 1.X용 Dataflow SDK) 또는 DirectRunner(자바 2.X용 Dataflow SDK)를 사용하여 Pub/Sub로부터 읽을 수 있습니다.

테스트 목적을 위해 제한된 컬렉션에서 작업해야 하는 경우에는 읽을 입력 양에 대한 한도를 지정할 수 있습니다. .maxNumRecords 옵션을 사용하여 고정된 최대 레코드 수를 읽거나, .maxReadTime을 사용하여 고정된 기간 동안의 레코드를 읽을 수 있습니다.

이 모드에서는 오류 시 중복 제거, 재시도, 복구가 보장되지 않습니다. 프로덕션의 경우, Cloud Pub/Sub를 제한되지 않은 소스로 취급해야 합니다.

PubsubIO로 쓰기

PubsubIO.Write 변환은 String 객체의 제한되지 않은 PCollection을 Cloud Pub/Sub 스트림에 작성합니다. 기본적으로, PubsubIO.Write로의 입력 PCollection은 UTF-8로 인코딩된 문자열을 포함해야 합니다. withCoder를 사용하여 예상 입력 유형 및 인코딩을 변경할 수 있습니다.

자바

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

타임스탬프 및 레코드 ID

PubsubIO를 사용하여 읽고 쓰는 레코드에 타임스탬프레코드 ID라는 두 가지 유형의 메타데이터를 추가할 수 있습니다.

사용자 지정 타임스탬프 사용

Cloud Pub/Sub로부터 읽은 요소가 Dataflow 파이프라인의 윈도우에 할당되는 방법을 사용자 지정 타임스탬프를 사용하여 정교하게 제어할 수 있습니다. 사용자 지정 타임스탬프를 설정하려면 PubsubIO.Read 또는 PubsubIO.Write 변환을 설정할 때 timestampLabel을 호출하여 원하는 문자열 값을 전달합니다.

PubsubIO.Read 사용 시 사용자 지정 타임스탬프 라벨을 설정한 경우, Cloud Pub/Sub에서 요소를 읽으면 변환은 속성의 값을 timestampLabel에 각 수신 메시지의 타임스탬프로 전달한 문자열 이름과 함께 사용합니다. 타임스탬프는 Unix epoch 이후 밀리초 형식이거나 RFC 3339에 따른 형식이어야 합니다.

PubsubIO.Write 사용 시 사용자 지정 타임스탬프 라벨을 설정한 경우, 변환은 특정 이름의 속성을 Unix epoch 이후 밀리초로 된 요소의 타임스탬프 값과 함께 사용하여 각 요소를 Cloud Pub/Sub 메시지로 작성합니다.

레코드 ID 사용

레코드 ID는 Dataflow와 다른 시스템 사이의 경계에서 정확히 한 번만 처리되도록 합니다. 레코드 ID를 사용하려면 PubsubIO.Read 또는 PubsubIO.Write 변환을 설정할 때 idLabel을 호출하여 원하는 문자열 값을 전달합니다.

PubsubIO.Read 사용 시 레코드 ID 라벨을 설정한 경우, Dataflow가 동일 ID(idLabel에 전달된 문자열 이름을 사용하는 속성에서 읽음)의 여러 메시지를 수신하면 한 메시지를 제외하고 모두 삭제합니다. 하지만 Dataflow는 10분이 지나 Cloud Pub/Sub에 게시된 동일 레코드 ID 값에 대해서는 중복 메시지를 제거하지 않습니다.

PubsubIO.Write 사용 시 레코드 ID 라벨을 설정한 경우, 변환은 모든 발신 메시지에 속성을 지정된 이름 및 고유 값과 함께 작성합니다. 다운스트림 시스템은 이 고유 값을 사용하여 중복 메시지를 제거할 수 있습니다.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.