실시간 임베딩 유사성 일치 시스템 빌드

이번 자료에서는 머신러닝을 사용해 임의의 항목과 유사한 항목을 찾는 기법인 근사 유사성 일치에 대해서 간략하게 알아보겠습니다. 또한 실시간 텍스트 시맨틱 검색을 위한 엔드 투 엔드 솔루션 예에 대해서 설명하고 솔루션 예를 실행할 수 있는 방법을 다각적으로 살펴보도록 하겠습니다. 솔루션 예는 관련된 realtime-embeddings-matching GitHub 저장소에 있습니다.

이 문서에서는 사용자가 머신러닝 개념을 비롯해 Google Cloud와 Apache Beam 같은 도구에 익숙하다고 가정합니다.

소개

임의의 쿼리와 유사한 항목을 찾는 것은 검색 및 가져오기 시스템은 물론이고 추천 엔진에서도 가장 중요한 요소입니다. 예를 들어 유사성 일치는 사용자가 다음과 같은 항목을 찾는 데 유용합니다.

  • 자신의 애완 동물 이미지와 유사한 이미지
  • 검색어와 관련된 뉴스 기사
  • 이전에 보았거나 들었던 것과 유사한 영화 또는 노래
  • 제품 및 서비스 추천

유사성 일치 시스템을 설계하려면 먼저 항목을 숫자 벡터로 표현해야 합니다. 그러면 숫자 벡터가 머신러닝(ML)을 통해 발견된 항목의 시맨틱 임베딩을 표현합니다. 자세한 내용은 개요: 머신러닝을 통한 특성 임베딩 추출 및 제공을 참조하세요.

그런 다음 사용자 쿼리의 임베딩 벡터와 유사한 항목을 찾기 위한 최근접 이웃 검색(유사성 측정항목 기준)을 할 수 있도록 임베딩을 구성하고 저장해야 합니다. 하지만 실시간으로 검색하고, 가져오고, 추천하려면 유사성 일치가 매우 빨라야 합니다. 따라서 유사 항목을 찾는 프로세스 속도를 높이려면 최근접 이웃 탐색 알고리즘을 적용하여 항목 임베딩의 색인을 빌드하는 것이 더욱 실용적입니다.

본 자료와 관련된 솔루션 예에서는 다음과 같은 사항에 대해서 다룹니다.

  • Wikipedia 제목의 텍스트 임베딩 추출
  • tf.Hub의 Universal Sentence Encoder 모듈 사용
  • Spotify의 Annoy 라이브러리를 사용한 근사 유사성 일치 색인 빌드
  • 웹 앱에서 실시간 시맨틱 검색을 위한 색인 제공

솔루션 예의 코드는 realtime-embeddings-matching GitHub 저장소에 있습니다.

근사 유사성 일치

일반적인 일치 및 가져오기 절차는 다음과 같습니다.

  1. 항목과 쿼리를 적합한 형상 공간의 벡터로 변환합니다. 이러한 특성을 임베딩이라고 부릅니다.
  2. 임베딩 벡터 쌍의 근접도를 정의합니다. 이러한 근접도가 코사인 유사성 또는 유클리드 거리가 될 수 있습니다.
  3. 전체 항목 집합에 대한 명시적 검색을 사용해 최근접 이웃을 찾습니다.

항목이 수백 개 또는 수천 개에 불과하다면 전체 항목 집합에 대해 검색하여 쿼리 벡터와 각 항목의 벡터 간 유사성을 계산할 때까지 시간이 오래 걸리지 않습니다. 또한 유사성 일치를 일괄 작업으로 실행하여 온라인 결과가 필요하지 않다면 성능도 만족할 만한 수준입니다. 하지만 실시간 검색 및 가져오기 시스템이나 추천 시스템을 제공하면서 항목 수가 수천만 개에 이른다면 근사 최근접 이웃을 찾아야 합니다. 이러한 경우에는 응답 지연 시간을 낮출 수 있도록 프로세스를 최적화해야 합니다.

실용적인 솔루션은 근사 유사성 일치를 실행하는 방법입니다. 근사 유사성 일치를 실행할 때는 항목 벡터를 색인으로 구성합니다. 여기에서 색인이란 유사 항목을 빠르게 가져올 수 있는 데이터 구조를 말합니다. 이때 가져온 항목이 해당 쿼리와 가장 유사한 항목이 아닐 수도 있다는 잠재적 문제도 존재합니다. 하지만 일반적으로 색인의 정밀도와 지연 시간(및 크기) 사이에서 상충 관계를 제어할 수 있습니다.

근사 유사성 일치는 크게 트리 기반 접근법과 해시 기반 접근법으로 구분됩니다.

트리 기반 접근법

트리 기반 접근법(또는 측정항목 트리 데이터 구조)은 데이터를 분할 정복 방식에 따라 재귀적으로 분할한다는 아이디어에서 출발하였으며, 그러면 트리에서 유사한 벡터가 서로 가깝게 배치됩니다. 예상되는 쿼리 시간은 O(log(n))이며, 여기서 n은 현재 가지고 있는 항목(벡터) 수입니다. 트리 색인은 대용량의 메모리가 필요하며, 고차원 데이터일수록 성능이 떨어집니다. 측정항목 트리 데이터 구조라고도 불리는 트리 기반 접근법의 예시는 다음과 같습니다.

