Cloud Dataflow 파이프라인의 예측 머신러닝 모델 비교

이 솔루션은 Dataflow 파이프라인에서 머신러닝 모델을 호출하는 다양한 설계 접근 방식을 설명 및 비교하고 특정 접근 방식을 선택하는 경우의 절충안을 검토합니다. 일괄 처리 및 스트림 처리 파이프라인에서 서로 다른 접근 방식을 살펴보고 이러한 절충 관계를 파악하기 위해 실행한 일련의 실험에서 얻은 결과를 제시합니다. 이 솔루션은 머신러닝 모델을 빌드하려는 데이터과학자가 아닌 학습된 모델을 데이터 처리 파이프라인에 통합하려는 작업자를 위해 설계되었습니다.

소개

이 ML 모델을 Dataflow 파이프라인에 통합하는 작업에는 다양한 접근 방식이 있으며 그중 시스템 요구사항에 가장 적합한 접근 방식을 선택해야 합니다. 다음과 같은 몇 가지 고려사항에 주의하세요.

  • 처리량
  • 지연 시간
  • 비용
  • 구현
  • 유지보수

물론 이러한 고려사항을 모두 감안하기 어려운 경우도 있겠지만, 우선순위에 따라 의사 결정 프로세스를 처리하는 데는 도움이 되는 솔루션입니다. 이 솔루션은 일괄 및 스트림 데이터 파이프라인에서 TensorFlow를 통해 학습하는 머신러닝(ML) 모델을 사용하여 예측을 수행하는 세 가지 접근 방식을 비교합니다.

  • 배포된 모델을 파이프라인 스트리밍용 REST/HTTP API로 사용
  • 일괄 파이프라인에 AI Platform(AI Platform) 일괄 예측 작업 사용
  • 일괄 파이프라인과 스트리밍 파이프라인 모두에 Dataflow 직접 모델 예측 사용

모든 실험은 다양한 입력을 기반으로 아기의 체중을 예측하는 기존의 학습된 모델인 Natality 데이터세트를 사용합니다. 이 솔루션의 목표는 모델 빌드가 아니기 때문에 모델의 빌드 또는 학습 방법은 다루지 않습니다. Natality 데이터세트에 대한 자세한 내용은 다음 단계 섹션을 참조하세요.

플랫폼

데이터 파이프라인을 실행하고 학습된 ML 모델을 호출하는 방법은 다양합니다. 하지만 기능 요구사항은 항상 동일합니다.

  1. 제한된(일괄) 소스 또는 제한되지 않은(스트리밍) 소스에서 데이터를 수집합니다. 데이터를 수집하는 소스에는 센서 데이터, 웹사이트 상호작용, 금융 거래 등이 있습니다.
  2. 예측에 ML 모델을 호출하여 입력 데이터를 변환 및 보강합니다. JSON 파일을 파싱하여 유지관리 날짜 예측, 제품 추천 또는 사기 행위 감지를 수행하기 위해 관련 필드를 추출하는 작업을 예로 들 수 있습니다.
  3. 분석 또는 백업을 위해 변환된 데이터와 예측을 저장하거나 큐 시스템으로 전달하여 새 이벤트 또는 추가 파이프라인을 트리거합니다. 잠재적인 사기 행위를 실시간으로 감지하거나 대시보드에서 액세스할 수 있는 저장소에 유지관리 일정 정보를 저장하는 작업을 예로 들 수 있습니다.

일괄 ETL 프로세스의 예측을 통해 데이터를 변환 및 보강할 경우 처리량을 극대화하여 전체 데이터 배치에 필요한 전반적인 시간을 줄이는 것을 목표로 해야 합니다. 반면에 스트리밍 데이터를 처리하여 온라인 예측을 수행할 경우 지연 시간을 최소화하여 각 예측을 실시간에 가깝게 수신하는 것을 목표로 해야 합니다. 따라서 모델을 호출하면 병목 현상이 발생할 수 있습니다.

핵심 구성요소

이 솔루션의 일괄 및 스트리밍 실험에서는 세 가지 주요 기술을 사용합니다.

  • Dataflow에서 실행되어 데이터를 처리하는 Apache Beam
  • ML 모델을 구현하고 학습하는 TensorFlow
  • 일부 실험에서 AI Platform을 ML 모델의 호스팅 플랫폼으로 사용하여 일괄 및 온라인 예측을 수행

