Apache Kafka to Cloud Storage 템플릿

Apache Kafka to Cloud Storage 템플릿은 Apache Kafka용 Google Cloud 관리형 서비스에서 텍스트 데이터를 수집하고 Cloud Storage에 레코드를 출력하는 스트리밍 파이프라인입니다.

자체 관리형 또는 외부 Kafka와 함께 Apache Kafka to BigQuery 템플릿을 사용할 수도 있습니다.

파이프라인 요구사항

  • 출력 Cloud Storage 버킷이 있어야 합니다.
  • Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
  • Apache Kafka 주제가 있어야 합니다.

Kafka 메시지 형식

Apache Kafka to Cloud Storage 템플릿은 Kafka에서 CONFLUENT_AVRO_WIRE_FORMATJSON 형식의 메시지 읽기를 지원합니다.

출력 파일 형식

출력 파일 형식은 입력 Kafka 메시지와 동일한 형식입니다. 예를 들어 Kafka 메시지 형식으로 JSON을 선택하면 JSON 파일이 출력 Cloud Storage 버킷에 작성됩니다.

인증

Apache Kafka to Cloud Storage 템플릿은 Kafka 브로커에 대한 SASL/PLAIN 인증을 지원합니다.

템플릿 매개변수

필수 매개변수

  • readBootstrapServerAndTopic: 입력을 읽어올 Kafka 주제입니다.
  • outputDirectory: 출력 파일을 쓰기 위한 경로 및 파일 이름 프리픽스입니다. 슬래시로 끝나야 합니다. 예를 들면 gs://your-bucket/your-path/입니다.
  • kafkaReadAuthenticationMode: Kafka 클러스터에서 사용할 인증 모드입니다. 인증이 없는 경우 NONE을, SASL/PLAIN 사용자 이름과 비밀번호의 경우 SASL_PLAIN을, 인증서 기반 인증의 경우 TLS을 사용합니다. BigQuery용 Apache Kafka에서는 SASL_PLAIN 인증 모드만 지원합니다. 기본값은 SASL_PLAIN입니다.
  • messageFormat: 읽을 Kafka 메시지의 형식입니다. 지원되는 값은 AVRO_CONFLUENT_WIRE_FORMAT (Confluent 스키마 레지스트리로 인코딩된 Avro), AVRO_BINARY_ENCODING (일반 바이너리 Avro), JSON입니다. 기본값은 AVRO_CONFLUENT_WIRE_FORMAT입니다.
  • useBigQueryDLQ: 이 값이 true이면 실패한 메시지가 추가 오류 정보와 함께 BigQuery에 작성됩니다. 기본값은 false입니다.

