Apache Kafka to Cloud Storage 템플릿은 Apache Kafka용 Google Cloud 관리형 서비스에서 텍스트 데이터를 수집하고 Cloud Storage에 레코드를 출력하는 스트리밍 파이프라인입니다.
자체 관리형 또는 외부 Kafka와 함께 Apache Kafka to BigQuery 템플릿을 사용할 수도 있습니다.
파이프라인 요구사항
- 출력 Cloud Storage 버킷이 있어야 합니다.
- Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
- Apache Kafka 주제가 있어야 합니다.
Kafka 메시지 형식
Apache Kafka to Cloud Storage 템플릿은 CONFLUENT_AVRO_WIRE_FORMAT
및 JSON
형식의 Kafka 메시지 읽기를 지원합니다.
출력 파일 형식
출력 파일 형식은 입력 Kafka 메시지와 동일한 형식입니다. 예를 들어 Kafka 메시지 형식으로 JSON을 선택하면 JSON 파일이 출력 Cloud Storage 버킷에 기록됩니다.
인증
Apache Kafka to Cloud Storage 템플릿은 Kafka 브로커에 대한 SASL/PLAIN 인증을 지원합니다.
템플릿 매개변수
필수 매개변수
- readBootstrapServerAndTopic: 입력을 읽어올 Kafka 주제입니다.
- kafkaReadAuthenticationMode: Kafka 클러스터에서 사용할 인증 모드입니다. 인증이 없는 경우 NONE을 사용하고 SASL/PLAIN 사용자 이름과 비밀번호의 경우 SASL_PLAIN을 사용합니다. BigQuery용 Apache Kafka는 SASL_PLAIN 인증 모드만 지원합니다. 기본값은 SASL_PLAIN입니다.
- outputDirectory: 출력 파일을 쓰기 위한 경로와 파일 이름 프리픽스입니다. 슬래시로 끝나야 합니다. (예: gs://your-bucket/your-path/).
- messageFormat: 읽을 Kafka 메시지의 형식입니다. 지원되는 값은 AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry 인코딩된 Avro), AVRO_BINARY_ENCODING(일반 바이너리 Avro), JSON입니다. 기본값은 AVRO_CONFLUENT_WIRE_FORMAT입니다.
선택적 매개변수
- windowDuration: 데이터가 Cloud Storage에 기록되는 기간/크기입니다. 허용되는 형식은 Ns(초, 예: 5s), Nm(분, 예: 12m), Nh(시, 예: 2h)입니다. 예를 들면 5m과 같습니다. 기본값은 5m입니다.
- outputFilenamePrefix: 윈도우 설정된 각 파일에 넣을 프리픽스입니다. (예: output-) 기본값은 output입니다.
- numShards: 쓰는 동안에 생성되는 최대 출력 샤드 수입니다. 샤드 수가 많을수록 Cloud Storage 쓰기 처리량이 높아지지만 출력 Cloud Storage 파일을 처리할 때 샤드 간에 데이터 집계 비용이 늘어날 수 있습니다. Dataflow에서 기본값을 결정합니다.
- enableCommitOffsets: 처리된 메시지의 오프셋을 Kafka에 커밋합니다. 이 기능을 사용 설정하면 파이프라인을 다시 시작할 때 메시지의 간격이나 중복 처리가 최소화됩니다. 소비자 그룹 ID를 지정해야 합니다. 기본값은 false입니다.
- consumerGroupId: 이 파이프라인이 속한 소비자 그룹의 고유 식별자입니다. Kafka에 오프셋 커밋이 사용 설정된 경우 필요합니다. 기본값은 빈 값입니다.
- kafkaReadOffset: 커밋된 오프셋이 없는 경우 메시지를 읽기 위한 시작점입니다. 처음에 시작 메시지가 표시되며 마지막에 최신 메시지가 표시됩니다. 기본값은 latest입니다.
- kafkaReadUsernameSecretId: SASL_PLAIN 인증에 사용할 Kafka 사용자 이름이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. (예: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). 기본값은 비어 있습니다.
- kafkaReadPasswordSecretId: SASL_PLAIN 인증에 사용할 Kafka 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. (예: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). 기본값은 비어 있습니다.
- schemaFormat: Kafka 스키마 형식입니다. SINGLE_SCHEMA_FILE 또는 SCHEMA_REGISTRY로 제공할 수 있습니다. SINGLE_SCHEMA_FILE이 지정된 경우 모든 메시지에 avro 스키마 파일에 언급된 스키마가 있어야 합니다. SCHEMA_REGISTRY가 지정되면 메시지에 단일 스키마 또는 여러 스키마가 있을 수 있습니다. 기본값은 SINGLE_SCHEMA_FILE입니다.
- confluentAvroSchemaPath: 주제의 모든 메시지를 디코딩하는 데 사용되는 단일 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
- schemaRegistryConnectionUrl: 메시지 디코딩을 위한 Avro 스키마를 관리하는 데 사용되는 Confluent 스키마 레지스트리 인스턴스의 URL입니다. 기본값은 빈 값입니다.
- binaryAvroSchemaPath: 바이너리로 인코딩된 Avro 메시지를 디코딩하는 데 사용되는 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to Cloud Storage template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
BIGQUERY_TABLE
: Cloud Storage 테이블 이름KAFKA_TOPICS
: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프해야 합니다.gcloud topic escaping
을 참조하세요.PATH_TO_JAVASCRIPT_UDF_FILE
: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는.js
파일의 Cloud Storage URI입니다. 예를 들면gs://my-bucket/my-udfs/my_file.js
입니다.JAVASCRIPT_FUNCTION
: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.예를 들어 자바스크립트 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.KAFKA_SERVER_ADDRESSES
: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면35.70.252.199:9092
입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프해야 합니다.gcloud topic escaping
을 참조하세요.
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
BIGQUERY_TABLE
: Cloud Storage 테이블 이름KAFKA_TOPICS
: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프해야 합니다.gcloud topic escaping
을 참조하세요.PATH_TO_JAVASCRIPT_UDF_FILE
: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는.js
파일의 Cloud Storage URI입니다. 예를 들면gs://my-bucket/my-udfs/my_file.js
입니다.JAVASCRIPT_FUNCTION
: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.예를 들어 자바스크립트 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.KAFKA_SERVER_ADDRESSES
: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면35.70.252.199:9092
입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프해야 합니다.gcloud topic escaping
을 참조하세요.
자세한 내용은 Dataflow로 Kafka에서 Cloud Storage로 데이터 쓰기를 참고하세요.
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조