이 솔루션에서는 Dataflow에서 실행되는 Apache Beam을 사용하여 데이터 파이프라인을 실행합니다. 그 이유는 다음과 같습니다.

  • Apache Beam은 스트리밍 데이터 처리 작업과 일괄 데이터 처리 작업을 모두 실행하는 오픈소스 통합 프로그래밍 모델입니다.
  • Dataflow는 서버 없이 Apache Beam 작업을 실행할 수 있는 Google Cloud 제품입니다.

텐서플로우는 머신러닝 프레임워크로 사용되는 Google의 오픈소스 수학 라이브러리입니다. TensorFlow를 사용하면 단일 머신 또는 분산 환경에 모델을 빌드, 학습, 제공할 수 있습니다. 모델을 다양한 기기에 이식할 수 있으며 사용 가능한 CPU, GPU, TPU 리소스를 학습 및 제공에 활용할 수 있습니다.

AI Platform은 DevOps에 필요한 관리를 최소화하여 대규모로 TensorFlow 모델을 학습, 조정(초매개변수 조정 기능 사용), 제공할 수 있는 서버리스 플랫폼입니다. AI Platform은 온라인 예측용으로 학습된 모델을 REST API로 배포하고 일괄 예측 작업을 제출할 수 있도록 지원합니다. AI Platform은 모델을 마이크로서비스로 제공할 수 있는 몇 가지 옵션 중 하나입니다.

이 솔루션에서 설명하는 접근 방식은 데이터 처리 파이프라인에 Dataflow를 사용하고 모델을 HTTP 엔드포인트로 호스팅하는 데 AI Platform을 사용합니다. 하지만 이 접근 방식은 다른 기술로 바꿀 수 있습니다. HTTP와 직접 TensorFlow 모델 간의 성능 비교는 크게 바뀌지 않습니다.

일괄 및 스트리밍 데이터 처리

이 솔루션의 실험은 일괄 사용 사례와 스트림 사용 사례를 모두 포함합니다. 제한된 소스와 제한되지 않은 소스는 운영 요구사항이 서로 다르므로 각 실험은 입력과 출력에 서로 다른 Google Cloud 제품을 활용합니다.

제한된 데이터세트 일괄 처리

그림 1은 일반적인 일괄 처리 파이프라인에서 원시 입력 데이터가 Cloud Storage와 같은 객체 저장소에 저장되는 것을 보여줍니다. 구조화된 데이터 저장소 형식에는 쉼표로 구분된 값(CSV), 최적화된 행렬(ORC), Parquet, Avro 등이 포함됩니다. 이러한 형식은 주로 데이터의 출처가 데이터베이스 또는 로그일 때 사용됩니다.

일반적인 일괄 처리 파이프라인의 아키텍처
그림 1. 일괄 처리 아키텍처

BigQuery와 같은 일부 분석 플랫폼은 쿼리 기능뿐만 아니라 저장소도 제공합니다. BigQuery는 스토리지에 Capacitor를 사용합니다. Dataflow의 Apache Beam은 일괄 처리 파이프라인의 다른 스토리지 옵션 외에도 BigQuery와 Cloud Storage 모두에서 읽고 쓸 수 있습니다.

제한되지 않은 데이터스트림의 스트림 처리

스트리밍의 경우, 그림 2에 나와 있는 것처럼 데이터 처리 파이프라인에 대한 입력은 일반적으로 메시지 시스템입니다. Pub/Sub 또는 Kafka와 같은 기술은 일반적으로 JSON, CSV, protobuf 형식의 개별 데이터 포인트를 수집하는 데 사용됩니다.

일반적인 스트림 처리 파이프라인의 아키텍처
그림 2. 스트림 처리 아키텍처

데이터 포인트는 개별적으로 처리하거나 윈도우 함수를 통해 마이크로 배치에서 그룹화하여 일시적인 이벤트 처리를 수행할 수 있습니다. 처리된 데이터는 다음과 같이 여러 목적지로 전송될 수 있습니다.

  1. 스트리밍 API를 통한 임시 분석을 위한 BigQuery
  2. 실시간 정보 제공을 위한 Bigtable
  3. 후속 프로세스/파이프라인을 트리거하기 위한 Pub/Sub 주제

