Apache Kafka to Apache Kafka 템플릿은 Apache Kafka 소스에서 데이터를 바이트로 수집한 후 바이트 데이터를 Apache Kafka 싱크에 쓰는 스트리밍 파이프라인을 만듭니다.
파이프라인 요구사항
- Apache Kafka 소스 주제가 있어야 합니다.
- Apache Kafka 소스 및 싱크 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결될 수 있어야 합니다.
- Apache Kafka용 Google Cloud 관리 서비스를 소스나 싱크로 사용하는 경우 템플릿을 실행하기 전에 주제가 있어야 합니다.
Kafka 메시지 형식
Apache Kafka 소스 메시지를 바이트로 읽고 바이트를 Apache Kafka 싱크에 씁니다.
인증
Apache Kafka to Apache Kafka 템플릿은 Kafka 브로커에 대한 SASL/PLAIN 및 TLS 인증을 지원합니다.
템플릿 매개변수
필수 매개변수
- readBootstrapServerAndTopic: 입력을 읽어올 Kafka 부트스트랩 서버 및 주제입니다. 예를 들면
localhost:9092;topic1,topic2
입니다. - kafkaReadAuthenticationMode: Kafka 클러스터에서 사용할 인증 모드입니다. 인증이 없는 경우
NONE
을, SASL/PLAIN 사용자 이름과 비밀번호의 경우SASL_PLAIN
을, 인증서 기반 인증의 경우TLS
을 사용합니다. BigQuery용 Apache Kafka에서는SASL_PLAIN
인증 모드만 지원합니다. 기본값은 SASL_PLAIN입니다. - writeBootstrapServerAndTopic: 출력을 쓸 Kafka 주제입니다.
- kafkaWriteAuthenticationMethod: Kafka 클러스터에서 사용할 인증 모드입니다. 인증이 없는 경우 NONE을, SASL/PLAIN 사용자 이름과 비밀번호의 경우 SASL_PLAIN을, 인증서 기반 인증의 경우 TLS를 사용합니다. 기본값은 APPLICATION_DEFAULT_CREDENTIALS입니다.
선택적 매개변수
- enableCommitOffsets: 처리된 메시지의 오프셋을 Kafka에 커밋합니다. 이 파라미터를 사용 설정하면 파이프라인을 다시 시작할 때 메시지 처리의 간격이나 중복을 최소화할 수 있습니다. 소비자 그룹 ID를 지정해야 합니다. 기본값은 false입니다.
- consumerGroupId: 이 파이프라인이 속한 소비자 그룹의 고유 식별자입니다. Kafka에 오프셋 커밋이 사용 설정된 경우에 필요합니다. 기본값은 빈 값입니다.
- kafkaReadOffset: 커밋된 오프셋이 없는 경우 메시지를 읽는 시작점입니다. 처음에 시작 메시지가 표시되며 마지막에 최신 메시지가 표시됩니다. 기본값은 latest입니다.
- kafkaReadUsernameSecretId:
SASL_PLAIN
인증에서 사용할 Kafka 사용자 이름이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. 기본값은 빈 값입니다. - kafkaReadPasswordSecretId:
SASL_PLAIN
인증에서 사용할 Kafka 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. 기본값은 빈 값입니다. - kafkaReadKeystoreLocation: Kafka 클러스터로 인증할 때 사용할 TLS 인증서와 비공개 키가 포함된 Java 키 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다. 예를 들면
gs://your-bucket/keystore.jks
입니다. - kafkaReadTruststoreLocation: Kafka 브로커 ID를 확인하는 데 사용할 신뢰할 수 있는 인증서가 포함된 Java 트러스트 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다.
- kafkaReadTruststorePasswordSecretId: Kafka TLS 인증을 위해 Java 트러스트 저장소(JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다(예:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
). - kafkaReadKeystorePasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. - kafkaReadKeyPasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일 내 비공개 키에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. - kafkaWriteUsernameSecretId: 대상 Kafka 클러스터와의 SASL_PLAIN 인증을 위한 Kafka 사용자 이름이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. 기본값은 빈 값입니다. - kafkaWritePasswordSecretId: 대상 Kafka 클러스터와의 SASL_PLAIN 인증에 사용할 Kafka 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. 기본값은 빈 값입니다. - kafkaWriteKeystoreLocation: 대상 Kafka 클러스터를 인증하기 위한 TLS 인증서와 비공개 키가 포함된 Java 키 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다. 예를 들면
gs://<BUCKET>/<KEYSTORE>.jks
입니다. - kafkaWriteTruststoreLocation: 대상 Kafka 브로커 ID를 확인하는 데 사용할 신뢰할 수 있는 인증서가 포함된 Java 트러스트 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다.
- kafkaWriteTruststorePasswordSecretId: 대상 Kafka 클러스터와의 TLS 인증을 위해 Java 트러스트 저장소 (JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. - kafkaWriteKeystorePasswordSecretId: 대상 Kafka 클러스터와의 TLS 인증에 사용할 Java 키 저장소 (JKS) 파일에 액세스하는 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다. - kafkaWriteKeyPasswordSecretId: 대상 Kafka 클러스터와의 TLS 인증을 위해 Java 키 저장소 (JKS) 파일 내 비공개 키에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
입니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to Cloud Storage template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
BIGQUERY_TABLE
: Cloud Storage 테이블 이름KAFKA_TOPICS
: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다.gcloud topic escaping
을 참조하세요.PATH_TO_JAVASCRIPT_UDF_FILE
: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는.js
파일의 Cloud Storage URI입니다. 예를 들면gs://my-bucket/my-udfs/my_file.js
입니다.JAVASCRIPT_FUNCTION
: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.예를 들어 자바스크립트 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.KAFKA_SERVER_ADDRESSES
: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면35.70.252.199:9092
입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다.gcloud topic escaping
을 참조하세요.
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
BIGQUERY_TABLE
: Cloud Storage 테이블 이름KAFKA_TOPICS
: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다.gcloud topic escaping
을 참조하세요.PATH_TO_JAVASCRIPT_UDF_FILE
: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는.js
파일의 Cloud Storage URI입니다. 예를 들면gs://my-bucket/my-udfs/my_file.js
입니다.JAVASCRIPT_FUNCTION
: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.예를 들어 자바스크립트 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.KAFKA_SERVER_ADDRESSES
: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면35.70.252.199:9092
입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다.gcloud topic escaping
을 참조하세요.
자세한 내용은 Dataflow로 Kafka에서 Cloud Storage로 데이터 쓰기를 참조하세요.
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조