Pub/Sub Lite를 Apache Kafka에 연결

이 문서에서는 Pub/Sub Group Kafka Connector를 사용하여 Apache Kafka와 Pub/Sub Lite를 통합하는 방법을 설명합니다.

Pub/Sub Group Kafka Connector 정보

Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Pub/Sub Lite는 메시지를 비동기적으로 주고받기 위한 관리형 서비스입니다. Kafka와 마찬가지로 Pub/Sub Lite를 사용하여 클라우드 아키텍처의 구성요소 간에 통신할 수 있습니다.

Pub/Sub Group Kafka Connector를 사용하면 이러한 두 시스템을 통합할 수 있습니다. 다음 커넥터는 Connector JAR로 패키징됩니다.

  • 싱크 커넥터는 하나 이상의 Kafka 주제에서 레코드를 읽어 Pub/Sub Lite에 게시합니다.
  • 소스 커넥터는 Pub/Sub Lite 주제에서 메시지를 읽고 Kafka에 게시합니다.

다음은 Pub/Sub Group Kafka Connector를 사용할 수 있는 몇 가지 시나리오입니다.

  • Kafka 기반 아키텍처를 Google Cloud로 마이그레이션합니다.
  • Google Cloud 외부의 Kafka에 이벤트를 저장하는 프런트엔드 시스템이 있지만 Google Cloud를 사용하여 Kafka 이벤트를 수신해야 할 일부 백엔드 서비스를 실행합니다.
  • 온프레미스 Kafka 솔루션에서 로그를 수집하여 데이터 분석을 위해 Google Cloud로 전송합니다.
  • Google Cloud를 사용하는 프런트엔드 시스템이 있지만 Kafka를 사용하여 데이터를 온프레미스에 저장합니다.

커넥터에는 Kafka 및 기타 시스템 간 데이터 스트리밍을 위한 프레임워크인 Kafka Connect가 필요합니다. 커넥터를 사용하려면 Kafka 클러스터와 함께 Kafka Connect를 실행해야 합니다.

이 문서에서는 사용자가 Kafka와 Pub/Sub Lite에 모두 익숙하다고 가정합니다. Pub/Sub Lite를 시작하려면 Google Cloud 콘솔을 사용하여 Pub/Sub Lite에서 메시지 게시 및 수신을 참조하세요.

Pub/Sub Group Kafka Connector 시작하기

이 섹션에서는 다음 작업을 안내합니다.

  1. Pub/Sub Group Kafka Connector를 구성합니다.
  2. Kafka에서 Pub/Sub Lite로 이벤트를 전송합니다.
  3. Pub/Sub Lite에서 Kafka로 메시지를 전송합니다.

기본 요건

Kafka 설치

Apache Kafka 빠른 시작을 따라 로컬 머신에 단일 노드 Kafka를 설치합니다. 빠른 시작에서 다음 단계를 완료합니다.

  1. 최신 Kafka 버전을 다운로드하고 압축을 풉니다.
  2. Kafka 환경을 시작합니다.
  3. Kafka 주제를 만듭니다.

인증

Pub/Sub Group Kafka Connector는 Pub/Sub 메시지를 전송 및 수신하기 위해 Pub/Sub로 인증해야 합니다. 인증을 설정하려면 다음 단계를 수행합니다.

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  5. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  6. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  7. Google Cloud CLI를 설치합니다.
  8. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  9. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  10. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  11. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.

커넥터 JAR 다운로드

커넥터 JAR 파일을 로컬 머신에 다운로드합니다. 자세한 내용은 GitHub 리드미에서 커넥터 획득을 참조하세요.

