이 페이지에서는 Dataflow 파이프라인에서 Apache Kafka용 Google Cloud 관리 서비스를 소스 또는 싱크로 사용하는 방법을 설명합니다.
다음 방법 중 하나를 사용할 수 있습니다.
요구사항
프로젝트에서 Cloud Storage, Dataflow, Managed Service for Apache Kafka API를 사용 설정합니다. API 사용 설정을 참고하거나 다음 Google Cloud CLI 명령어를 실행하세요.
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Dataflow 작업자 서비스 계정에는 관리형 Kafka 클라이언트 (
roles/managedkafka.client
) Identity and Access Management (IAM) 역할이 있어야 합니다.Dataflow 작업자 VM이 Kafka 부트스트랩 서버에 대한 네트워크 액세스 권한이 있어야 합니다. 자세한 내용은 Apache Kafka용 관리형 서비스 네트워킹 구성을 참고하세요.
부트스트랩 서버 주소 가져오기
Apache Kafka용 관리형 서비스 클러스터에 연결되는 파이프라인을 실행하려면 먼저 클러스터의 부트스트랩 서버 주소를 가져옵니다. 파이프라인을 구성할 때 이 주소가 필요합니다.
다음과 같이 Google Cloud 콘솔 또는 Google Cloud CLI를 사용할 수 있습니다.
콘솔
Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.
클러스터 이름을 클릭합니다.
구성 탭을 클릭합니다.
부트스트랩 URL에서 부트스트랩 서버 주소를 복사합니다.
gcloud
managed-kafka clusters describe
명령어를 사용합니다.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
다음을 바꿉니다.
- CLUSTER_ID: 클러스터의 ID 또는 이름
- LOCATION: 클러스터의 위치
자세한 내용은 Apache Kafka용 관리형 서비스 클러스터 보기를 참고하세요.
Dataflow 템플릿과 함께 Apache Kafka용 관리형 서비스 사용
Google은 Apache Kafka에서 읽어오는 여러 Dataflow 템플릿을 제공합니다.
이 템플릿은 Apache Kafka용 관리형 서비스와 함께 사용할 수 있습니다. 이러한 파이프라인 중 하나가 사용 사례와 일치하는 경우 맞춤 파이프라인 코드를 작성하는 대신 해당 파이프라인을 사용하는 것이 좋습니다.
콘솔
Dataflow > 작업 페이지로 이동합니다.
템플릿에서 작업 만들기를 클릭합니다.
작업 이름에 작업 이름을 입력합니다.
Dataflow 템플릿 드롭다운 메뉴에서 실행할 템플릿을 선택합니다.
Kafka 부트스트랩 서버 상자에 부트스트랩 서버 주소를 입력합니다.
Kafka 주제 상자에 주제 이름을 입력합니다.
Kafka 인증 모드에서 APPLICATION_DEFAULT_CREDENTIALS를 선택합니다.
Kafka 메시지 형식에서 Apache Kafka 메시지의 형식을 선택합니다.
필요에 따라 다른 매개변수를 입력합니다. 지원되는 매개변수는 각 템플릿에 대해 문서화되어 있습니다.
작업 실행
gcloud
gcloud dataflow jobs run
명령어를 사용합니다.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
다음을 바꿉니다.
- JOB_NAME: 작업 이름
- TEMPLATE_FILE: Cloud Storage의 템플릿 파일 위치입니다.
- REGION_NAME: 작업을 배포할 리전
- PROJECT_NAME: Google Cloud 프로젝트의 이름
- LOCATION: 클러스터의 위치
- CLUSTER_ID: 클러스터의 ID 또는 이름
- TOPIC: Kafka 주제의 이름
Beam 파이프라인에서 Apache Kafka용 관리형 서비스 사용
이 섹션에서는 Apache Beam SDK를 사용하여 Managed Service for Apache Kafka에 연결되는 Dataflow 파이프라인을 만들고 실행하는 방법을 설명합니다.
대부분의 시나리오에서는 관리형 I/O 변환을 Kafka 소스 또는 싱크로 사용합니다. 고급 성능 조정이 필요한 경우 KafkaIO
커넥터를 사용하는 것이 좋습니다.
관리형 I/O 사용의 이점에 대한 자세한 내용은 Dataflow 관리형 I/O를 참고하세요.
요구사항
Kafka 클라이언트 버전 3.6.0 이상
Apache Beam SDK 버전 2.61.0 이상
Dataflow 작업을 시작하는 머신이 Apache Kafka 부트스트랩 서버에 대한 네트워크 액세스 권한이 있어야 합니다. 예를 들어 클러스터에 연결할 수 있는 VPC에 액세스할 수 있는 Compute Engine 인스턴스에서 작업을 시작합니다.
작업을 만드는 주 구성원에게 다음 IAM 역할이 있어야 합니다.
- Apache Kafka 클러스터에 액세스하는 관리형 Kafka 클라이언트 (
roles/managedkafka.client
) - Dataflow 작업자 서비스 계정으로 작동하는 서비스 계정 사용자 (
roles/iam.serviceAccountUser
) - Cloud Storage에 작업 파일을 업로드하는 스토리지 관리자 (
roles/storage.admin
) - Dataflow 관리자 (
roles/dataflow.admin
)가 작업을 만듭니다.
Compute Engine 인스턴스에서 작업을 시작하는 경우 VM에 연결된 서비스 계정에 이러한 역할을 부여할 수 있습니다. 자세한 내용은 사용자 관리형 서비스 계정을 사용하는 VM 만들기를 참조하세요.
작업을 만들 때 서비스 계정 가장과 함께 애플리케이션 기본 사용자 인증 정보 (ADC)를 사용할 수도 있습니다.
- Apache Kafka 클러스터에 액세스하는 관리형 Kafka 클라이언트 (
관리형 I/O 구성
파이프라인에서 Apache Kafka용 관리형 I/O를 사용하는 경우 다음 구성 옵션을 설정하여 Apache Kafka용 관리형 서비스로 인증합니다.
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
다음 예에서는 Apache Kafka용 관리형 서비스의 관리형 I/O를 구성하는 방법을 보여줍니다.
자바
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
KafkaIO
커넥터 구성
다음 예에서는 Apache Kafka용 관리형 서비스의 KafkaIO
커넥터를 구성하는 방법을 보여줍니다.
자바
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
다음 단계
- Apache Kafka용 관리형 서비스에 대해 자세히 알아보세요.
- Apache Kafka용 관리형 서비스에서 BigQuery로 데이터 쓰기
- Apache Kafka에서 Dataflow로 읽기
- Dataflow에서 Apache Kafka로 쓰기