Pub/Sub를 Apache Kafka에 연결

이 문서에서는 Pub/Sub Group Kafka Connector를 사용하여 Apache Kafka와 Pub/Sub를 통합하는 방법을 설명합니다.

Pub/Sub Group Kafka Connector 정보

Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Pub/Sub는 메시지를 비동기적으로 주고받기 위한 관리형 서비스입니다. Kafka와 마찬가지로 Pub/Sub를 사용하여 클라우드 아키텍처의 구성요소 간에 통신할 수 있습니다.

Pub/Sub Group Kafka Connector를 사용하면 이러한 두 시스템을 통합할 수 있습니다. 다음 커넥터는 Connector JAR로 패키징됩니다.

  • 싱크 커넥터는 하나 이상의 Kafka 주제에서 레코드를 읽어 Pub/Sub에 게시합니다.
  • 소스 커넥터는 Pub/Sub 주제에서 메시지를 읽고 Kafka에 게시합니다.

다음은 Pub/Sub Group Kafka Connector를 사용할 수 있는 몇 가지 시나리오입니다.

  • Kafka 기반 아키텍처를 Google Cloud로 마이그레이션합니다.
  • Google Cloud 외부의 Kafka에 이벤트를 저장하는 프런트엔드 시스템이 있지만 Google Cloud를 사용하여 Kafka 이벤트를 수신해야 할 일부 백엔드 서비스를 실행합니다.
  • 온프레미스 Kafka 솔루션에서 로그를 수집하여 데이터 분석을 위해 Google Cloud로 전송합니다.
  • Google Cloud를 사용하는 프런트엔드 시스템이 있지만 Kafka를 사용하여 데이터를 온프레미스에 저장합니다.

커넥터에는 Kafka 및 기타 시스템 간 데이터 스트리밍을 위한 프레임워크인 Kafka Connect가 필요합니다. 커넥터를 사용하려면 Kafka 클러스터와 함께 Kafka Connect를 실행해야 합니다.

이 문서에서는 사용자가 Kafka와 Pub/Sub에 모두 익숙하다고 가정합니다. 이 문서를 읽기 전에 Pub/Sub 빠른 시작 중 하나를 완료하는 것이 좋습니다.

Pub/Sub 커넥터는 Google Cloud IAM과 Kafka Connect ACL 간의 통합을 지원하지 않습니다.

커넥터 시작하기

이 섹션에서는 다음 작업을 안내합니다.

  1. Pub/Sub Group Kafka Connector를 구성합니다.
  2. Kafka에서 Pub/Sub로 이벤트를 전송합니다.
  3. Pub/Sub에서 Kafka로 메시지를 전송합니다.

기본 요건

Kafka 설치

Apache Kafka 빠른 시작을 따라 로컬 머신에 단일 노드 Kafka를 설치합니다. 빠른 시작에서 다음 단계를 완료합니다.

  1. 최신 Kafka 버전을 다운로드하고 압축을 풉니다.
  2. Kafka 환경을 시작합니다.
  3. Kafka 주제를 만듭니다.

인증

Pub/Sub Group Kafka Connector는 Pub/Sub 메시지를 전송 및 수신하기 위해 Pub/Sub로 인증해야 합니다. 인증을 설정하려면 다음 단계를 수행합니다.

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.

커넥터 JAR 다운로드

커넥터 JAR 파일을 로컬 머신에 다운로드합니다. 자세한 내용은 GitHub 리드미에서 커넥터 획득을 참조하세요.

