Apache Kafka to BigQuery 템플릿

Apache Kafka to BigQuery 템플릿은 Apache Kafka용 Google Cloud 관리형 서비스 클러스터에서 텍스트 데이터를 수집한 후 결과 레코드를 BigQuery 테이블에 출력하는 스트리밍 파이프라인입니다. 출력 테이블에 데이터를 삽입하는 동안 발생하는 모든 오류는 BigQuery의 개별 오류 테이블에 삽입됩니다.

자체 관리형 또는 외부 Kafka와 함께 Apache Kafka to BigQuery 템플릿을 사용할 수도 있습니다.

파이프라인 요구사항

  • Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
  • Apache Kafka 주제가 있어야 합니다.
  • Dataflow, BigQuery, Cloud Storage API를 사용 설정해야 합니다. 인증이 필요한 경우 Secret Manager API도 사용 설정해야 합니다.
  • Kafka 입력 주제에 적절한 스키마로 BigQuery 데이터 세트와 테이블을 만듭니다. 동일한 주제에서 여러 스키마를 사용하고 여러 테이블에 작성하려는 경우 파이프라인을 구성하기 전에 테이블을 만들 필요가 없습니다.
  • 템플릿의 데드 레터 (처리되지 않은 메시지) 큐가 사용 설정된 경우 데드 레터 큐의 스키마가 없는 빈 테이블을 만듭니다.

Kafka 메시지 형식

Apache Kafka to BigQuery 템플릿은 CONFLUENT_AVRO_WIRE_FORMAT, AVRO_BINARY_FORMAT, JSON 형식의 Kafka 메시지 읽기를 지원합니다.

인증

Apache Kafka to BigQuery 템플릿은 Kafka 브로커에 대한 SASL/PLAIN 인증을 지원합니다.

템플릿 매개변수

필수 매개변수

  • readBootstrapServerAndTopic: 입력을 읽어올 Kafka 주제입니다.
  • writeMode: 스키마에 따라 하나의 테이블 또는 여러 테이블에 레코드를 씁니다. DYNAMIC_TABLE_NAMES 모드는 AVRO_CONFLUENT_WIRE_FORMAT 소스 메시지 형식 및 SCHEMA_REGISTRY 스키마 소스에서만 지원됩니다. 타겟 테이블 이름은 각 메시지의 Avro 스키마 이름을 기반으로 자동 생성되며 단일 스키마 (단일 테이블 생성) 또는 여러 스키마 (여러 테이블 생성)일 수 있습니다. SINGLE_TABLE_NAME 모드는 사용자가 지정한 단일 테이블 (단일 스키마)에 씁니다. 기본값은 SINGLE_TABLE_NAME입니다.
  • kafkaReadAuthenticationMode: Kafka 클러스터에서 사용할 인증 모드입니다. 인증이 없는 경우 NONE을, SASL/PLAIN 사용자 이름과 비밀번호의 경우 SASL_PLAIN을, 인증서 기반 인증의 경우 TLS을 사용합니다. BigQuery용 Apache Kafka에서는 SASL_PLAIN 인증 모드만 지원합니다. 기본값은 SASL_PLAIN입니다.
  • messageFormat: 읽을 Kafka 메시지의 형식입니다. 지원되는 값은 AVRO_CONFLUENT_WIRE_FORMAT (Confluent 스키마 레지스트리로 인코딩된 Avro), AVRO_BINARY_ENCODING (일반 바이너리 Avro), JSON입니다. 기본값은 AVRO_CONFLUENT_WIRE_FORMAT입니다.
  • useBigQueryDLQ: 이 값이 true이면 실패한 메시지가 추가 오류 정보와 함께 BigQuery에 작성됩니다. 기본값은 false입니다.