해시 기반 접근법

트리 기반 접근법을 대신할 수 있는 것이 해시 기반 접근법입니다. 트리와 달리 해시에서는 재귀적 데이터 분할이 없습니다. 항목을 코드로 변환하는 모델을 학습한다는 아이디어에서 시작되었으며, 이때 유사 항목은 동일하거나 유사한 코드(해시 충돌)를 생성합니다. 이러한 접근법에서는 필요한 메모리 용량이 크게 줄어듭니다. 예상되는 쿼리 시간은 O(1)이지만 n에서 저선형이 될 수도 있습니다. 여기서 n은 현재 가지고 있는 항목(벡터) 수입니다. 해시 기반 접근법의 예는 다음과 같습니다.

근사 유사성 일치 기법을 구현하는 몇 가지 오픈소스 라이브러리가 있지만 정밀도, 쿼리 지연 시간, 메모리 효율성, 색인 빌드 시간, 특성, 사용 편의성 등의 상충 관계가 서로 다릅니다.

이번 자료의 솔루션 예에서는 Spotify가 음악을 추천하기 위해 개발한 라이브러리인 Annoy(Approximate Nearest Neighbors Oh Yeah)를 사용합니다. Annoy는 랜덤 프로젝션 트리를 빌드하는 C++ 라이브러리로, Python 결합이 포함되어 있습니다. 색인은 k 트리 포리스트로 빌드되며, 여기서 k는 정밀도와 성능 간의 균형을 맞추는 조정 가능한 매개변수입니다. 이때 대용량의 읽기 전용 파일 기반 데이터 구조도 함께 생성됩니다. 이렇게 생성된 데이터 구조는 다수의 프로세스에서 데이터를 공유할 수 있도록 메모리로 매핑됩니다.

그 밖에 널리 사용되는 라이브러리로 NMSLIB(Non-Metric Space Library)와 Faiss(Facebook AI Similarity Search)도 있습니다. 근사 유사성 일치를 구현할 때 사용하는 라이브러리가 본 자료에서 언급하는 전체 솔루션 아키텍처나 워크플로에 영향을 미쳐서는 안 됩니다.

본 자료의 솔루션 예는 텍스트 시맨틱 검색에서 임베딩 유사성 일치를 적용하는 방법에 대해서 설명하고 있습니다. 솔루션 목적은 입력된 검색어와 의미적으로 연관된 문서(뉴스 기사, 블로그 게시물, 연구 논문 등)를 실시간으로 가져오는 데 있습니다.

토큰 기반 검색 기법은 문서와 일치하는 검색어(개별적으로 또는 전체적으로)에 대한 측정항목(최근 구매일, 빈도 수 등)을 기준으로 문서를 가져옵니다. 반대로 시맨틱 검색은 쿼리 및 문서의 임베딩을 사용해 일치 여부를 확인합니다. 예를 들어 검색어가 'tropical wild animals'라고 가정할 경우 검색 결과에 'in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife' 같은 제목이 포함될 수 있습니다. 이 내용은 이 문서의 후반부 검색 웹 앱 쿼리 섹션에 나와 있습니다. 검색 결과를 보면 검색어가 포함되어 있지 않지만 결과는 열대 지역 야생 동물에 관한 문서가 맞습니다.

Wikipedia BigQuery 데이터 세트

예시에서 사용하는 데이터 소스는 BigQuery의 bigquery-samples:wikipedia_benchmark.Wiki100B 데이터 세트입니다. 이 공개 데이터 세트에는 Wikipedia 제목을 기준으로 1,000억 개의 항목이 포함되어 있습니다. 예를 들어 데이터는 뷰가 3개 이상이고, 단어가 5개 이상이고, 문자가 500개 미만인 고유 제목으로 제한됩니다. 이를 통해 약 1,050만 개의 고유 제목으로 필터링됩니다.

시스템 기술 요건

시맨틱 검색 시스템 예의 기술 요건은 다음과 같습니다.

  • 시맨틱스 또는 Wikipedia 제목을 인코딩한 벡터 표현, 즉 임베딩을 찾는 노력을 최소화합니다. 따라서 시스템 예는 언어 모델을 처음부터 학습하기 보다는 사전에 학습된 텍스트 임베딩 모델을 사용해야 합니다.
  • 임베딩을 추출하여 색인을 빌드하는 전용 컴퓨팅 인프라의 필요성을 최소화합니다. 따라서 시스템 예는 작업 리소스(메모리와 CPU)가 충분하면서 작업 완료 시 리소스를 반환하는 완전 관리형, 주문형인 컴퓨팅 서비스를 사용해야 합니다.
  • 임베딩 추출 프로세스를 자동으로 확장합니다. 따라서 시스템 예는 병렬식 데이터 처리 서비스를 사용해야 합니다.
  • 임의의 쿼리에 대해 색인에서 유사한 임베딩을 찾는 데 따른 지연 시간을 최소화합니다. 따라서 색인이 메모리에 완전히 로드되어야 합니다.
  • 유사한 임베딩 벡터에 적합한 Wikipedia 제목을 실시간으로 가져오는 데 따른 지연 시간을 최소화합니다. 따라서 시스템 예시는 Wikipedia 제목을 지연 시간이 낮은 읽기 데이터베이스에 저장해야 합니다.
  • 검색 서비스를 웹 앱으로 배포하는 데 따른 DevOps 팀의 노력을 최소화합니다. 따라서 시스템 예시는 완전 관리형 서비스를 사용해야 합니다.
  • 초당 쿼리 수(QPS)가 최대 수천 개에 달할 정도로 점차 늘어나는 웹 앱 워크로드를 1초 미만의 평균 지연 시간으로 처리합니다. 따라서 시스템 예는 검색 웹 앱의 일부 노드를 배포한 후 부하 분산기를 배포할 수 있어야 합니다.