Apache Beam I/O 페이지에서 제한된 데이터 소스와 제한되지 않은 데이터 소스 싱크에 대한 소스 커넥터(입력)와 싱크 커넥터(출력)의 전체 목록을 확인할 수 있습니다.

TensorFlow 모델 호출

TensorFlow로 학습한 모델을 호출하는 방법은 세 가지입니다.

  1. 온라인 예측용 HTTP 엔드포인트를 통해 호출
  2. 일괄온라인 예측을 위해 저장된 모델 파일을 사용하여 직접 호출
  3. 일괄 예측을 위한 AI Platform 일괄 예측 작업을 통해 호출

온라인 예측을 위한 HTTP 엔드포인트

TensorFlow 모델은 HTTP 엔드포인트로 배포되어 스트림 데이터 처리 파이프라인 또는 클라이언트 앱을 통해 실시간으로 호출되고 예측을 제공합니다.

TensorFlow Serving 또는 Seldon과 같은 기타 호스팅 서비스를 사용하여 TensorFlow 모델을 HTTP 엔드포인트로 배포하여 온라인 예측에 사용할 수 있습니다. 그림 3에 나와 있는 것처럼 다음 옵션 중 하나를 선택할 수 있습니다.

  1. 하나 이상의 Compute Engine 인스턴스에 모델을 직접 배포합니다.
  2. Compute Engine 또는 Google Kubernetes Engine에서 Docker 이미지를 사용합니다.
  3. Kubeflow를 활용하여 Kubernetes 또는 Google Kubernetes Engine에 대한 배포를 용이하게 합니다.
  4. Endpoints에 App Engine을 사용하여 웹 앱에 모델을 호스팅합니다.
  5. Google Cloud의 완전 관리형 ML 학습 및 제공 서비스인 AI Platform을 사용합니다.
모델을 HTTP 엔드포인트로 제공하기 위한 Dataflow의 옵션
그림 3. 모델을 HTTP 엔드포인트로 제공하기 위한 Dataflow의 다양한 옵션

AI Platform은 완전 관리형 서비스이므로 다른 옵션보다 쉽게 구현할 수 있습니다. 따라서 이 실험에서는 모델을 HTTP 엔드포인트로 제공하는 옵션으로 Cloud Machine Learning Engine을 사용합니다. 그런 다음, 다른 HTTP 모델 제공 옵션과 비교하는 대신 AI Platform에서 직접 모델과 HTTP 엔드포인트의 성능을 비교하는 데 집중할 수 있습니다.

AI Platform Prediction으로 온라인 예측 제공

온라인 예측을 제공하려면 두 가지 작업을 수행해야 합니다.

  1. 모델을 배포합니다.
  2. 추론(예측 생성)을 위해 배포된 모델과 상호작용합니다.

AI Platform Prediction을 사용하여 HTTP 엔드포인트로 모델을 배포하려면 다음 단계를 따라야 합니다.

  1. 학습된 모델 파일을 Cloud Storage에서 사용할 수 있는지 확인합니다.
  2. gcloud ml-engine models create 명령어를 사용하여 모델을 만듭니다.
  3. Cloud Storage에서 모델 파일과 함께 gcloud ml-engine versions create 명령어를 사용하여 모델 버전을 배포합니다.

다음과 같은 명령어를 사용하여 모델을 배포할 수 있습니다.


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

이 코드는 모델 버전 v1을 사용하여 Google Cloud 프로젝트에서 babyweight_estimator라는 AI Platform Prediction 모델을 만듭니다.

모델을 배포한 다음에는 호출할 수 있습니다. 다음 Python 코드는 AI Platform Prediction의 모델 버전을 REST API로 호출하는 방법을 보여줍니다.

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

BigQuery 또는 Cloud Storage 등에 대형 데이터세트를 사용할 수 있고 전체 프로세스의 처리량을 극대화하려는 경우, ML 모델을 HTTP 엔드포인트로 제공하는 것은 일괄 예측에 대해서는 권장되지 않습니다. 이렇게 하면 각 데이터 포인트당 하나의 HTTP 요청이 생성되어 HTTP 요청 볼륨이 매우 커집니다. 다음 섹션에서는 일괄 예측에 더 나은 옵션을 살펴봅니다.

일괄 예측 및 온라인 예측을 위한 직접 모델

