Pub/Sub를 Apache Kafka에 연결

이 문서에서는 Pub/Sub Group Kafka Connector를 사용하여 Apache Kafka와 Pub/Sub를 통합하는 방법을 설명합니다.

Pub/Sub Group Kafka Connector 정보

Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Pub/Sub는 메시지를 비동기적으로 주고받기 위한 관리형 서비스입니다. Kafka와 마찬가지로 Pub/Sub를 사용하여 클라우드 아키텍처의 구성요소 간에 통신할 수 있습니다.

Pub/Sub Group Kafka Connector를 사용하면 이러한 두 시스템을 통합할 수 있습니다. 다음 커넥터는 Connector JAR로 패키징됩니다.

  • 싱크 커넥터는 하나 이상의 Kafka 주제에서 레코드를 읽어 Pub/Sub에 게시합니다.
  • 소스 커넥터는 Pub/Sub 주제에서 메시지를 읽고 Kafka에 게시합니다.

다음은 Pub/Sub Group Kafka Connector를 사용할 수 있는 몇 가지 시나리오입니다.

  • Kafka 기반 아키텍처를 Google Cloud로 마이그레이션합니다.
  • Google Cloud 외부의 Kafka에 이벤트를 저장하는 프런트엔드 시스템이 있지만 Google Cloud를 사용하여 Kafka 이벤트를 수신해야 할 일부 백엔드 서비스를 실행합니다.
  • 온프레미스 Kafka 솔루션에서 로그를 수집하여 데이터 분석을 위해 Google Cloud로 전송합니다.
  • Google Cloud를 사용하는 프런트엔드 시스템이 있지만 Kafka를 사용하여 데이터를 온프레미스에 저장합니다.

커넥터에는 Kafka 및 기타 시스템 간 데이터 스트리밍을 위한 프레임워크인 Kafka Connect가 필요합니다. 커넥터를 사용하려면 Kafka 클러스터와 함께 Kafka Connect를 실행해야 합니다.

이 문서에서는 사용자가 Kafka와 Pub/Sub에 모두 익숙하다고 가정합니다. 이 문서를 읽기 전에 Pub/Sub 빠른 시작 중 하나를 완료하는 것이 좋습니다.

Pub/Sub 커넥터는 Google Cloud IAM과 Kafka Connect ACL 간의 통합을 지원하지 않습니다.

커넥터 시작하기

이 섹션에서는 다음 작업을 안내합니다.

  1. Pub/Sub Group Kafka Connector를 구성합니다.
  2. Kafka에서 Pub/Sub로 이벤트를 전송합니다.
  3. Pub/Sub에서 Kafka로 메시지를 전송합니다.

기본 요건

Kafka 설치

Apache Kafka 빠른 시작을 따라 로컬 머신에 단일 노드 Kafka를 설치합니다. 빠른 시작에서 다음 단계를 완료합니다.

  1. 최신 Kafka 버전을 다운로드하고 압축을 풉니다.
  2. Kafka 환경을 시작합니다.
  3. Kafka 주제를 만듭니다.

인증

Pub/Sub Group Kafka Connector는 Pub/Sub 메시지를 전송 및 수신하기 위해 Pub/Sub로 인증해야 합니다. 인증을 설정하려면 다음 단계를 수행합니다.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

커넥터 JAR 다운로드

커넥터 JAR 파일을 로컬 머신에 다운로드합니다. 자세한 내용은 GitHub 리드미에서 커넥터 획득을 참조하세요.