솔루션 아키텍처

그림 1은 실시간 텍스트 시맨틱 검색 시스템을 간략하게 나타낸 것입니다. 시스템이 먼저 Wikipedia 제목에서 임베딩을 추출하고, Annoy를 사용해 근사 유사성 일치 색인을 빌드한 후 실시간 시맨틱 검색 및 가져오기를 위해 빌드 색인을 제공합니다.

솔루션 예시의 아키텍처

그림 1. 텍스트 시맨틱 검색 시스템에 사용되는 고급 솔루션 아키텍처

주요 아키텍처 구성요소

다음 표는 그림 1에서 나타낸 주요 구성요소를 설명한 것입니다.

구성요소 설명
BigQuery BigQuery는 Google의 페타바이트급 규모의 저비용 완전 관리형 분석 데이터 웨어하우스입니다. 솔루션 예에서는 소스인 Wikipedia 제목이 BigQuery에 저장됩니다.
Apache Beam Apache Beam은 스트리밍 및 일괄 데이터 처리 작업을 모두 실행하는 오픈소스 통합 프로그래밍 프레임워크입니다. 솔루션 예시에서는 Apache Beam을 사용해 임베딩을 추출할 파이프라인을 구현한 후 제목을 찾을 때 사용할 ID를 Datastore에 저장합니다.
Dataflow Dataflow는 Google Cloud에서 대규모로 Apache Beam 파이프라인을 실행하기 위한 완전 관리형의 안정적인 서버리스 서비스입니다. Dataflow는 입력 텍스트의 처리와 임베딩의 추출을 조정하는 데 사용됩니다.
tf.Hub TensorFlow Hub는 재사용이 가능한 머신러닝 모듈의 라이브러리입니다. 솔루션 예에서는 사전 학습된 텍스트 임베딩 모듈인 Universal Sentence Encoder를 사용해 각 제목을 임베딩 벡터로 변환합니다.
Cloud Storage Cloud Storage는 뛰어난 가용성과 내구성을 자랑하는 대용량 바이너리 객체 스토리지입니다. 솔루션 예에서는 추출된 임베딩이 Cloud Storage에 TFRecord로 저장됩니다. 또한 근사 유사성 일치 색인이 빌드된 후에는 직렬화를 통해 Cloud Storage에 저장됩니다.
Datastore Datastore는 자동 확장, 고성능, 간편한 애플리케이션 개발을 위해 빌드된 NoSQL 문서 데이터베이스입니다. 솔루션 예시에서는 Datastore를 사용해 Wikipedia 제목과 제목 ID를 저장하기 때문에 지연 시간이 짧은 실시간으로 가져올 수 있습니다.
AI Platform AI Platform은 ML 모델을 대규모로 학습할 수 있는 서버리스 서비스입니다. 솔루션 예에서는 AI Platform에서 Annoy 라이브러리를 사용해 근사 유사성 일치 색인을 빌드하기 때문에 전용 컴퓨팅 인프라가 필요하지 않습니다.
App Engine App Engine은 완전 관리형 플랫폼에서 확장 가능하고 안정적인 애플리케이션을 개발하여 배포할 수 있는 구성요소입니다. 솔루션 예에서는 App Engine을 사용해 Flask 웹 앱을 제공합니다. 이 웹 앱은 사용자 쿼리와 의미적으로 연관된 Wikipedia 제목을 검색합니다. 그 밖에도 App Engine에서는 점차 늘어나는 QPS를 처리할 목적으로 단순 구성만 사용한 부하 분산을 통해 다수의 앱 인스턴스를 배포할 수 있습니다.

전체 워크플로

그림 1의 실시간 텍스트 시맨틱 검색 시스템의 워크플로는 다음과 같은 단계로 구분할 수 있습니다.

  1. Dataflow를 사용한 임베딩 추출

    1. BigQuery에서 Wikipedia 제목을 읽어옵니다.
    2. Universal Sentence Encoder 모듈을 사용해 제목 임베딩을 추출합니다.
    3. 추출된 임베딩을 Cloud Storage에 TFRecord로 저장합니다.
    4. 제목과 제목 식별자를 실시간으로 가져올 수 있도록 Datastore에 저장합니다.
  2. AI Platform을 사용한 색인 빌드

    1. 임베딩을 Cloud Storage 파일에서 Annoy 색인으로 로드합니다.
    2. 색인을 메모리에 빌드합니다.
    3. 색인을 디스크로 저장합니다.
    4. 저장된 색인을 Cloud Storage로 업로드합니다.
  3. App Engine을 사용한 검색 앱 제공

    1. Cloud Storage에서 Annoy 색인을 다운로드합니다.
    2. 사용자 쿼리를 가져옵니다.
    3. Universal Sentence Encoder 모듈을 사용해 쿼리 임베딩을 추출합니다.
    4. Annoy 색인을 사용해 쿼리 임베딩과 유사한 임베딩을 찾습니다.
    5. 유사한 임베딩의 항목 ID를 가져옵니다.
    6. 식별자를 사용해 Wikipedia 제목을 Datastore에서 가져옵니다.
    7. 결과를 반환합니다.