직접 모델 예측 기술은 Dataflow 인스턴스에서 로컬 TensorFlow SavedModel을 활용합니다. 저장된 모델은 TensorFlow 모델의 빌드와 학습이 완료된 후 생성된 출력 파일의 복사본입니다. TensorFlow SavedModel의 특징은 다음과 같습니다.

  • Dataflow 작업으로 제출되는 파이프라인 소스 코드의 일부입니다.
  • 그림 4와 같이 Cloud Storage에서 다운로드됩니다.
Dataflow에서의 직접 모델 예측
그림 4. Dataflow에서의 직접 모델 예측

이 솔루션에서는 GitHub의 소스 코드에 포함된 SavedModel을 사용합니다. 인스턴스에 모델을 로드하는 방법은 다음과 같습니다.

  1. Dataflow 작업을 만들 때 모델 파일을 포함하여 로드할 파이프라인 종속 항목을 지정합니다. 다음 Python 코드는 Dataflow 작업과 함께 제출할 모델 파일이 포함된 setup.py 파일을 보여줍니다.

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. 파이프라인에서 로컬 모델 파일을 호출합니다. 이렇게 하면 지정된 인스턴스에 대한 예측을 생성합니다. 다음 Python 코드는 이 작업을 수행하는 방법을 보여줍니다.

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

자세한 내용은 Apache Beam 다중 파일 종속 항목 페이지를 참조하세요.

AI Platform 일괄 예측 작업

AI Platform을 사용하면 모델을 HTTP 엔드포인트로 배포하는 것 외에도 배포된 모델 버전 또는 Cloud Storage의 TensorFlow SavedModel을 사용하여 일괄 예측 작업을 실행할 수 있습니다.

AI Platform 일괄 예측 작업은 입력 데이터 파일의 Cloud Storage 위치를 매개변수로 사용합니다. 이 작업에서는 모델을 사용하여 해당 데이터에 대한 예측을 수행하고 마찬가지로 매개변수로 주어지는 또 다른 Cloud Storage 출력 위치에 예측 결과를 저장합니다. 다음 예시는 AI Platform 일괄 예측 작업을 제출하는 gcloud 명령어를 보여줍니다.

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

온라인 예측에서 포인트별 처리와 마이크로 배치 비교

실시간 예측 파이프라인에서 모델을 HTTP 엔드포인트로 제공하거나 작업자가 모델을 직접 사용할 경우, 새로 추가되는 데이터 포인트에 대해 예측하는 방법에는 두 가지가 있습니다.

  • 개별 포인트. 가장 분명한 옵션은 각 데이터 포인트를 모델에 개별적으로 전송하여 예측하는 것입니다.
  • 마이크로 배치. 보다 최적화된 옵션은 윈도우 함수를 사용하여 특정한 시간 간격(예: 5초 간격) 내의 데이터 포인트를 그룹화하는 마이크로 배치를 생성하는 것입니다. 그런 다음 마이크로 배치는 모델로 전송되어 해당 시점의 모든 인스턴스에 대한 예측을 생성합니다.

다음 Python 코드는 Apache Beam 파이프라인에서 윈도우 함수를 사용하여 시간 기반 마이크로 배치를 생성하는 방법을 보여줍니다.

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

마이크로 배치 접근 방식은 HTTP 엔드포인트로 배포된 모델을 사용하여 HTTP 요청 수를 획기적으로 줄이고 지연 시간을 단축합니다. 직접 모델에 마이크로 배치 기술을 사용하는 경우에도 벡터화된 연산으로 인해 예측용 인스턴스 N개가 있는 텐서를 모델에 전송하는 것이 길이가 1인 텐서를 전송하는 것보다 효율적입니다.

일괄 실험

일괄 실험에서는 TensorFlow 회귀 모델을 사용하여 BigQuery의 Natality 데이터 세트에서 아기 체중을 예상합니다. 그런 다음 Dataflow 일괄 파이프라인을 사용하여 예측 결과를 Cloud Storage에 CSV 파일로 저장합니다. 다음 섹션에서는 이 작업에 성공하기 위한 다양한 실험을 설명합니다.

접근 방식 1: Dataflow에서 직접 모델 예측

이 접근 방식에서는 Dataflow 작업자가 각 레코드의 일괄 처리 파이프라인에서 예측을 위해 직접 호출된 TensorFlow SavedModel을 호스팅합니다. 그림 5는 이 접근 방식에 대한 개략적인 아키텍처를 보여줍니다.

