이 문서에서는 Pub/Sub Group Kafka Connector를 사용하여 Apache Kafka와 Pub/Sub를 통합하는 방법을 설명합니다.
Pub/Sub Group Kafka Connector 정보
Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Pub/Sub는 메시지를 비동기적으로 주고받기 위한 관리형 서비스입니다. Kafka와 마찬가지로 Pub/Sub를 사용하여 클라우드 아키텍처의 구성요소 간에 통신할 수 있습니다.
Pub/Sub Group Kafka Connector를 사용하면 이러한 두 시스템을 통합할 수 있습니다. 다음 커넥터는 Connector JAR로 패키징됩니다.
- 싱크 커넥터는 하나 이상의 Kafka 주제에서 레코드를 읽어 Pub/Sub에 게시합니다.
- 소스 커넥터는 Pub/Sub 주제에서 메시지를 읽고 Kafka에 게시합니다.
다음은 Pub/Sub Group Kafka Connector를 사용할 수 있는 몇 가지 시나리오입니다.
- Kafka 기반 아키텍처를 Google Cloud로 마이그레이션합니다.
- Google Cloud 외부의 Kafka에 이벤트를 저장하는 프런트엔드 시스템이 있지만 Google Cloud를 사용하여 Kafka 이벤트를 수신해야 할 일부 백엔드 서비스를 실행합니다.
- 온프레미스 Kafka 솔루션에서 로그를 수집하여 데이터 분석을 위해 Google Cloud로 전송합니다.
- Google Cloud를 사용하는 프런트엔드 시스템이 있지만 Kafka를 사용하여 데이터를 온프레미스에 저장합니다.
커넥터에는 Kafka 및 기타 시스템 간 데이터 스트리밍을 위한 프레임워크인 Kafka Connect가 필요합니다. 커넥터를 사용하려면 Kafka 클러스터와 함께 Kafka Connect를 실행해야 합니다.
이 문서에서는 사용자가 Kafka와 Pub/Sub에 모두 익숙하다고 가정합니다. 이 문서를 읽기 전에 Pub/Sub 빠른 시작 중 하나를 완료하는 것이 좋습니다.
Pub/Sub 커넥터는 Google Cloud IAM과 Kafka Connect ACL 간의 통합을 지원하지 않습니다.
커넥터 시작하기
이 섹션에서는 다음 작업을 안내합니다.- Pub/Sub Group Kafka Connector를 구성합니다.
- Kafka에서 Pub/Sub로 이벤트를 전송합니다.
- Pub/Sub에서 Kafka로 메시지를 전송합니다.
기본 요건
Kafka 설치
Apache Kafka 빠른 시작을 따라 로컬 머신에 단일 노드 Kafka를 설치합니다. 빠른 시작에서 다음 단계를 완료합니다.
- 최신 Kafka 버전을 다운로드하고 압축을 풉니다.
- Kafka 환경을 시작합니다.
- Kafka 주제를 만듭니다.
인증
Pub/Sub Group Kafka Connector는 Pub/Sub 메시지를 전송 및 수신하기 위해 Pub/Sub로 인증해야 합니다. 인증을 설정하려면 다음 단계를 수행합니다.
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다.
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
를 프로젝트 ID로 바꿉니다.EMAIL_ADDRESS
를 이메일 주소로 바꿉니다.ROLE
을 각 개별 역할로 바꿉니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다.
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
를 프로젝트 ID로 바꿉니다.EMAIL_ADDRESS
를 이메일 주소로 바꿉니다.ROLE
을 각 개별 역할로 바꿉니다.
커넥터 JAR 다운로드
커넥터 JAR 파일을 로컬 머신에 다운로드합니다. 자세한 내용은 GitHub 리드미에서 커넥터 획득을 참조하세요.
커넥터 구성 파일 복사
커넥터에서 GitHub 저장소를 클론하거나 다운로드합니다.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
config
디렉터리 콘텐츠를 Kafka 설치의config
하위 디렉터리에 복사합니다.cp config/* [path to Kafka installation]/config/
이러한 파일에는 커넥터의 구성 설정이 포함됩니다.
Kafka Connect 구성 업데이트
- 다운로드한 Kafka Connect 바이너리가 포함된 디렉터리로 이동합니다.
- Kafka Connect 바이너리 디렉터리에서
config/connect-standalone.properties
라는 파일을 텍스트 편집기로 엽니다. plugin.path property
가 주석 처리된 경우 주석 처리를 삭제합니다.커넥터 JAR의 경로를 포함하도록
plugin.path property
를 업데이트합니다.예를 들면 다음과 같습니다.
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
offset.storage.file.filename
속성을 로컬 파일 이름으로 설정합니다. 독립형 모드에서 Kafka는 이 파일을 사용하여 오프셋 데이터를 저장합니다.예를 들면 다음과 같습니다.
offset.storage.file.filename=/tmp/connect.offsets
Kafka에서 Pub/Sub로 이벤트 전달
이 섹션에서는 싱크 커넥터를 시작하고, Kafka에 이벤트를 게시한 후 Pub/Sub에서 전달된 메시지를 읽는 방법을 설명합니다.
Google Cloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
다음을 바꿉니다.
- PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제의 이름입니다.
- PUBSUB_SUBSCRIPTION: 주제의 Pub/Sub 구독 이름입니다.
텍스트 편집기에서
/config/cps-sink-connector.properties
파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는"TODO"
로 표시됩니다.topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
다음을 바꿉니다.
- KAFKA_TOPICS: 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.
- PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
- PUBSUB_TOPIC: Kafka에서 메시지를 수신할 Pub/Sub 주제입니다.
Kafka 디렉터리에서 다음 명령어를 실행합니다.
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제에 일부 이벤트를 씁니다.
gcloud CLI를 사용하여 Pub/Sub에서 이벤트를 읽습니다.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Pub/Sub에서 Kafka로 메시지 전달
이 섹션에서는 소스 커넥터를 시작하고, Pub/Sub에 메시지를 게시하고, Kafka에서 전달된 메시지를 읽는 방법을 설명합니다.
gcloud CLI를 사용하여 Pub/Sub 주제와 구독을 만듭니다.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
다음을 바꿉니다.
- PUBSUB_TOPIC: Pub/Sub 주제의 이름입니다.
- PUBSUB_SUBSCRIPTION: Pub/Sub 구독의 이름입니다.
텍스트 편집기에서
/config/cps-source-connector.properties
라는 파일을 엽니다. 다음 속성 값을 추가합니다. 주석에는"TODO"
로 표시됩니다.kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
다음을 바꿉니다.
- KAFKA_TOPIC: Pub/Sub 메시지를 수신할 Kafka 주제입니다.
- PROJECT_ID: Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다.
- PUBSUB_TOPIC: Pub/Sub 주제입니다.
Kafka 디렉터리에서 다음 명령어를 실행합니다.
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
gcloud CLI를 사용하여 Pub/Sub에 메시지를 게시합니다.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Kafka에서 메시지를 읽습니다. Apache Kafka 빠른 시작의 단계를 따라 Kafka 주제의 메시지를 읽습니다.
메시지 변환
Kafka 레코드는 가변 길이 바이트 배열인 키와 값을 포함합니다. 필요에 따라 Kafka 레코드에 키-값 쌍인 헤더가 있을 수도 있습니다. Pub/Sub 메시지는 메시지 본문과 0개 이상의 키-값 속성이라는 두 가지 주요 부분으로 구성됩니다.
Kafka Connect는 변환기를 사용하여 Kafka 간의 키와 값을 직렬화합니다. 직렬화를 제어하려면 커넥터 구성 파일에서 다음 속성을 설정합니다.
key.converter
: 레코드 키를 직렬화하는 데 사용되는 변환기입니다.value.converter
: 레코드 값을 직렬화하는 데 사용되는 변환기입니다.
Pub/Sub 메시지 본문은 ByteString
객체이므로 가장 효율적인 변환은 페이로드를 직접 복사하는 것입니다. 따라서 가능한 경우 동일한 메시지 본문을 역직렬화 및 재직렬화하지 않도록 기본 데이터 유형(정수, 부동 소수점 수, 문자열 또는 바이트 스키마)을 생성하는 변환기를 사용하는 것이 좋습니다.
Kafka에서 Pub/Sub로 변환
싱크 커넥터는 다음과 같이 Kafka 레코드를 Pub/Sub 메시지로 변환합니다.
- Kafka 레코드 키는 Pub/Sub 메시지에
"key"
라는 속성으로 저장됩니다. - 기본적으로 커넥터는 Kafka 레코드의 헤더를 삭제합니다. 그러나
headers.publish
구성 옵션을true
로 설정하면 커넥터가 헤더를 Pub/Sub 속성으로 작성합니다. 커넥터는 Pub/Sub 메시지 속성 한도를 초과하는 헤더를 건너뜁니다. - 정수, 부동 소수점 수, 문자열, 바이트 스키마의 경우 커넥터가 Kafka 레코드 값의 바이트를 Pub/Sub 메시지 본문으로 직접 전달합니다.
- 구조체 스키마의 경우 커넥터에서 각 필드를 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 필드가
{ "id"=123 }
이면 결과 Pub/Sub 메시지에"id"="123"
속성이 포함됩니다. 필드 값은 항상 문자열로 변환됩니다. 맵 및 구조체 유형은 구조체 내에서 필드 유형으로 지원되지 않습니다. - 맵 스키마의 경우 커넥터에서 각 키-값 쌍을 Pub/Sub 메시지의 속성으로 씁니다. 예를 들어 맵이
{"alice"=1,"bob"=2}
이면 결과 Pub/Sub 메시지에"alice"="1"
및"bob"="2"
라는 두 가지 속성이 있습니다. 키와 값은 문자열로 변환됩니다.
구조체 및 맵 스키마에는 몇 가지 추가 동작이 있습니다.
필요에 따라
messageBodyName
구성 속성을 설정하여 특정 구조체 필드 또는 맵 키를 메시지 본문으로 지정할 수 있습니다. 필드 또는 키 값은 메시지 본문에ByteString
으로 저장됩니다.messageBodyName
을 설정하지 않으면 구조체 및 맵 스키마에 대한 메시지 본문이 비어 있습니다.배열 값의 경우 커넥터에서 기본 배열 유형만 지원합니다. 배열의 값 시퀀스는 단일
ByteString
객체에 연결됩니다.
Pub/Sub에서 Kafka로 변환
소스 커넥터는 다음과 같이 Pub/Sub 메시지를 Kafka 레코드로 변환합니다.
Kafka 레코드 키: 기본적으로 키는
null
로 설정됩니다. 필요에 따라kafka.key.attribute
구성 옵션을 설정하여 키로 사용할 Pub/Sub 메시지 속성을 지정할 수 있습니다. 이러한 경우 커넥터에서 해당 이름의 속성을 찾아 레코드 키를 속성 값으로 설정합니다. 지정된 속성이 없으면 레코드 키가null
로 설정됩니다.Kafka 레코드 값: 커넥터에서 다음과 같이 레코드 값을 씁니다.
Pub/Sub 메시지에 커스텀 속성이 없으면 커넥터에서
value.converter
에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접byte[]
유형으로 씁니다.Pub/Sub 메시지에 커스텀 속성이 있고
kafka.record.headers
가false
인 경우, 커넥터는 레코드 값에 구조체를 씁니다. 구조체에는 속성마다 하나의 필드가 있으며 값이 Pub/Sub 메시지 본문(바이트로 저장됨)인"message"
라는 필드가 있습니다.{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
이 경우
struct
스키마와 호환되는value.converter
(예:org.apache.kafka.connect.json.JsonConverter
)를 사용해야 합니다.Pub/Sub 메시지에 커스텀 속성이 있고
kafka.record.headers
가true
인 경우, 커넥터는 속성을 Kafka 레코드 헤더로 씁니다. 커넥터에서value.converter
에 지정된 변환기를 사용하여 Pub/Sub 메시지 본문을 Kafka 레코드 값에 직접byte[]
유형으로 씁니다.
Kafka 레코드 헤더:
kafka.record.headers
를true
로 설정하지 않으면 기본적으로 헤더가 비어 있습니다.
구성 옵션
Pub/Sub Group Kafka Connector는 Kafka Connect API에서 제공하는 구성 외에도 다음 구성을 지원합니다.
싱크 커넥터 구성 옵션
싱크 커넥터는 다음 구성 옵션을 지원합니다.
설정 | 데이터 유형 | 설명 |
---|---|---|
connector.class |
String |
필수. 커넥터의 자바 클래스입니다. Pub/Sub 싱크 커넥터의 값은 com.google.pubsub.kafka.sink.CloudPubSubSinkConnector 여야 합니다.
|
cps.endpoint |
String |
사용할 Pub/Sub 엔드포인트입니다. 기본값: |
cps.project |
String |
필수. Pub/Sub 주제가 포함된 Google Cloud입니다. |
cps.topic |
String |
필수. Kafka 레코드를 게시할 Pub/Sub 주제입니다. |
gcp.credentials.file.path |
String |
선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다. |
gcp.credentials.json |
String |
선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다. |
headers.publish |
Boolean |
기본값: |
maxBufferBytes |
Long |
주제 Kafka 파티션에서 Pub/Sub에 게시하기 전에 수신할 최대 바이트 수입니다. 기본값: 10000000 |
maxBufferSize |
Integer |
Pub/Sub에 게시하기 전에 Kafka 주제 파티션에서 수신할 최대 레코드 수입니다. 기본값: 100 |
maxDelayThresholdMs |
Integer |
Pub/Sub에 미해결 레코드를 게시하기 전에 기본값: 100 |
maxOutstandingMessages |
Long |
게시자가 추가 게시를 차단하기 전에 완료되지 않고 대기 중인 배치를 포함해 미해결 상태일 수 있는 최대 레코드 수입니다. 기본값: |
maxOutstandingRequestBytes |
Long |
게시자가 추가 게시를 차단하기 전에 완료되지 않고 대기 중인 배치를 포함해 미해결 상태일 수 있는 최대 총 바이트 수입니다. 기본값: |
maxRequestTimeoutMs |
Integer |
Pub/Sub에 대한 개별 게시 요청 제한 시간(밀리초)입니다. 기본값: 10000 |
maxTotalTimeoutMs |
Integer |
재시도를 포함하여 Pub/Sub에 게시할 호출의 총 제한 시간(밀리초)입니다. 기본값: 60000 |
metadata.publish |
Boolean |
기본값: |
messageBodyName |
String |
구조체 또는 맵 값 스키마를 사용할 때 Pub/Sub 메시지 본문으로 사용할 필드 또는 키의 이름을 지정합니다. Kafka에서 Pub/Sub로 변환을 참조하세요. 기본값: |
orderingKeySource |
String |
Pub/Sub 메시지에서 순서 키를 설정하는 방법을 지정합니다. 다음 값 중 하나일 수 있습니다.
기본값: |
topics |
String |
필수. 읽어올 쉼표로 구분된 Kafka 주제 목록입니다. |
소스 커넥터 구성 옵션
소스 커넥터는 다음 구성 옵션을 지원합니다.
설정 | 데이터 유형 | 설명 |
---|---|---|
connector.class |
String |
필수. 커넥터의 자바 클래스입니다. Pub/Sub 소스 커넥터의 값은 com.google.pubsub.kafka.source.CloudPubSubSourceConnector 여야 합니다.
|
cps.endpoint |
String |
사용할 Pub/Sub 엔드포인트입니다. 기본값: |
cps.makeOrderingKeyAttribute |
Boolean |
기본값: |
cps.maxBatchSize |
Integer |
Pub/Sub에 대한 pull 요청당 일괄 처리할 최대 메시지 수입니다. 기본값: 100 |
cps.project |
String |
필수. Pub/Sub 주제가 포함된 Google Cloud 프로젝트입니다. |
cps.subscription |
String |
필수. 메시지를 가져올 Pub/Sub 구독의 이름입니다. |
gcp.credentials.file.path |
String |
선택사항. Pub/Sub Lite 인증을 위해 Google Cloud 사용자 인증 정보를 저장하는 파일의 경로입니다. |
gcp.credentials.json |
String |
선택사항. Pub/Sub Lite 인증을 위한 Google Cloud가 포함된 JSON blob입니다. |
kafka.key.attribute |
String |
Kafka에 게시된 메시지의 키로 사용할 Pub/Sub 메시지 속성입니다. 기본값: |
kafka.partition.count |
Integer |
메시지가 게시되는 Kafka 주제의 Kafka 파티션 수입니다. 파티션 스키마가 기본값: 1 |
kafka.partition.scheme |
String |
Kafka의 파티션에 메시지를 할당하는 스키마입니다. 다음 값 중 하나일 수 있습니다.
기본값: |
kafka.record.headers |
Boolean |
|
kafka.topic |
String |
필수. Pub/Sub에서 메시지를 수신하는 Kafka 주제입니다. |
지원 받기
도움이 필요하면 지원 티켓을 만듭니다. 일반적인 질문과 토론은 GitHub 저장소에서 문제 사례를 만드세요.
다음 단계
- Kafka와 Pub/Sub 사이의 차이점 이해
- Pub/Sub Group Kafka Connector 자세히 알아보기
- Pub/Sub Group Kafka Connector GitHub 저장소 알아보기