실제 검색 시스템

실제로 검색 및 가져오기 시스템은 시맨틱 기반 검색 기법을 토큰 기반(역색인) 기법과 결합하여 사용하는 경우가 많습니다. 사용자에게 제공하기 전에 두 기법의 검색 결과를 결합한 후 순위를 결정합니다. 이미 이 태스크에 사용되는 Elasticsearch(Google Cloud Marketplace에서 제공)에 익숙할 수 있지만 이 검색 엔진은 Apache Lucene 라이브러리 기반 전체 텍스트 검색에서 역색인화에 널리 사용되는 프레임워크입니다.

그 밖에 실제 시스템에서 종종 구현되는 최적화는 이번 솔루션에서 다루지는 않지만 Memorystore 같은 서비스를 사용해 쿼리와 쿼리의 연관 제목 식별자를 캐싱하는 방법입니다. 이전에 쿼리를 알았다면 제목 식별자를 Memorystore에서 직접 가져올 수 있습니다. 그러면 Universal Sentence Encoder를 호출하여 쿼리 임베딩을 생성하는 작업과 근사 일치 색인을 검색하여 유사 항목 유무를 확인하는 작업이 필요 없게 됩니다. 쿼리 캐시는 쿼리 요청의 중복 수준에 따라 시스템의 평균 지연 시간을 개선할 수 있는 경우가 많습니다. 그림 2는 쿼리 캐시가 포함된 워크플로를 나타낸 것입니다.

캐시를 사용한 솔루션 아키텍처

그림 2. 쿼리 캐시를 포함하여 텍스트 시맨틱 검색에 사용되는 고급 솔루션 아키텍처

그림 2의 흐름은 다음과 같습니다.

  1. 검색어를 수신합니다.
  2. 캐시에서 쿼리를 찾습니다.
  3. 쿼리를 찾지 못할 경우 다음을 수행하세요.
    1. 임베딩을 쿼리에서 추출합니다.
    2. 색인에서 유사 항목을 찾습니다.
    3. 캐시를 업데이트합니다.
  4. Datastore의 ID를 통해 액세스 권한을 가져옵니다.
  5. 결과를 반환합니다.

서비스 및 액세스 권한 사용

그림 1에서 설명하는 엔드 투 엔드 솔루션을 Cloud Console에서 사용하려면 다음 서비스 API가 필요합니다.

또한 서비스 계정에게 다음과 같은 권한을 부여해야 합니다. 기본 서비스 계정은 동일한 Google Cloud 프로젝트에 속할 경우 필요한 리소스에 대해 충분한 액세스 권한을 갖습니다. 하지만 서비스 계정 권한이 바뀌면 변경이 필요할 수도 있습니다. 필요한 권한은 다음과 같습니다.

  • Dataflow
    • BigQuery 데이터 세트에 대한 읽기 권한
    • TFRecord가 저장되는 Cloud Storage 버킷에 대한 읽기/쓰기 권한
    • Datastore에 대한 쓰기 권한
  • AI Platform
    • 색인이 저장되는 Cloud Storage 버킷에 대한 읽기/쓰기 권한
  • App Engine
    • 색인이 저장되는 Cloud Storage 버킷에 대한 읽기 권한
    • Datastore에 대한 읽기 권한

다음 섹션의 코드 스니펫은 본 자료에서 언급하는 개념을 나타낸 것입니다. 엔드 투 엔드 예시를 실행하는 방법에 대한 자세한 내용은 관련 GitHub 저장소에서 README.md 파일을 참조하세요.

Dataflow를 사용한 임베딩 추출

Wikipedia 제목에서 임베딩을 추출하기 위한 파이프라인은 Apache Beam을 사용해 pipeline.py에서 구현됩니다. 전체 파이프라인은 다음 코드 스니펫과 같습니다.

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

BigQuery에서 읽어오기

첫 번째 파이프라인 단계에서는 Wikipedia BigQuery 데이터 세트에서 beam.io.Read 메서드와 beam.io.BigQuerySource 객체를 사용해 제목을 읽어옵니다. pipeline.py에서 get_source_query 메서드는 데이터를 가져올 때 사용할 SQL 스크립트를 준비합니다. BigQuery에서 가져올 Wikipedia 제목 수는 get_source_query 함수의 limit 매개변수를 통해 구성할 수 있습니다.

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

식별자는 기본 제공되는 BigQuery GENERATE_UUID 함수를 사용해 제목(여기에서 id)에 추가됩니다. 이 값은 Datastore에서 ID를 사용해 Wikipedia 제목을 찾아서 임베딩에 매핑하는 데 사용됩니다.