일괄 접근 방식 1: Dataflow에서 직접 모델 예측
그림 5. 일괄 접근 방식 1: Dataflow에서 직접 모델 예측

Dataflow 파이프라인은 다음 단계를 수행합니다.

  1. BigQuery에서 데이터를 읽습니다.
  2. 예측을 위한 BigQuery 레코드를 준비합니다.
  3. 로컬 TensorFlow SavedModel을 호출하여 각 레코드에 대한 예측을 수행합니다.
  4. 결과(입력 레코드 및 예상된 아기 체중)를 CSV 파일로 변환합니다.
  5. Cloud Storage에 CSV 파일을 씁니다.

이 접근 방식에서는 AI Platform에 HTTP 엔드포인트로 배포된 모델과 같은 원격 서비스를 호출하지 않습니다. 예측은 TensorFlow SavedModel을 사용하여 각 Dataflow 작업자 내에서 로컬로 수행됩니다.

접근 방식 2: Dataflow에서 AI Platform 일괄 예측

이 접근 방식에서 TensorFlow SavedModel은 Cloud Storage에 저장되며 AI Platform에서 예측에 사용됩니다. 하지만 이전 접근 방식처럼 각 레코드에 배포된 모델에서 API를 호출하는 대신, 예측을 위해 데이터를 준비하고 일괄 작업으로 제출합니다.

이 접근 방식은 두 단계로 이루어집니다.

  1. Dataflow는 예측을 위해 BigQuery의 데이터를 준비한 후 Cloud Storage에 저장합니다.
  2. 준비한 데이터와 함께 AI Platform 일괄 예측 작업이 제출되고 예측 결과가 Cloud Storage에 저장됩니다.

그림 6은 이러한 2단계 접근 방식의 전체 아키텍처를 보여줍니다.

일괄 접근 방식 2: Dataflow에서 AI Platform 일괄 예측
그림 6. 일괄 접근 방식 2: Dataflow에서 AI Platform 일괄 예측

Dataflow 파이프라인을 포함한 워크플로 단계는 다음과 같습니다.

  1. BigQuery에서 데이터를 읽습니다.
  2. 예측을 위한 BigQuery 레코드를 준비합니다.
  3. Cloud Storage에 JSON 데이터를 씁니다. 모델의 serving_fn 함수는 입력으로 JSON 인스턴스를 예상합니다.
  4. Cloud Storage에 준비되어 있는 데이터와 함께 AI Platform 일괄 예측 작업을 제출합니다. 이 작업은 예측 결과를 Cloud Storage에도 씁니다.

Dataflow 작업은 AI Platform Prediction 작업을 제출하는 대신 예측을 위해 데이터를 준비합니다. 즉, 데이터 준비 작업과 일괄 예측 작업은 긴밀하게 연결되어 있지 않습니다. Cloud Functions, Airflow 또는 기타 스케줄러는 Dataflow 작업을 실행한 다음 일괄 예측을 위해 AI Platform 작업을 제출하여 워크플로를 조정할 수 있습니다.

데이터가 다음 기준을 충족할 경우 성능과 사용 편의성 측면에서 AI Platform 일괄 예측이 권장됩니다.

  • 이전의 데이터 수집 프로세스에서 얻은 데이터를 예측에 알맞은 형식으로 Cloud Storage에서 사용할 수 있는 경우
  • Cloud Storage에서 예측을 위해 데이터를 준비하는 Dataflow 파이프라인과 같은 워크플로의 첫 번째 단계는 개발자가 제어할 수 없습니다.

실험 구성

세 가지 실험에서 다음과 같은 구성을 사용했습니다.

  • 데이터 크기: 10K, 100K, 1M, 10M
  • Cloud Storage 클래스: Regional Storage
  • Cloud Storage 위치: europe-west1-b
  • Dataflow 리전: europe-west1-b
  • Dataflow 작업자 머신 유형: n1-standard-1
  • 최대 100만 개 레코드의 일괄 데이터에 대한 Dataflow 자동 확장
  • Dataflow num_worker: 20 최대 1,000만 개 레코드의 일괄 데이터
  • AI Platform 일괄 예측 max-worker-count 설정: 20

Cloud Storage 위치와 Dataflow 리전은 동일해야 합니다. 이 솔루션은 europe-west1-b 리전을 임의의 값으로 사용합니다.