커넥터 구성 파일 복사

  1. 커넥터에서 GitHub 저장소를 클론하거나 다운로드합니다.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 디렉터리 콘텐츠를 Kafka 설치의 config 하위 디렉터리에 복사합니다.

    cp config/* [path to Kafka installation]/config/
    

이러한 파일에는 커넥터의 구성 설정이 포함됩니다.

Kafka Connect 구성 업데이트

  1. 다운로드한 Kafka Connect 바이너리가 포함된 디렉터리로 이동합니다.
  2. Kafka Connect 바이너리 디렉터리에서 config/connect-standalone.properties라는 파일을 텍스트 편집기로 엽니다.
  3. plugin.path property가 주석 처리된 경우 주석 처리를 삭제합니다.
  4. 커넥터 JAR의 경로를 포함하도록 plugin.path property를 업데이트합니다.

    예를 들면 다음과 같습니다.

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 속성을 로컬 파일 이름으로 설정합니다. 독립형 모드에서 Kafka는 이 파일을 사용하여 오프셋 데이터를 저장합니다.

    예를 들면 다음과 같습니다.

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka에서 Pub/Sub Lite로 이벤트 전달

이 섹션에서는 싱크 커넥터를 시작하고, Kafka에 이벤트를 게시한 후 Pub/Sub Lite에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. Google Cloud CLI를 사용하여 Pub/Sub Lite 예약을 만듭니다.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    다음을 바꿉니다.

    • RESERVATION_NAME: Pub/Sub Lite 예약의 이름입니다.
    • LOCATION: 예약 위치입니다.
  2. Google Cloud CLI를 사용하여 Pub/Sub Lite 주제와 구독을 만듭니다.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    다음을 바꿉니다.

    • LITE_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub Lite 주제의 이름입니다.
    • LOCATION: 주제의 위치입니다. 값은 예약 위치와 일치해야 합니다.
    • RESERVATION_NAME: Pub/Sub Lite 예약의 이름입니다.
    • LITE_SUBSCRIPTION: 주제의 Pub/Sub Lite 구독 이름입니다.
  3. 텍스트 편집기에서 /config/pubsub-lite-sink-connector.properties 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    다음을 바꿉니다.

    • KAFKA_TOPICS: 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.
    • PROJECT_ID: Pub/Sub Lite 주제가 포함된 Google Cloud 프로젝트입니다.
    • LOCATION: Pub/Sub Lite 주제의 위치입니다.
    • LITE_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub Lite 주제입니다.
  4. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제에 일부 이벤트를 씁니다.

  6. Lite 구독에서 메시지 수신에 표시된 방법 중 하나를 사용하여 Pub/Sub Lite 구독을 수신합니다.

Pub/Sub Lite에서 Kafka로 메시지 전달

이 섹션에서는 소스 커넥터를 시작하고, Pub/Sub Lite에 메시지를 게시하고, Kafka에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. Google Cloud CLI를 사용하여 Pub/Sub Lite 예약을 만듭니다.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    다음을 바꿉니다.

    • RESERVATION_NAME: Pub/Sub Lite 예약의 이름입니다.
    • LOCATION: 예약 위치입니다.
  2. Google Cloud CLI를 사용하여 Pub/Sub Lite 주제와 구독을 만듭니다.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    다음을 바꿉니다.

    • LITE_TOPIC: Pub/Sub Lite 주제의 이름입니다.
    • LOCATION: 주제의 위치입니다. 값은 예약 위치와 일치해야 합니다.
    • RESERVATION_NAME: Pub/Sub Lite 예약의 이름입니다.
    • LITE_SUBSCRIPTION: 주제의 Pub/Sub Lite 구독 이름입니다.
  3. 텍스트 편집기에서 /config/pubsub-lite-source-connector.properties라는 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    다음을 바꿉니다.

    • KAFKA_TOPIC: Pub/Sub 메시지를 수신할 Kafka 주제입니다.
    • PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
    • LOCATION: Pub/Sub Lite 주제의 위치입니다.
    • LITE_SUBSCRIPTION: Pub/Sub Lite 주제입니다.
  4. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Lite 주제에 메시지를 게시에 표시된 방법을 사용하여 Pub/Sub Lite 주제에 메시지를 게시합니다.

  6. Kafka에서 메시지를 읽습니다. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제의 메시지를 읽습니다.

메시지 변환

Kafka 레코드는 가변 길이 바이트 배열인 키와 값을 포함합니다. 필요에 따라 Kafka 레코드에 키-값 쌍인 헤더가 있을 수도 있습니다. Pub/Sub Lite 메시지에는 다음과 같은 필드가 있습니다.

  • key: 메시지 키(bytes)
  • data: 메시지 데이터(bytes)
  • attributes: 0개 이상의 속성. 각 속성은 (key,values[]) 맵입니다. 단일 속성에 값이 여러 개 있을 수 있습니다.
  • event_time: 사용자가 제공한 이벤트 타임스탬프(선택사항)입니다.

Kafka Connect는 변환기를 사용하여 Kafka 간의 키와 값을 직렬화합니다. 직렬화를 제어하려면 커넥터 구성 파일에서 다음 속성을 설정합니다.

  • key.converter: 레코드 키를 직렬화하는 데 사용되는 변환기입니다.
  • value.converter: 레코드 값을 직렬화하는 데 사용되는 변환기입니다.

Kafka에서 Pub/Sub Lite로 변환

싱크 커넥터는 다음과 같이 Kafka 레코드를 Pub/Sub Lite 메시지로 변환합니다.

Kafka 레코드(SinkRecord) Pub/Sub Lite 메시지
key
data
헤더 attributes
타임스탬프 eventTime
타임스탬프 유형 attributes["x-goog-pubsublite-source-kafka-event-time-type"]
주제 attributes["x-goog-pubsublite-source-kafka-topic"]
파티션 attributes["x-goog-pubsublite-source-kafka-offset"]
오프셋 attributes["x-goog-pubsublite-source-kafka-partition"]

키, 값, 헤더는 다음과 같이 인코딩됩니다.

  • null 스키마는 문자열 스키마로 처리됩니다.
  • 바이트 페이로드는 변환 없이 직접 기록됩니다.
  • 문자열, 정수, 부동 소수점 페이로드는 UTF-8 바이트 시퀀스로 인코딩됩니다.
  • 다른 모든 페이로드는 프로토콜 버퍼 Value 유형으로 인코딩된 후 바이트 문자열로 변환됩니다.
    • 중첩된 문자열 필드는 protobuf Value로 인코딩됩니다.
    • 중첩된 바이트 필드는 base64로 인코딩된 바이트가 있는 protobuf Value로 인코딩됩니다.
    • 중첩된 숫자 필드는 protobuf Value에 double로 인코딩됩니다.
    • 배열, 맵, 구조체 키가 있는 맵은 지원되지 않습니다.

Pub/Sub Lite에서 Kafka로 변환

소스 커넥터는 다음과 같이 Pub/Sub Lite 메시지를 Kafka 레코드로 변환합니다.

Pub/Sub Lite 메시지 Kafka 레코드(SourceRecord)
key
data
attributes 헤더
event_time 타임스탬프. event_time이 없으면 게시 시간이 사용됩니다.

구성 옵션

커넥터는 Kafka Connect API에서 제공하는 구성 외에도 다음 Pub/Sub Lite 구성을 지원합니다.

싱크 커넥터 구성 옵션

싱크 커넥터는 다음 구성 옵션을 지원합니다.

설정 데이터 유형 설명
connector.class String 필수. 커넥터의 자바 클래스입니다. Pub/Sub Lite 싱크 커넥터의 값은 com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector여야 합니다.
gcp.credentials.file.path String 선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다.
gcp.credentials.json String 선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다.
pubsublite.location String 필수. Pub/Sub Lite 주제의 위치입니다.
pubsublite.project String 필수. Pub/Sub Lite 주제가 포함된 Google Cloud입니다.
pubsublite.topic String 필수. Kafka 레코드를 게시할 Pub/Sub Lite 주제입니다.
topics String 필수. 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.

소스 커넥터 구성 옵션

소스 커넥터는 다음 구성 옵션을 지원합니다.

설정 데이터 유형 설명
connector.class String 필수. 커넥터의 자바 클래스입니다. Pub/Sub Lite 소스 커넥터의 값은 com.google.pubsublite.kafka.source.PubSubLiteSourceConnector여야 합니다.
gcp.credentials.file.path String 선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다.
gcp.credentials.json String 선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다.
kafka.topic String 필수. Pub/Sub Lite에서 메시지를 수신하는 Kafka 주제입니다.
pubsublite.location String 필수. Pub/Sub Lite 주제의 위치입니다.
pubsublite.partition_flow_control.bytes Long

Pub/Sub Lite 파티션당 최대 미해결 바이트 수입니다.

기본값: 20,000,000

pubsublite.partition_flow_control.messages Long

Pub/Sub Lite 파티션당 최대 미해결 메시지 수입니다.

기본값: Long.MAX_VALUE

pubsublite.project String 필수. Pub/Sub Lite 주제가 포함된 Google Cloud 프로젝트입니다.
pubsublite.subscription String 필수. 메시지를 가져올 Pub/Sub Lite 구독의 이름입니다.

다음 단계