이번 Beam 파이프라인 단계에서는 PCollection 객체를 반환합니다. 이때 컬렉션의 각 항목에 두 가지 요소인 id(문자열)와 title(문자열)이 포함됩니다.

임베딩 추출

두 번째 파이프라인 단계에서는 tf.Hub의 Universal Sentence Encoder 모듈을 사용해 BigQuery에서 읽어온 각 Wikipedia 제목마다 임베딩 벡터를 추출합니다. 예시에서는 모듈을 실행하기 위해 TensorFlow Transform(tf.Transform) API를 사용합니다.

TensorFlow Transform은 Apache Beam을 사용해 데이터를 전처리할 때 필요한 라이브러리입니다. 예시에서는 tf.TransformAnalyzeAndTransformDataset 메서드를 tf.Hub 모듈을 호출하기 위한 컨텍스트로 사용하여 텍스트 임베딩을 추출합니다.

AnalyzeAndTransformDataset 메서드는 preprocess_fn 함수를 실행합니다. 이 함수에는 다음 스니펫과 같은 변환 로직이 포함되어 있습니다.

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

이번 파이프라인 단계에서는 다른 PCollection 객체를 생성합니다. 이때 컬렉션의 각 항목마다 Wikipedia 제목의 id 값(문자열)과 Universal Sentence Encoder에서 추출된 embedding 값(숫자 배열), 즉 512개의 차원이 포함됩니다.

임베딩을 TFRecord에 쓰기

Wikipedia 제목 임베딩이 추출되면 솔루션이 beam.io.tfrecordio.WriteToTFRecord 메서드를 사용해 임베딩을 제목 ID와 함께 TFRecord로 Cloud Storage에 저장합니다.

TFRecord 형식은 바이너리 레코드 시퀀스를 저장하는 데 사용되는 단순 형식입니다. TFRecord 파일의 각 레코드는 tf.Example 프로토콜 버퍼입니다. 이 프로토콜 버퍼는 키-값 매핑을 유연하게 표현하는 메시지 유형으로 구성됩니다. 이러한 유형은 구조화된 데이터를 직렬화하는 데 효율적입니다.
생성할 임베딩 파일 수는 WriteToTFRecord 메서드에서 num_shards 매개변수를 설정하여 지정할 수 있습니다.

Datastore에 쓰기

다음 단계에서는 Datastore에 씁니다. 이번 단계는 임베딩 추출 단계와 동시에 실행됩니다. 동시에 실행하는 목적은 ID를 사용해 Wikipedia 제목을 가져올 수 있도록 Datastore에 저장하는 데 있습니다. Wikipedia 제목 ID 역시 임베딩과 함께 TFRecord 파일에 저장되기 때문에 Annoy 색인에 추가되는 항목(임베딩 벡터)의 식별자로 사용할 수 있습니다.

BigQuery에서 읽어오기 단계에서 생성된 항목을 Datastore에 저장하려면 먼저 pipeline.py에서 다음 코드 스니펫을 사용해 각 항목을 Datastore 항목으로 변환해야 합니다.

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

이 코드가 실행되면 WriteToDatastore 메서드가 항목을 Datastore에 저장합니다. 그림 3은 Datastore kind 매개변수를 wikipedia로 설정하여 파이프라인을 실행한 후 일부 항목이 Datastore에 저장된 것을 나타냅니다.

이미지

그림 3. 파이프라인 실행 이후 Datastore 항목

Dataflow에서 파이프라인 실행

Apache Beam 파이프라인을 실행하려면 먼저 run.py 스크립트를 실행하여 필요한 인수를 전달한 후 --runner 인수를 DataflowRunner로 설정해야 합니다. 이를 위해 먼저 run.sh 스크립트 파일에서 구성 매개변수를 설정한 다음 run.py 스크립트를 실행합니다.

다음 명령어는 파이프라인 실행 방법을 나타냅니다. 스크립트에는 run.sh 스크립트를 실행할 때 설정하는 여러 가지 변수(예: $OUTPUT_PREFIX)가 포함되어 있습니다.

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

Dataflow 파이프라인의 흐름은 Cloud Console에서 볼 수 있으며, 그림 4와 같은 모습입니다.

Cloud Console에 표시된 Cloud Dataflow 파이프라인

그림 4. Cloud Console에 표시된 Cloud Dataflow 파이프라인의 실행 그래프

AI Platform을 사용한 색인 빌드

솔루션 예에서 임베딩 벡터가 Wikipedia 제목에서 추출되었으면 다음 단계에서 Annoy 라이브러리를 사용해 해당 벡터에 대한 근사 유사성 일치 색인을 빌드합니다. 솔루션 예시의 index_builder 폴더에는 이번 태스크에 사용할 코드가 포함되어 있습니다.

첫째, 색인을 빌드하고 저장하는 태스크를 구현합니다. 둘째, 태스크를 제출하여 AI Platform에서 실행합니다. 이러한 접근법을 사용하면 전용 컴퓨터 인프라 없이도 색인을 만들 수 있습니다.

색인 빌더 태스크 구현

task.py 파일은 색인 빌더의 진입점으로 다음과 같은 단계를 실행합니다.

  • Annoy 색인을 빌드합니다.
  • (선택사항) 색인을 압축합니다.
  • 생성된 아티팩트를 Cloud Storage에 업로드합니다.