결과

다음은 다양한 크기의 데이터세트에서 일괄 예측과 직접 모델 예측을 수행한 결과(소요 시간)를 요약한 표입니다.

일괄 데이터 크기 측정항목 Dataflow 다음 AI Platform 일괄 예측 Dataflow에서 직접 모델 예측
1만 행 실행 시간 15분 30초

(Dataflow: 7분 47초 +
AI Platform: 7분 43초)
8분 24초
총 vCPU 시간 0.301시간

(Dataflow: 0.151시간 +
AI Platform: 0.15시간)
0.173시간
10만 행 실행 시간 16분 37초

(Dataflow: 8분 39초 +
AI Platform: 7분 58초)
10분 31초
총 vCPU 시간 0.334시간

(Dataflow: 0.184시간 +
AI Platform: 0.15시간)
0.243시간
100만 행 실행 시간 21분 11초
(Dataflow: 11분 7초 +
AI Platform: 10분 4초)
17분 12초
총 vCPU 시간 0.446시간

(Dataflow: 0.256시간 +
AI Platform: 0.19시간)
1.115시간
1,000만 행 실행 시간 33분 8초
(Dataflow: 12분 15초 +
AI Platform: 20분 53초)
25분 02초
총 vCPU 시간 5.251시간

(Dataflow: 3.581시간 +
AI Platform: 1.67시간)
7.878시간

그림 7은 이러한 결과를 나타내는 그래프를 보여줍니다.

다양한 크기의 4가지 데이터 세트에서 3가지 접근 방식의 소요 시간을 보여주는 그래프
그림 7. 다양한 크기의 4가지 데이터 세트에서 3가지 접근 방식의 소요 시간을 보여주는 그래프

결과에서 알 수 있듯이 데이터가 이미 예측에 사용되는 형식으로 Cloud Storage에 저장되어 있을 경우 AI Platform 일괄 예측 작업 자체에서는 입력 데이터에 대한 예측을 생성하는 시간이 비교적 짧습니다. 그러나 일괄 예측 작업을 사전 처리 단계(BigQuery에서 Cloud Storage로 데이터를 추출 및 준비하여 예측) 및 후속 처리 단계(데이터를 다시 BigQuery에 저장)와 결합하면 직접 모델 접근 방식의 엔드 투 엔드 실행 시간이 더욱 단축됩니다. 또한 마이크로 배치를 통해 직접 모델 예측 접근 방식의 성능을 더욱 최적화할 수 있습니다. 마이크로 배치에 대한 내용은 나중에 스트리밍 실험을 위해 다루게 됩니다.

스트림 실험

스트리밍 실험에서 Dataflow 파이프라인은 Pub/Sub 주제에서 데이터 포인트를 읽고 스트리밍 API를 사용하여 데이터를 BigQuery에 씁니다. Dataflow 스트리밍 파이프라인은 TensorFlow 아기 체중 추정 모델을 사용하여 데이터를 처리하고 예측을 수행합니다.

이 주제는 사전 정의된 초당 이벤트 속도로 아기 체중을 예상하는 인스턴스인 데이터 포인트를 생성하는 스트림 시뮬레이터에서 데이터를 수신합니다. 이를 통해 제한되지 않은 데이터 소스의 실제 예시를 시뮬레이션할 수 있습니다. 다음 Python 코드는 Pub/Sub 주제로 전송된 데이터 스트림을 시뮬레이션합니다.

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

접근 방식 1: Dataflow에서 AI Platform 온라인 예측

이 접근 방식에서 TensorFlow 모델은 AI Platform에 REST API로 배포되고 호스팅됩니다. Dataflow 스트리밍 파이프라인은 Pub/Sub 예측 생성에서 소비한 각 메시지에 대해 API를 호출합니다. 그림 8은 이 접근 방식에 대한 개략적인 아키텍처를 보여줍니다.

스트림 접근 방식 1: Dataflow에서 AI Platform 온라인 예측
그림 8. 스트림 접근 방식 1: Dataflow에서 AI Platform 온라인 예측 HTTP 요청은 단일 데이터 포인트 또는 데이터 포인트 그룹을 마이크로 배치에 포함할 수 있습니다.