선택적 매개변수

  • windowDuration: 데이터가 Cloud Storage에 기록되는 기간/크기입니다. 허용되는 형식은 Ns(초, 예: 5s), Nm(분, 예: 12m), Nh(시, 예: 2h)입니다. 예를 들면 5m입니다. 기본값은 5m입니다.
  • outputFilenamePrefix: 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다. 기본값은 output입니다.
  • numShards: 쓰는 동안에 생성되는 최대 출력 샤드 수입니다. 샤드 수가 많을수록 Cloud Storage 쓰기 처리량이 높아지지만 출력 Cloud Storage 파일을 처리할 때 샤드 간에 데이터 집계 비용이 늘어날 수 있습니다. Dataflow에서 기본값을 결정합니다.
  • enableCommitOffsets: 처리된 메시지의 오프셋을 Kafka에 커밋합니다. 이 파라미터를 사용 설정하면 파이프라인을 다시 시작할 때 메시지 처리의 간격이나 중복을 최소화할 수 있습니다. 소비자 그룹 ID를 지정해야 합니다. 기본값은 false입니다.
  • consumerGroupId: 이 파이프라인이 속한 소비자 그룹의 고유 식별자입니다. Kafka에 오프셋 커밋이 사용 설정된 경우에 필요합니다. 기본값은 빈 값입니다.
  • kafkaReadOffset: 커밋된 오프셋이 없는 경우 메시지를 읽는 시작점입니다. 처음에 시작 메시지가 표시되며 마지막에 최신 메시지가 표시됩니다. 기본값은 latest입니다.
  • kafkaReadUsernameSecretId: SASL_PLAIN 인증에서 사용할 Kafka 사용자 이름이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다. 기본값은 빈 값입니다.
  • kafkaReadPasswordSecretId: SASL_PLAIN 인증에서 사용할 Kafka 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다. 기본값은 빈 값입니다.
  • kafkaReadKeystoreLocation: Kafka 클러스터로 인증할 때 사용할 TLS 인증서와 비공개 키가 포함된 Java 키 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/keystore.jks입니다.
  • kafkaReadTruststoreLocation: Kafka 브로커 ID를 확인하는 데 사용할 신뢰할 수 있는 인증서가 포함된 Java 트러스트 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다.
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 인증을 위해 Java 트러스트 저장소(JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다(예: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • kafkaReadKeyPasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일 내 비공개 키에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • schemaFormat: Kafka 스키마 형식입니다. SINGLE_SCHEMA_FILE 또는 SCHEMA_REGISTRY으로 제공할 수 있습니다. SINGLE_SCHEMA_FILE가 지정된 경우 모든 메시지에 avro 스키마 파일에 언급된 스키마를 사용합니다. SCHEMA_REGISTRY가 지정되면 메시지에 단일 스키마 또는 여러 스키마가 있을 수 있습니다. 기본값은 SINGLE_SCHEMA_FILE입니다.
  • confluentAvroSchemaPath: 주제의 모든 메시지를 디코딩하는 데 사용되는 단일 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
  • schemaRegistryConnectionUrl: 메시지 디코딩을 위한 Avro 스키마를 관리하는 데 사용되는 Confluent 스키마 레지스트리 인스턴스의 URL입니다. 기본값은 빈 값입니다.
  • binaryAvroSchemaPath: 바이너리로 인코딩된 Avro 메시지를 디코딩하는 데 사용되는 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
  • schemaRegistryAuthenticationMode: 스키마 레지스트리 인증 모드입니다. NONE, TLS 또는 OAUTH일 수 있습니다. 기본값은 NONE입니다.
  • schemaRegistryTruststoreLocation: Schema Registry 인증을 위한 트러스트 저장소가 저장된 SSL 인증서의 위치입니다. 예를 들면 /your-bucket/truststore.jks입니다.
  • schemaRegistryTruststorePasswordSecretId: truststore의 보안 비밀에 액세스하는 비밀번호가 저장된 Secret Manager의 SecretId입니다. 예를 들면 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version입니다.
  • schemaRegistryKeystoreLocation: SSL 인증서와 비공개 키가 포함된 키 저장소 위치입니다. 예를 들면 /your-bucket/keystore.jks입니다.
  • schemaRegistryKeystorePasswordSecretId: 키 저장소 파일에 액세스하는 비밀번호가 있는 Secret Manager의 SecretId입니다(예: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryKeyPasswordSecretId: 키 저장소에 저장된 클라이언트의 비공개 키에 액세스하는 데 필요한 비밀번호의 SecretId입니다(예: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryOauthClientId: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 클라이언트 ID입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다.
  • schemaRegistryOauthClientSecretId: OAUTH 모드에서 Schema Registry 클라이언트를 인증하는 데 사용할 클라이언트 보안 비밀이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • schemaRegistryOauthScope: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 액세스 토큰 범위입니다. 이 필드는 선택사항입니다. 범위 매개변수를 전달하지 않고도 요청할 수 있기 때문입니다. 예를 들면 openid입니다.
  • schemaRegistryOauthTokenEndpointUrl: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 OAuth/OIDC ID 공급업체의 HTTP(S) 기반 URL입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다.
  • outputDeadletterTable: 실패한 메시지의 정규화된 BigQuery 테이블 이름입니다. 다양한 이유 (예: 스키마 불일치, 잘못된 형식의 json)로 인해 출력 테이블에 도달하지 못한 메시지가 이 테이블에 작성됩니다. 이 테이블은 템플릿에 의해 생성됩니다. 예를 들면 your-project-id:your-dataset.your-table-name입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
  8. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • BIGQUERY_TABLE: Cloud Storage 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • BIGQUERY_TABLE: Cloud Storage 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.

자세한 내용은 Dataflow로 Kafka에서 Cloud Storage로 데이터 쓰기를 참조하세요.

다음 단계