Annoy 색인을 빌드하는 로직은 index.py 모듈에서 다음 코드 스니펫과 같습니다.

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

단계는 다음과 같습니다.

  1. 지정된 패턴과 일치하는 임베딩 파일 이름을 모두 가져옵니다.
  2. 각 임베딩 파일마다 다음과 같이 실행합니다.
    1. TFRecord 파일에서 tf.Example 인스턴스를 반복합니다.
    2. string_identifier(ID)를 읽어와 mapping 사전에 값으로 추가합니다. 이때 키는 현재 item_counter 값입니다.
    3. embedding 벡터를 읽어와 annoy_index에 추가합니다. 여기서 item_id 값은 현재 item_counter 값으로 설정됩니다.
  3. 지정된 num_trees 값을 사용해 annoy_index.build 메서드를 호출합니다.
  4. annoy_index.save 메서드를 호출하여 색인을 저장합니다.
  5. pickle.dump 메서드를 사용하여 mapping 사전을 직렬화합니다.

mapping 사전이 필요한 이유는 Datastore에 저장되는 Wikipedia 제목 식별자가 문자열이기 때문입니다(BigQuery에서 데이터를 읽어올 때 GENERATE_UUID 메서드를 사용해 생성됨). 하지만 Annoy 색인의 항목(임베딩 벡터) 식별자는 정수만 될 수 있습니다. 이러한 이유로 코드에서 서로게이트 정수 색인과 Wikipedia 항목의 문자열 식별자를 서로 매핑할 수 있는 사전이 만들어집니다.

AnnoyIndex 생성자에서 전달되는 METRIC 값은 각도 즉, 코사인 유사성의 변화를 나타냅니다. VECTOR_LENGTH 값은 512로 설정되며 Universal Sentence Encoder 모듈에서 생성되는 텍스트 임베딩의 길이를 나타냅니다.

저장된 색인의 크기는 임베딩 벡터 수와 num_trees 매개변수 값에 따라 몇 기가바이트가 될 수 있습니다. 따라서 색인을 Cloud Storage로 업로드하려면 솔루션이 청킹을 지원하는 API를 사용해야 합니다. 솔루션 예시에서는 다음 task.py 코드 스니펫과 같이 google.cloud.storage가 아닌 googleapiclient.http.MediaFileUpload 메서드를 사용합니다.

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

색인 빌더 태스크를 AI Platform에 제출

솔루션 예에서 색인 빌더 태스크를 AI Platform 작업으로 실행하려면 다음 파일이 필요합니다.

  • submit.sh. 이 파일은 프로젝트 변수와 버킷 이름, 그리고 색인 출력 리전을 설정하여 업데이트해야 합니다.
  • config.yaml. 이 파일에서는 scale_tier 매개변수를 사용해 작업을 실행할 때 사용할 머신 크기를 지정합니다.
  • setup.py. 이 파일에서는 작업에 필요한 패키지를 지정합니다. 솔루션 예시에는 Annoygoogle-api-python-client가 필요합니다.

위의 파일이 모두 업데이트되면 submit.sh 스크립트를 실행하여 빌더 태스크를 AI Platform 작업으로 제출할 수 있습니다. 스크립트에는 다음 명령어가 포함되어 있습니다.

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

작업은 색인 크기에 따라 몇 시간이 걸릴 수도 있습니다. 시간은 벡터 수 및 차원과 색인을 빌드할 때 사용할 트리 수에 따라 달라집니다.

색인을 빌드하는 AI Platform 작업이 완료되면 지정된 Cloud Storage 버킷에서 다음 아티팩트를 사용할 수 있습니다.

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

시맨틱 검색 서비스 구현

이번 섹션에서는 앞에서 빌드된 Annoy 색인을 사용해 연관된 Wikipedia 제목을 Datastore에서 가져오는 시맨틱 검색 서비스 유틸리티의 구현에 대해서 알아보겠습니다. 시맨틱 검색 서비스에서 사용하는 유틸리티는 다음과 같습니다.

  • 쿼리 임베딩 유틸리티
  • 임베딩 일치 유틸리티
  • Datastore 조회 유틸리티
  • 검색 서비스 래퍼

쿼리 임베딩 유틸리티

사용자가 검색어를 입력했을 때 솔루션이 색인에서 유사한 임베딩을 찾으려면 먼저 쿼리 임베딩을 추출해야 합니다. 다음은 이러한 태스크를 실행하는 embedding.py 코드 스니펫입니다.

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

위 코드의 기능은 다음과 같습니다.

  1. tf.Hub에서 Universal Sentence Encoder를 로드합니다.
  2. extract_embeddings 메서드를 제공하여 사용자 쿼리 텍스트를 허용합니다.
  3. 쿼리에 대한 문장 인코딩(임베딩)을 반환합니다.

위 코드를 실행하면 EmbedUtil 메서드가 extract_embeddings 메서드를 호출할 때마다 클래스 생성자에서 tf.Hub 모듈을 로드할 필요 없이 한 번만 로드하면 됩니다. 이는 Universal Sentence Encoder 모듈을 로드하는 데 몇 초 정도 걸릴 수 있기 때문입니다.