선택적 매개변수

  • outputTableSpec: 출력을 작성할 BigQuery 테이블 위치입니다. 이름은 <project>:<dataset>.<table_name> 형식이어야 합니다. 테이블 스키마가 입력 객체와 일치해야 합니다.
  • persistKafkaKey: 이 값이 true이면 파이프라인은 BigQuery 테이블의 BYTES 유형 _key 필드에 Kafka 메시지 키를 유지합니다. 기본값은 false입니다 (키는 무시됨).
  • outputProject: 데이터 세트가 있는 BigQuery 출력 프로젝트입니다. 테이블은 데이터 세트에 동적으로 생성됩니다. 기본값은 빈 값입니다.
  • outputDataset: 출력을 작성할 BigQuery 출력 데이터 세트입니다. 테이블은 데이터 세트에 동적으로 생성됩니다. 테이블이 사전에 만들어진 경우 테이블 이름은 지정된 이름 지정 규칙을 따라야 합니다. 이름은 bqTableNamePrefix + Avro Schema FullName여야 하며 각 단어는 하이픈 -으로 구분됩니다. 기본값은 빈 값입니다.
  • bqTableNamePrefix: BigQuery 출력 테이블을 만들 때 사용할 이름 접두사입니다. 스키마 레지스트리를 사용하는 경우에만 적용됩니다. 기본값은 빈 값입니다.
  • createDisposition: BigQuery CreateDisposition입니다. 예를 들면 다음과 같습니다. CREATE_IF_NEEDED, CREATE_NEVER 기본값은 CREATE_IF_NEEDED입니다.
  • writeDisposition: BigQuery WriteDisposition입니다. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값은 WRITE_APPEND입니다.
  • useAutoSharding: true이면 파이프라인은 BigQuery에 쓸 때 자동 샤딩을 사용합니다. 기본값은 true입니다.
  • numStorageWriteApiStreams: 쓰기 스트림 수를 지정합니다. 이 매개변수는 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: 트리거 빈도를 초 단위로 지정합니다. 이 매개변수는 설정해야 합니다. 기본값은 5초입니다.
  • useStorageWriteApiAtLeastOnce: 이 매개변수는 'BigQuery Storage Write API 사용'이 설정된 경우에만 적용됩니다. 사용 설정하면 Storage Write API에 최소 1회의 시맨틱스가 사용되고 그렇지 않은 경우 정확히 한 번의 시맨틱스가 사용됩니다. 기본값은 false입니다.
  • enableCommitOffsets: 처리된 메시지의 오프셋을 Kafka에 커밋합니다. 이 파라미터를 사용 설정하면 파이프라인을 다시 시작할 때 메시지 처리의 간격이나 중복을 최소화할 수 있습니다. 소비자 그룹 ID를 지정해야 합니다. 기본값은 false입니다.
  • consumerGroupId: 이 파이프라인이 속한 소비자 그룹의 고유 식별자입니다. Kafka에 오프셋 커밋이 사용 설정된 경우에 필요합니다. 기본값은 빈 값입니다.
  • kafkaReadOffset: 커밋된 오프셋이 없는 경우 메시지를 읽는 시작점입니다. 처음에 시작 메시지가 표시되며 마지막에 최신 메시지가 표시됩니다. 기본값은 latest입니다.
  • kafkaReadUsernameSecretId: SASL_PLAIN 인증에서 사용할 Kafka 사용자 이름이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다. 기본값은 빈 값입니다.
  • kafkaReadPasswordSecretId: SASL_PLAIN 인증에서 사용할 Kafka 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다. 기본값은 빈 값입니다.
  • kafkaReadKeystoreLocation: Kafka 클러스터로 인증할 때 사용할 TLS 인증서와 비공개 키가 포함된 Java 키 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/keystore.jks입니다.
  • kafkaReadTruststoreLocation: Kafka 브로커 ID를 확인하는 데 사용할 신뢰할 수 있는 인증서가 포함된 Java 트러스트 저장소 (JKS) 파일의 Google Cloud Storage 경로입니다.
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 인증을 위해 Java 트러스트 저장소(JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다(예: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • kafkaReadKeyPasswordSecretId: Kafka TLS 인증을 위해 Java 키 저장소 (JKS) 파일 내 비공개 키에 액세스하는 데 사용할 비밀번호가 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • schemaFormat: Kafka 스키마 형식입니다. SINGLE_SCHEMA_FILE 또는 SCHEMA_REGISTRY으로 제공할 수 있습니다. SINGLE_SCHEMA_FILE가 지정된 경우 모든 메시지에 avro 스키마 파일에 언급된 스키마를 사용합니다. SCHEMA_REGISTRY가 지정되면 메시지에 단일 스키마 또는 여러 스키마가 있을 수 있습니다. 기본값은 SINGLE_SCHEMA_FILE입니다.
  • confluentAvroSchemaPath: 주제의 모든 메시지를 디코딩하는 데 사용되는 단일 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
  • schemaRegistryConnectionUrl: 메시지 디코딩을 위한 Avro 스키마를 관리하는 데 사용되는 Confluent 스키마 레지스트리 인스턴스의 URL입니다. 기본값은 빈 값입니다.
  • binaryAvroSchemaPath: 바이너리로 인코딩된 Avro 메시지를 디코딩하는 데 사용되는 Avro 스키마 파일의 Google Cloud Storage 경로입니다. 기본값은 빈 값입니다.
  • schemaRegistryAuthenticationMode: 스키마 레지스트리 인증 모드입니다. NONE, TLS 또는 OAUTH일 수 있습니다. 기본값은 NONE입니다.
  • schemaRegistryTruststoreLocation: Schema Registry 인증을 위한 트러스트 저장소가 저장된 SSL 인증서의 위치입니다. 예를 들면 /your-bucket/truststore.jks입니다.
  • schemaRegistryTruststorePasswordSecretId: truststore의 보안 비밀에 액세스하는 비밀번호가 저장된 Secret Manager의 SecretId입니다. 예를 들면 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version입니다.
  • schemaRegistryKeystoreLocation: SSL 인증서와 비공개 키가 포함된 키 저장소 위치입니다. 예를 들면 /your-bucket/keystore.jks입니다.
  • schemaRegistryKeystorePasswordSecretId: 키 저장소 파일에 액세스하는 비밀번호가 있는 Secret Manager의 SecretId입니다(예: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryKeyPasswordSecretId: 키 저장소에 저장된 클라이언트의 비공개 키에 액세스하는 데 필요한 비밀번호의 SecretId입니다(예: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryOauthClientId: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 클라이언트 ID입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다.
  • schemaRegistryOauthClientSecretId: OAUTH 모드에서 Schema Registry 클라이언트를 인증하는 데 사용할 클라이언트 보안 비밀이 포함된 Google Cloud Secret Manager 보안 비밀 ID입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다. 예를 들면 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>입니다.
  • schemaRegistryOauthScope: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 액세스 토큰 범위입니다. 이 필드는 선택사항입니다. 범위 매개변수를 전달하지 않고도 요청할 수 있기 때문입니다. 예를 들면 openid입니다.
  • schemaRegistryOauthTokenEndpointUrl: OAUTH 모드에서 스키마 레지스트리 클라이언트를 인증하는 데 사용되는 OAuth/OIDC ID 공급업체의 HTTP(S) 기반 URL입니다. AVRO_CONFLUENT_WIRE_FORMAT 메시지 형식에 필요합니다.
  • outputDeadletterTable: 실패한 메시지의 정규화된 BigQuery 테이블 이름입니다. 다양한 이유 (예: 스키마 불일치, 잘못된 형식의 json)로 인해 출력 테이블에 도달하지 못한 메시지가 이 테이블에 작성됩니다. 이 테이블은 템플릿에 의해 생성됩니다. 예를 들면 your-project-id:your-dataset.your-table-name입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
  8. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프 처리해야 합니다. gcloud topic escaping을 참조하세요.

자세한 내용은 Dataflow로 Kafka에서 BigQuery로 데이터 쓰기를 참조하세요.

다음 단계