커넥터 구성 파일 복사

  1. 커넥터에서 GitHub 저장소를 클론하거나 다운로드합니다.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 디렉터리 콘텐츠를 Kafka 설치의 config 하위 디렉터리에 복사합니다.

    cp config/* [path to Kafka installation]/config/
    

이러한 파일에는 커넥터의 구성 설정이 포함됩니다.

Kafka Connect 구성 업데이트

  1. 다운로드한 Kafka Connect 바이너리가 포함된 디렉터리로 이동합니다.
  2. Kafka Connect 바이너리 디렉터리에서 config/connect-standalone.properties라는 파일을 텍스트 편집기로 엽니다.
  3. plugin.path property가 주석 처리된 경우 주석 처리를 삭제합니다.
  4. 커넥터 JAR의 경로를 포함하도록 plugin.path property를 업데이트합니다.

    예를 들면 다음과 같습니다.

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 속성을 로컬 파일 이름으로 설정합니다. 독립형 모드에서 Kafka는 이 파일을 사용하여 오프셋 데이터를 저장합니다.

    예를 들면 다음과 같습니다.

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka에서 Pub/Sub로 이벤트 전달

이 섹션에서는 싱크 커넥터를 시작하고, Kafka에 이벤트를 게시한 후 Pub/Sub에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. Google Cloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    다음을 바꿉니다.

    • PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제의 이름입니다.
    • PUBSUB_SUBSCRIPTION: 주제의 Pub/Sub 구독 이름입니다.
  2. 텍스트 편집기에서 /config/cps-sink-connector.properties 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    다음을 바꿉니다.

    • KAFKA_TOPICS: 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.
    • PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
    • PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제입니다.
  3. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제에 일부 이벤트를 씁니다.

  5. gcloud CLI를 사용하여 Pub/Sub에서 이벤트를 읽습니다.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Pub/Sub에서 Kafka로 메시지 전달

이 섹션에서는 소스 커넥터를 시작하고, Pub/Sub에 메시지를 게시하고, Kafka에서 전달된 메시지를 읽는 방법을 설명합니다.

  1. gcloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    다음을 바꿉니다.

    • PUBSUB_TOPIC: Pub/Sub 주제의 이름입니다.
    • PUBSUB_SUBSCRIPTION: Pub/Sub 구독의 이름입니다.
  2. 텍스트 편집기에서 /config/cps-source-connector.properties라는 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는 "TODO"로 표시됩니다.

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    다음을 바꿉니다.

    • KAFKA_TOPIC: Pub/Sub 메시지를 수신할 Kafka 주제입니다.
    • PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
    • PUBSUB_TOPIC: Pub/Sub 주제입니다.
  3. Kafka 디렉터리에서 다음 명령어를 실행합니다.

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. gcloud CLI를 사용하여 Pub/Sub에 메시지를 게시합니다.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Kafka에서 메시지를 읽습니다. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제의 메시지를 읽습니다.

메시지 변환

Kafka 레코드는 가변 길이 바이트 배열인 키와 값을 포함합니다. 필요에 따라 Kafka 레코드에 키-값 쌍인 헤더가 있을 수도 있습니다. Pub/Sub 메시지는 메시지 본문과 0개 이상의 키-값 속성이라는 두 가지 주요 부분으로 구성됩니다.

Kafka Connect는 변환기를 사용하여 Kafka 간의 키와 값을 직렬화합니다. 직렬화를 제어하려면 커넥터 구성 파일에서 다음 속성을 설정합니다.

  • key.converter: 레코드 키를 직렬화하는 데 사용되는 변환기입니다.
  • value.converter: 레코드 값을 직렬화하는 데 사용되는 변환기입니다.

Pub/Sub 메시지 본문은 ByteString 객체이므로 가장 효율적인 변환은 페이로드를 직접 복사하는 것입니다. 따라서 가능한 경우 동일한 메시지 본문을 역직렬화 및 재직렬화하지 않도록 기본 데이터 유형(정수, 부동 소수점 수, 문자열 또는 바이트 스키마)을 생성하는 변환기를 사용하는 것이 좋습니다.

Kafka에서 Pub/Sub로 변환

싱크 커넥터는 다음과 같이 Kafka 레코드를 Pub/Sub 메시지로 변환합니다.

  • Kafka 레코드 키는 Pub/Sub 메시지에 "key"라는 속성으로 저장됩니다.
  • 기본적으로 커넥터는 Kafka 레코드의 헤더를 삭제합니다. 그러나 headers.publish 구성 옵션을 true로 설정하면 커넥터가 헤더를 Pub/Sub 속성으로 작성합니다. 커넥터는 Pub/Sub 메시지 속성 한도를 초과하는 헤더를 건너뜁니다.
  • 정수, 부동 소수점 수, 문자열, 바이트 스키마의 경우 커넥터가 Kafka 레코드 값의 바이트를 Pub/Sub 메시지 본문으로 직접 전달합니다.
  • 구조체 스키마의 경우 커넥터에서 각 필드를 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 필드가 { "id"=123 }이면 결과 Pub/Sub 메시지에 "id"="123" 속성이 포함됩니다. 필드 값은 항상 문자열로 변환됩니다. 맵 및 구조체 유형은 구조체 내에서 필드 유형으로 지원되지 않습니다.
  • 맵 스키마의 경우 커넥터에서 각 키-값 쌍을 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 맵이 {"alice"=1,"bob"=2}이면 결과 Pub/Sub 메시지에 "alice"="1""bob"="2"라는 두 가지 속성이 있습니다. 키와 값은 문자열로 변환됩니다.

구조체 및 맵 스키마에는 몇 가지 추가 동작이 있습니다.

  • 필요에 따라 messageBodyName 구성 속성을 설정하여 특정 구조체 필드 또는 맵 키를 메시지 본문으로 지정할 수 있습니다. 필드 또는 키 값은 메시지 본문에 ByteString으로 저장됩니다. messageBodyName을 설정하지 않으면 구조체 및 맵 스키마에 대한 메시지 본문이 비어 있습니다.

  • 배열 값의 경우 커넥터에서 기본 배열 유형만 지원합니다. 배열의 값 시퀀스는 단일 ByteString 객체에 연결됩니다.

Pub/Sub에서 Kafka로 변환

소스 커넥터는 다음과 같이 Pub/Sub 메시지를 Kafka 레코드로 변환합니다.

  • Kafka 레코드 키: 기본적으로 키는 null로 설정됩니다. 필요에 따라 kafka.key.attribute 구성 옵션을 설정하여 키로 사용할 Pub/Sub 메시지 속성을 지정할 수 있습니다. 이러한 경우 커넥터에서 해당 이름의 속성을 찾아 레코드 키를 속성 값으로 설정합니다. 지정된 속성이 없으면 레코드 키가 null로 설정됩니다.

  • Kafka 레코드 값: 커넥터에서 다음과 같이 레코드 값을 씁니다.

    • Pub/Sub 메시지에 커스텀 속성이 없으면 커넥터에서 value.converter에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접 byte[] 유형으로 씁니다.

    • Pub/Sub 메시지에 커스텀 속성이 있고 kafka.record.headersfalse인 경우, 커넥터는 레코드 값에 구조체를 씁니다. 구조체에는 속성마다 하나의 필드가 있으며 값이 Pub/Sub 메시지 본문(바이트로 저장됨)인 "message"라는 필드가 있습니다.

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      이 경우 struct 스키마와 호환되는 value.converter(예: org.apache.kafka.connect.json.JsonConverter)를 사용해야 합니다.

    • Pub/Sub 메시지에 커스텀 속성이 있고 kafka.record.headerstrue인 경우, 커넥터는 속성을 Kafka 레코드 헤더로 씁니다. 커넥터에서 value.converter에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접 byte[] 유형으로 씁니다.

  • Kafka 레코드 헤더: kafka.record.headerstrue로 설정하지 않으면 기본적으로 헤더가 비어 있습니다.

구성 옵션

Pub/Sub Group Kafka Connector는 Kafka Connect API에서 제공하는 구성 외에도 Pub/Sub 커넥터 구성에 설명된 대로 싱크 및 소스 구성을 지원합니다.

지원 받기

도움이 필요하면 지원 티켓을 만듭니다. 일반적인 질문과 토론은 GitHub 저장소에서 문제 사례를 만드세요.

다음 단계