임베딩 일치 유틸리티

matching.py에서 구현되는 MatchingUtil 클래스는 로컬 디스크 파일에서 Annoy 색인을 로드할 뿐만 아니라 매핑 사전을 로드하는 역할도 합니다. 다음 코드 스니펫은 MatchingUtil 메서드의 구현을 보여줍니다.

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

색인이 클래스 생성자에 로드됩니다. 코드에서 전체 색인 파일을 메모리에 로드할 수 있도록 index.load 메서드의 prefault 매개변수가 True로 설정됩니다.

또한 클래스가 다음과 같은 기능의 find_similar_items 메서드를 호출합니다.

  1. 벡터(사용자 쿼리의 임베딩 벡터)를 수신합니다.
  2. 수신된 벡터와 가장 유사한 임베딩의 item_ids(정수 ID)를 Annoy index에서 찾습니다.
  3. mapping 사전에서 identifiers(GUID 문자열 ID)를 가져옵니다.
  4. Datastore에서 Wikipedia 제목을 가져올 때 사용할 identifiers 객체를 반환합니다.

Datastore 조회 유틸리티

다음 스니펫은 lookup.py에서 DatastoreUtil 클래스를 나타낸 것입니다. 이 클래스는 Datastore에서 Wikipedia 제목을 가져오는 로직을 구현합니다. 생성자는 제목이 속한 항목을 설명하는 Datastore kind 값을 갖습니다.

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

get_items 메서드는 식별자 목록인 keys 매개변수를 허용한 후 이 키와 연결된 Datastore items 객체를 반환합니다.

검색 서비스 래퍼

다음 스니펫은 search.py에서 SearchUtil 클래스를 나타낸 것입니다. 이 클래스는 앞에서 설명한 유틸리티 모듈에 대한 래퍼 역할을 합니다.

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

SearchUtil 생성자에서 Annoy 색인 파일과 직렬화된 매핑 사전이 download_artifacts 메서드를 사용해 Cloud Storage에서 로컬 디스크로 다운로드됩니다. 그러면 match_util, embed_util, datastore_util 객체가 초기화됩니다.

search 메서드는 사용자 검색 query 매개변수를 비롯해 가져올 일치 항목 수를 지정하는 num_matches 매개변수를 허용합니다. search 메서드로 호출되는 메서드는 다음과 같습니다.

  • embed_util.extract_embeddings 메서드는 Universal Sentence Encoder 모듈을 사용해 쿼리의 임베딩 벡터를 가져옵니다.
  • match_util.find_similar_items 메서드는 Annoy 색인에서 쿼리 임베딩과 유사한 임베딩의 항목 ID를 찾습니다.
  • datastore_util.get_items 메서드는 발견한 item_ids에 따라 Datastore에서 Wikipedia 제목이 포함된 항목을 가져옵니다.

일반적으로 항목을 가져온 이후의 단계에서는 항목을 반환하기 전에 유사도를 기준으로 색인에서 생성된 항목의 순위를 결정합니다.

App Engine을 사용한 검색 서비스 제공

이번 섹션에서는 시맨틱 검색 서비스를 웹 앱으로 제공하여 App Engine에 배포하는 방법에 대해서 알아보겠습니다.

Flask 웹 앱 구현

다음 main.py 코드 스니펫은 Wikipedia 제목에 대한 시맨틱 검색 서비스를 제공하기 위해 Flask 웹 앱을 구현하는 것입니다.

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

search_util 객체는 모듈 수준에서 한 번만 초기화됩니다. RESTful /search 엔드포인트가 HTTP GET 요청을 search 메서드로 리디렉션합니다. 그러면 메서드가 사용자 검색 query(문자열)과 결과 수를 나타내는 show(정수)를 가져오고 search_util.search 메서드를 호출한 다음 가져온 일치 항목을 반환합니다.

웹 앱을 App Engine에 배포

Flask 웹 앱은 gunicorn을 HTTP 웹 서버 게이트웨이 인터페이스(WSGI)로 사용해 App Engine 가변형 환경에 배포됩니다. App Engine에 배포하려면 다음 파일에서 구성 설정이 필요합니다.

  • app.yaml. 이 파일은 Python 런타임 구성 설정을 비롯해 일반적인 앱, 네트워크, 리소스 설정을 정의합니다. 이 파일에서 변경해야 할 설정은 다음과 같습니다.

    • readiness_check 섹션에서 색인을 다운로드하여 유틸리티 객체에 로드할 수 있는 시간이 충분하도록 app_start_timeout_sec을 설정합니다.
    • resources 섹션에서 색인을 메모리에 완전히 로드할 수 있도록 색인 크기보다 큰 값으로 memory를 설정합니다.
    • 색인을 다운로드 및 로드한 후 유틸리티 객체를 로드할 수 있는 시간이 충분하도록 gunicorn --timeout을 설정합니다.
    • 동시 실행 능력을 높일 수 있도록 app.yaml 파일의 resources 섹션에서 요청한 CPU 코어 수의 2~4배로 gunicorn --threading을 설정합니다.
  • requirement.txt. 앱을 시작하기 이전 런타임 과정에서 requirements.txt 파일을 앱의 소스 디렉터리에서 찾은 후 pip을 사용해 종속 항목을 모두 설치합니다.