커넥터 구성 파일 복사

  1. 커넥터에서 GitHub 저장소를 클론하거나 다운로드합니다.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 디렉터리 콘텐츠를 Kafka 설치의 config 하위 디렉터리에 복사합니다.

    cp config/* [path to Kafka installation]/config/
    

이러한 파일에는 커넥터의 구성 설정이 포함됩니다.

Kafka Connect 구성 업데이트

  1. 다운로드한 Kafka Connect 바이너리가 포함된 디렉터리로 이동합니다.
  2. Kafka Connect 바이너리 디렉터리에서 config/connect-standalone.properties라는 파일을 텍스트 편집기로 엽니다.
  3. plugin.path property가 주석 처리된 경우 주석 처리를 삭제합니다.
  4. 커넥터 JAR의 경로를 포함하도록 plugin.path property를 업데이트합니다.

    예를 들면 다음과 같습니다.

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 속성을 로컬 파일 이름으로 설정합니다. 독립형 모드에서 Kafka는 이 파일을 사용하여 오프셋 데이터를 저장합니다.

    예를 들면 다음과 같습니다.

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka에서 Pub/Sub로 이벤트 전달

이 섹션에서는 싱크 커넥터를 시작하고, Kafka에 이벤트를 게시한 후 Pub/Sub에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. Google Cloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    다음을 바꿉니다.

    • PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제의 이름입니다.
    • PUBSUB_SUBSCRIPTION: 주제의 Pub/Sub 구독 이름입니다.
  2. 텍스트 편집기에서 /config/cps-sink-connector.properties 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    다음을 바꿉니다.

    • KAFKA_TOPICS: 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.
    • PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
    • PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제입니다.
  3. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제에 일부 이벤트를 씁니다.

  5. gcloud CLI를 사용하여 Pub/Sub에서 이벤트를 읽습니다.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Pub/Sub에서 Kafka로 메시지 전달

이 섹션에서는 소스 커넥터를 시작하고, Pub/Sub에 메시지를 게시하고, Kafka에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. gcloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    다음을 바꿉니다.

    • PUBSUB_TOPIC: Pub/Sub 주제의 이름입니다.
    • PUBSUB_SUBSCRIPTION: Pub/Sub 구독의 이름입니다.
  2. 텍스트 편집기에서 /config/cps-source-connector.properties라는 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    다음을 바꿉니다.

    • KAFKA_TOPIC: Pub/Sub 메시지를 수신할 Kafka 주제입니다.
    • PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
    • PUBSUB_TOPIC: Pub/Sub 주제입니다.
  3. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. gcloud CLI를 사용하여 Pub/Sub에 메시지를 게시합니다.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Kafka에서 메시지를 읽습니다. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제의 메시지를 읽습니다.

메시지 변환

Kafka 레코드는 가변 길이 바이트 배열인 키와 값을 포함합니다. 필요에 따라 Kafka 레코드에 키-값 쌍인 헤더가 있을 수도 있습니다. Pub/Sub 메시지는 메시지 본문과 0개 이상의 키-값 속성이라는 두 가지 주요 부분으로 구성됩니다.

Kafka Connect는 변환기를 사용하여 Kafka 간의 키와 값을 직렬화합니다. 직렬화를 제어하려면 커넥터 구성 파일에서 다음 속성을 설정합니다.

  • key.converter: 레코드 키를 직렬화하는 데 사용되는 변환기입니다.
  • value.converter: 레코드 값을 직렬화하는 데 사용되는 변환기입니다.

Pub/Sub 메시지 본문은 ByteString 객체이므로 가장 효율적인 변환은 페이로드를 직접 복사하는 것입니다. 따라서 가능한 경우 동일한 메시지 본문을 역직렬화 및 재직렬화하지 않도록 기본 데이터 유형(정수, 부동 소수점 수, 문자열 또는 바이트 스키마)을 생성하는 변환기를 사용하는 것이 좋습니다.

Kafka에서 Pub/Sub로 변환

싱크 커넥터는 다음과 같이 Kafka 레코드를 Pub/Sub 메시지로 변환합니다.

  • Kafka 레코드 키는 Pub/Sub 메시지에 "key"라는 속성으로 저장됩니다.
  • 기본적으로 커넥터는 Kafka 레코드의 헤더를 삭제합니다. 그러나 headers.publish 구성 옵션을 true로 설정하면 커넥터가 헤더를 Pub/Sub 속성으로 작성합니다. 커넥터는 Pub/Sub 메시지 속성 한도를 초과하는 헤더를 건너뜁니다.
  • 정수, 부동 소수점 수, 문자열, 바이트 스키마의 경우 커넥터가 Kafka 레코드 값의 바이트를 Pub/Sub 메시지 본문으로 직접 전달합니다.
  • 구조체 스키마의 경우 커넥터에서 각 필드를 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 필드가 { "id"=123 }이면 결과 Pub/Sub 메시지에 "id"="123" 속성이 포함됩니다. 필드 값은 항상 문자열로 변환됩니다. 맵 및 구조체 유형은 구조체 내에서 필드 유형으로 지원되지 않습니다.
  • 맵 스키마의 경우 커넥터에서 각 키-값 쌍을 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 맵이 {"alice"=1,"bob"=2}이면 결과 Pub/Sub 메시지에 "alice"="1""bob"="2"라는 두 가지 속성이 있습니다. 키와 값은 문자열로 변환됩니다.

구조체 및 맵 스키마에는 몇 가지 추가 동작이 있습니다.

  • 필요에 따라 messageBodyName 구성 속성을 설정하여 특정 구조체 필드 또는 맵 키를 메시지 본문으로 지정할 수 있습니다. 필드 또는 키 값은 메시지 본문에 ByteString으로 저장됩니다. messageBodyName을 설정하지 않으면 구조체 및 맵 스키마에 대한 메시지 본문이 비어 있습니다.

  • 배열 값의 경우 커넥터에서 기본 배열 유형만 지원합니다. 배열의 값 시퀀스는 단일 ByteString 객체에 연결됩니다.

Pub/Sub에서 Kafka로 변환

소스 커넥터는 다음과 같이 Pub/Sub 메시지를 Kafka 레코드로 변환합니다.

  • Kafka 레코드 키: 기본적으로 키는 null로 설정됩니다. 필요에 따라 kafka.key.attribute 구성 옵션을 설정하여 키로 사용할 Pub/Sub 메시지 속성을 지정할 수 있습니다. 이러한 경우 커넥터에서 해당 이름의 속성을 찾아 레코드 키를 속성 값으로 설정합니다. 지정된 속성이 없으면 레코드 키가 null로 설정됩니다.

  • Kafka 레코드 값: 커넥터에서 다음과 같이 레코드 값을 씁니다.

    • Pub/Sub 메시지에 커스텀 속성이 없으면 커넥터에서 value.converter에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접 byte[] 유형으로 씁니다.

    • Pub/Sub 메시지에 커스텀 속성이 있고 kafka.record.headersfalse인 경우, 커넥터는 레코드 값에 구조체를 씁니다. 구조체에는 속성마다 하나의 필드가 있으며 값이 Pub/Sub 메시지 본문(바이트로 저장됨)인 "message"라는 필드가 있습니다.

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      이 경우 struct 스키마와 호환되는 value.converter(예: org.apache.kafka.connect.json.JsonConverter)를 사용해야 합니다.

    • Pub/Sub 메시지에 커스텀 속성이 있고 kafka.record.headerstrue인 경우, 커넥터는 속성을 Kafka 레코드 헤더로 씁니다. 커넥터에서 value.converter에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접 byte[] 유형으로 씁니다.

  • Kafka 레코드 헤더: kafka.record.headerstrue로 설정하지 않으면 기본적으로 헤더가 비어 있습니다.

구성 옵션

Pub/Sub Group Kafka Connector는 Kafka Connect API에서 제공하는 구성 외에도 다음 구성을 지원합니다.

싱크 커넥터 구성 옵션

싱크 커넥터는 다음 구성 옵션을 지원합니다.

설정 데이터 유형 설명
connector.class String 필수. 커넥터의 자바 클래스입니다. Pub/Sub 싱크 커넥터의 값은 com.google.pubsub.kafka.sink.CloudPubSubSinkConnector여야 합니다.
cps.endpoint String

사용할 Pub/Sub 엔드포인트입니다.

기본값: "pubsub.googleapis.com:443"

cps.project String 필수. Pub/Sub 주제가 포함된 Google Cloud입니다.
cps.topic String 필수. Kafka 레코드를 게시할 Pub/Sub 주제입니다.
gcp.credentials.file.path String 선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다.
gcp.credentials.json String 선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다.
headers.publish Boolean

true인 경우 모든 Kafka 레코드 헤더를 Pub/Sub 메시지 속성으로 포함합니다.

기본값: false

maxBufferBytes Long

주제 Kafka 파티션에서 Pub/Sub에 게시하기 전에 수신할 최대 바이트 수입니다.

기본값: 10000000

maxBufferSize Integer

Pub/Sub에 게시하기 전에 Kafka 주제 파티션에서 수신할 최대 레코드 수입니다.

기본값: 100

maxDelayThresholdMs Integer

Pub/Sub에 미해결 레코드를 게시하기 전에 maxBufferSize 또는 maxBufferBytes에 도달할 때까지 대기하는 최대 시간(밀리초)입니다.

기본값: 100

maxOutstandingMessages Long

게시자가 추가 게시를 차단하기 전에 완료되지 않고 대기 중인 배치를 포함해 미해결 상태일 수 있는 최대 레코드 수입니다.

기본값: Long.MAX_VALUE

maxOutstandingRequestBytes Long

게시자가 추가 게시를 차단하기 전에 완료되지 않고 대기 중인 배치를 포함해 미해결 상태일 수 있는 최대 총 바이트 수입니다.

기본값: Long.MAX_VALUE

maxRequestTimeoutMs Integer

Pub/Sub에 대한 개별 게시 요청 제한 시간(밀리초)입니다.

기본값: 10000

maxTotalTimeoutMs Integer

재시도를 포함하여 Pub/Sub에 게시할 호출의 총 제한 시간(밀리초)입니다.

기본값: 60000

metadata.publish Boolean

true인 경우 Kafka 주제, 파티션, 오프셋, 타임스탬프를 Pub/Sub 메시지 속성으로 포함합니다.

기본값: false

messageBodyName String

구조체 또는 맵 값 스키마를 사용할 때 Pub/Sub 메시지 본문으로 사용할 필드 또는 키의 이름을 지정합니다. Kafka에서 Pub/Sub로 변환을 참조하세요.

기본값: "cps_message_body"

orderingKeySource String

Pub/Sub 메시지에서 순서 키를 설정하는 방법을 지정합니다. 다음 값 중 하나일 수 있습니다.

  • none: 순서 키를 설정하지 않습니다.
  • key: Kafka 레코드 키를 순서 키로 사용합니다.
  • partition: 문자열로 변환된 파티션 번호를 순서 키로 사용합니다. 이 설정은 처리량이 적은 주제 또는 파티션이 수천 개인 주제에만 사용합니다.

기본값: none

topics String 필수. 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.

소스 커넥터 구성 옵션

소스 커넥터는 다음 구성 옵션을 지원합니다.

설정 데이터 유형 설명
connector.class String 필수. 커넥터의 자바 클래스입니다. Pub/Sub 소스 커넥터의 값은 com.google.pubsub.kafka.source.CloudPubSubSourceConnector여야 합니다.
cps.endpoint String

사용할 Pub/Sub 엔드포인트입니다.

기본값: "pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

true인 경우 Pub/Sub 메시지 속성과 동일한 형식을 사용하여 Kafka 레코드에 순서 키를 작성합니다. Pub/Sub에서 Kafka 레코드로 변환을 참조하세요.

기본값: false

cps.maxBatchSize Integer

Pub/Sub에 대한 pull 요청당 일괄 처리할 최대 메시지 수입니다.

기본값: 100

cps.project String 필수. Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
cps.subscription String 필수. 메시지를 가져올 Pub/Sub 구독의 이름입니다.
gcp.credentials.file.path String 선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다.
gcp.credentials.json String 선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다.
kafka.key.attribute String

Kafka에 게시된 메시지의 키로 사용할 Pub/Sub 메시지 속성입니다. "orderingKey"로 설정된 경우 메시지의 순서 키를 사용합니다. null인 경우 Kafka 레코드에 키가 없습니다.

기본값: null

kafka.partition.count Integer

메시지가 게시되는 Kafka 주제의 Kafka 파티션 수입니다. 파티션 스키마가 "kafka_partitioner"인 경우 이 매개변수가 무시됩니다.

기본값: 1

kafka.partition.scheme String

Kafka의 파티션에 메시지를 할당하는 스키마입니다. 다음 값 중 하나일 수 있습니다.

  • round_robin: 라운드 로빈 방식으로 파티션을 할당합니다.
  • hash_key: 레코드 키를 해싱하여 파티션을 찾습니다.
  • hash_value: 레코드 값을 해싱하여 파티션을 찾습니다.
  • kafka_partitioner: 파티셔닝 로직을 Kafka 제작자에게 위임합니다. 기본적으로 Kafka 제작자는 레코드 키 제공 여부에 따라 파티션 수를 자동으로 감지하고 MurmurHash 기반 파티션 매핑 또는 라운드 로빈을 수행합니다.
  • ordering_key: 메시지 순서 키의 해시 코드를 사용합니다. 순서 키가 없으면 round_robin을 사용합니다.

기본값: round_robin

kafka.record.headers Boolean

true이면 Pub/Sub 메시지 속성을 Kafka 헤더로 씁니다.

kafka.topic String 필수. Pub/Sub에서 메시지를 수신하는 Kafka 주제입니다.

지원 받기

도움이 필요하면 지원 티켓을 만듭니다. 일반적인 질문과 토론은 GitHub 저장소에서 문제 사례를 만드세요.

다음 단계