이 접근 방식에서 Dataflow 파이프라인은 다음 단계를 수행합니다.

  1. Pub/Sub 주제에서 메시지를 읽습니다.
  2. AI Platform 모델의 API에 HTTP 요청을 보내 각 메시지에 대한 예측을 가져옵니다.
  3. 스트리밍 API를 사용하여 BigQuery에 결과를 씁니다.

마이크로 배치가 더 나은 접근 방식입니다. 즉, Dataflow는 Pub/Sub에서 읽은 각 메시지에 대해 모델의 REST API로 HTTP 요청을 전송하는 대신 1초 동안 수신된 메시지를 그룹화합니다. 그런 다음 이 메시지 그룹을 단일 HTTP 요청의 마이크로 배치로 모델의 API에 보냅니다. 이 접근 방식에서 Dataflow 파이프라인은 다음 단계를 수행합니다.

  1. Pub/Sub 주제에서 메시지를 읽습니다.
  2. 1초 기간 설정 작업을 적용하여 메시지의 마이크로 배치를 생성합니다.
  3. AI Platform 모델의 API에 마이크로 배치를 포함한 HTTP 요청을 보내 각 메시지에 대한 예측을 가져옵니다.
  4. 스트리밍 API를 사용하여 BigQuery에 결과를 씁니다.

이 접근 방식의 근거는 다음과 같습니다.

  1. AI Platform 모델과 같은 원격 서비스에 대한 호출 수를 줄입니다.
  2. 각 메시지 제공의 평균 지연 시간을 단축합니다.
  3. 파이프라인의 전반적인 처리 시간을 단축합니다.

이 실험에서 시간 기간 설정은 1초로 설정되었습니다. 하지만 AI Platform 모드에 배치로 전송되는 메시지 수인 마이크로 배치 크기는 다를 수 있습니다. 마이크로 배치 크기는 메시지 생성 빈도(초당 메시지 수)에 따라 다릅니다.

다음 섹션에서는 초당 메시지 수 50개, 100개, 500개의 세 가지 빈도에 대한 실험을 설명합니다. 즉, 마이크로 배치 크기가 50, 100, 500입니다.

접근 방식 2: Dataflow에서 직접 모델 예측

이 접근 방식은 일괄 실험에서 사용한 접근 방식과 유사합니다. TensorFlow SavedModel은 Dataflow 작업자에 호스팅되며 각 레코드에 대한 스트림 처리 파이프라인에서 예측을 위해 호출됩니다. 그림 9는 이 접근 방식에 대한 개략적인 아키텍처를 보여줍니다.

스트림 접근 방식 2: Dataflow에서 직접 모델 예측
그림 9. 스트림 접근 방식 2: Dataflow에서 직접 모델 예측

이 접근 방식에서 Dataflow 파이프라인은 다음 단계를 수행합니다.

  1. Pub/Sub 주제에서 메시지를 읽습니다.
  2. 로컬 TensorFlow SavedModel을 호출하여 각 레코드에 대한 예측을 수행합니다.
  3. 스트리밍 API를 사용하여 BigQuery에 결과를 씁니다.

마이크로 배치 기술은 직접 모델 예측 접근 방식을 통해 스트림 파이프라인에서도 사용할 수 있습니다. 하나의 데이터 인스턴스에 대한 텐서를 모델로 전송하는 대신 데이터 인스턴스 N개의 텐서를 전송할 수 있습니다. 이때 N은 Dataflow 기간 중 모델에서 수신한 메시지의 수와 같습니다. 이 기술은 TensorFlow 모델의 벡터화된 연산을 사용하고 여러 예측을 병렬로 수행합니다.

실험 구성

이러한 실험에서 다음과 같은 구성을 사용했습니다.

  • 스트림 데이터 크기: 10K records (messages)
  • 초당 시뮬레이션된 메시지(MPS): 50, 100, 500
  • 기간 설정 크기(마이크로 배치 실험): 1 second
  • Dataflow 리전: europe-west1-b
  • Dataflow 작업자 머신 유형: n1-standard-1
  • Dataflow num_worker: 5(자동 확장 없음)
  • AI Platform 모델 API 노드: 3 (manualScale)

결과

다음 표는 다양한 데이터 볼륨(초당 메시지 수)에서 스트리밍 실험을 수행한 결과를 요약하여 보여줍니다. 메시지 빈도는 초당 전송된 메시지 수를, 시뮬레이션 시간은 모든 메시지를 전송하는 데 소요된 시간을 나타냅니다.