deploy.sh 스크립트를 실행해 앱을 App Engine에 배포할 수 있습니다. 이 스크립트에 포함된 명령어는 다음과 같습니다.

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

검색 웹 앱에 대한 쿼리

웹 앱이 App Engine에 배포되면 다음 URL을 호출하여 검색을 불러올 수 있습니다.

https://service_name-dot-project_name.appspot.com/search?query=query

service_name 값은 app.yaml 파일에 입력되는 이름과 동일합니다. query로 전달되는 쿼리에 공백이 포함되어 있으면 공백을 %20으로 변환해야 합니다. show=num_results를 쿼리 문자열에 추가하면 가져올 일치 항목 수를 지정할 수 있습니다. 기본값은 10입니다.

다음 예는 샘플 데이터세트를 기준으로 검색어 예와 일치하는 Wikipedia 제목을 나타낸 것입니다.

쿼리 결과 샘플
Tropical wild animals "in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife"
Global technology concerns "worldwide risk of artificial intelligence"
Fresh summer drinks "great ideas for non-alcoholic mojito"
Winter sports "cross-country skiing at the fis nordic world ski championships 2007"

용량 및 부하 테스트

솔루션 예시를 개발한 후 샘플을 실행하여 성능 정보를 얻었습니다. 다음 표에서는 bigquery-samples.wikipedia_benchmark.Wiki100B 데이터 세트를 사용하여 엔드 투 엔드 예시를 실행할 때 사용한 설정을 보여줍니다.

임베딩 추출

다음 표에서는 임베딩을 추출할 때 사용한 Cloud Dataflow 작업 구성과 최종 실행 시간을 보여줍니다.

구성
  • 레코드 제한: 500만 개
  • 임베딩 벡터 크기: 512개
  • vCPU 수: 64개(32 작업자)
  • 작업자 머신 유형: n1-highmem-2
결과
  • 작업 시간: 32분

색인 빌드

다음 표는 AI Platform 작업을 사용해 색인을 빌드하는 태스크의 구성 및 결과 정보를 나타낸 것입니다.

구성
  • Annoy 색인의 트리 수: 100개
  • AI Platform 확장 등급: large_model (n1-highmem-8)
결과
  • 작업 시간: 2시간 56분
  • 인덱스 파일 크기: 19.28GB
  • 매핑 파일 크기: 263.21MB

검색 앱 제공

다음 표는 App Engine을 사용해 검색 앱을 제공하기 위한 구성 및 결과 정보를 나타낸 것입니다. 부하 테스트에서는 ab - Apache HTTP 서버 벤치마킹 도구를 180초 동안 사용하였습니다.

구성
  • vCPU 수: 6개
  • 메모리: 24GB
  • 디스크: 50GB
  • 확장: 인스턴스 10개(수동)
결과
  • 배포 시간(정상 가동): 최대 19분
  • 컨테이너 이미지 빌드 및 업로드: 최대 6분
  • 앱 배포: 최대 13분
  • 동시 실행 수준: 1500개
  • 초당 요청 수: 최대 2500건
  • 지연 시간(95번째 백분위수): 최대 903밀리초
  • 지연 시간(50번째 백분위수): 최대 514밀리초

추가 개선

다음은 현재 시스템을 개선할 수 있는 사항입니다.

  • GPU 사용. Universal Sentence Encoder 모드는 가속기에서 실행할 경우 유용합니다. Annoy 라이브러리는 GPU를 지원하지 않지만 Faiss 같이 GPU를 지원하는 라이브러리는 근사 유사성 일치 색인에 대한 검색 시간을 개선할 수 있습니다. 하지만 App Engine은 GPU 사용을 지원하지 않기 때문에 GPU를 사용하려면 App Engine이 아닌 Compute Engine 또는 Google Kubernetes Engine(GKE)을 사용해야 합니다.

  • 디스크에서 색인 읽어오기. 대용량 메모리 노드(솔루션 예에서는 26GB RAM)를 사용해 색인을 제공하는 대신 비용을 최적화하는 방법으로 더욱 작은 용량의 메모리 노드(4GB RAM 등)를 사용해 색인을 디스크에서 읽어올 수 있습니다. 색인을 디스크에서 읽어오려면 SSD를 지정해야 합니다. 그렇지 않으면 성능이 부족할 수 있습니다. 색인을 디스크에 저장할 경우 노드 수를 높여서 시스템 처리량을 늘릴 수 있습니다. 또한 시스템 비용도 절감됩니다. 단, 색인을 디스크에 저장하고 싶다면 Compute Engine 또는 GKE를 사용해야 합니다. App Engine은 영구 디스크로 SSD를 지원하지 않기 때문입니다.

  • 라이브 시스템의 색인 업데이트. 새로운 데이터가 수신되면(솔루션 예에서는 새로운 Wikipedia 자료) 색인을 업데이트해야 합니다. 이러한 업데이트는 일반적으로 매일 또는 매주 일괄 프로세스로 실행됩니다. 업데이트를 마치면 다운타임 없이 새로운 인덱스를 사용할 수 있도록 검색 앱도 업데이트해야 합니다.

다음 단계