스트림 메시지 빈도 측정항목 Dataflow에서 AI Platform 온라인 예측   Dataflow에서 직접 모델 예측  
    단일 메시지 마이크로 배치 단일 메시지 마이크로 배치
초당 메시지 50개

(시뮬레이션 시간: 3분 20초)
총 시간 9분 34초 7분 44초 3분 43초 3분 22초
초당 메시지 100개

(시뮬레이션 시간: 1분 40초)
총 시간 6분 03초 4분 34초 1분 51초 1분 41초
초당 메시지 500개

(시뮬레이션 시간: 20초)
총 시간 해당 사항 없음 - 기본 AI Platform 온라인 예측 할당량 2분 47초 1분 23초 48초

그림 10은 이러한 결과를 나타내는 그래프를 보여줍니다.

다양한 접근 방식과 빈도에 대한 소요 시간을 보여주는 그래프
그림 10. 다양한 접근 방식과 빈도에 대한 소요 시간을 보여주는 그래프

결과에서 볼 수 있듯이 마이크로 배치 기술은 AI Platform 온라인 예측과 직접 모델 예측 모두에서 실행 성능을 개선합니다. 또한 스트리밍 파이프라인을 통해 직접 모델을 사용하면 온라인 예측을 위해 외부 REST/HTTP API를 호출하는 것에 비해 성능이 2배에서 4배 향상됩니다.

맺음말

앞서 설명한 접근 방식과 실험 결과에 따라 다음과 같은 접근 방식을 권장합니다.

일괄 처리

  • 일괄 데이터 처리 파이프라인을 빌드하고 파이프라인의 일부로 예측을 수행하려는 경우 최고의 성능을 제공하는 직접 모델 접근 방식을 사용하세요.
  • 예측에 사용할 로컬 모델을 호출하여 벡터화된 연산을 병렬 처리하기 전에 데이터 포인트의 마이크로 배치를 생성하여 직접 모델 접근 방식의 성능을 개선하세요.
  • Cloud Storage에 저장된 데이터가 예측에 알맞은 형식인 경우 최고의 성능을 제공하는 AI Platform 일괄 예측을 사용하세요.
  • 일괄 예측에 GPU의 능력을 사용하고 싶다면 AI Platform을 사용하세요.
  • 일괄 예측에 AI Platform 온라인 예측을 사용하지 마세요.

스트림 처리

  • 스트리밍 파이프라인에 최상의 성능을 제공하고 평균 지연 시간을 단축하는 직접 모델을 사용하세요. 예측은 원격 서비스에 대한 HTTP 호출 없이 로컬로 수행됩니다.
  • 온라인 예측에서 사용되는 모델의 더 나은 유지 관리성을 위해 데이터 처리 파이프라인에서 모델을 분리하세요. 가장 좋은 접근 방식은 AI Platform 또는 기타 웹 호스팅 서비스를 사용하여 모델을 독립적인 마이크로서비스로 제공하는 것입니다.
  • 여러 개의 데이터 처리 파이프라인과 온라인 앱이 모델 서비스를 엔드포인트로 소비할 수 있도록 모델을 독립적인 웹 서비스로 배포하세요. 또한 모델을 변경하면 해당 모델을 소비하는 앱과 파이프라인에서 변경 사항을 투명하게 확인할 수 있습니다.
  • 부하 분산을 통해 여러 개의 서비스 인스턴스를 배포하여 모델 웹 서비스의 확장성과 가용성을 향상시키세요. AI Platform을 사용하면 모델 버전을 배포할 때 yaml 구성 파일에 노드(manualScaling) 또는 minNodes(autoScaling)만 지정하면 됩니다.
  • 별도의 마이크로서비스에 모델을 배포하면 기본 제공 인프라에 따라 추가 비용이 발생합니다. AI Platform 온라인 예측에 대한 가격 책정 FAQ를 참조하세요.
  • 스트리밍 데이터 처리 파이프라인에 마이크로 배치를 사용하여 직접 모델 서비스와 HTTP 모델 서비스 모두 성능을 개선하세요. 마이크로 배치는 모델 서비스에 대한 HTTP 요청 수를 줄이고 TensorFlow 모델의 벡터화된 연산을 사용하여 예측을 수행합니다.

다음 단계