Google Cloud에서 경향 모델링을 위한 Kubeflow 파이프라인 사용

Last reviewed 2024-04-16 UTC

이 문서에서는 Google Cloud에서 경향 모델링을 수행하는 파이프라인 예시를 설명합니다. 이 문서는 머신러닝 모델을 만들고 배포하는 데이터 엔지니어, 머신러닝 엔지니어, 마케팅 과학팀을 대상으로 합니다. 이 문서에서는 사용자가 머신러닝 개념과 Google Cloud, BigQuery, Kubeflow Pipelines, Python, Jupyter 노트북에 익숙하다고 가정합니다. 또한 Google 애널리틱스 360 및 BigQuery의 원시 내보내기 기능을 알고 있다고 가정합니다.

여기에서 작업하는 파이프라인에는 Google 애널리틱스 샘플 데이터가 사용됩니다. 이 파이프라인은 BigQuery ML 및 XGBoost를 사용해서 여러 모델을 빌드합니다. 여기에서는 Vertex AI 파이프라인에서 Kubeflow 파이프라인을 사용하여 파이프라인을 실행합니다. 이 문서에서는 모델을 학습하고, 이를 평가하고, 배포하는 과정을 설명합니다. 또한 전체 프로세스를 자동화하는 방법에 대해서도 설명합니다.

전체 파이프라인 코드는 GitHub 저장소의 Jupyter 노트북에 있습니다.

경향 모델링이란 무엇인가요?

경향 모델링은 소비자가 취할 수 있는 행동을 예측합니다. 경향 모델링의 예시로는 특정 브랜드에 대한 소비자의 제품 구매 가능성, 서비스 등록 가능성, 앱 제거와 비활성 고객으로의 전환 가능성에 대한 예측이 포함됩니다.

경향 모델의 결과는 각 소비자에 대해 0~1까지의 점수로 출력됩니다. 이 점수는 소비자가 해당 행동을 수행할 가능성을 나타냅니다. 조직이 경향 모델을 선택하는 중요한 이유 중 하나는 퍼스트 파티 데이터의 효율성 향상 요구 때문입니다. 마케팅 사용 사례의 경우 최적의 경향 모델에는 사이트 애널리틱스 및 CRM 데이터와 같은 온라인 및 오프라인 소스 모두의 신호가 포함됩니다.

이 데모에서는 BigQuery에 없는 GA360 샘플 데이터가 사용됩니다. 해당 사용 사례에 따라 추가적인 오프라인 신호를 고려해야 할 수 있습니다.

MLOps로 ML 파이프라인을 간소화하는 방법

대부분의 ML 모델은 프로덕션에서 사용되지 않습니다. 모델 결과로 통계가 생성되고, 데이터 과학팀에서 모델을 마친 후 ML 엔지니어링 또는 소프트웨어 엔지니어링 팀이 Flask 또는 FastAPI와 같은 프레임워크를 사용해서 이를 프로덕션용 코드에 래핑해야 하는 경우가 많습니다. 이 프로세스에서는 모델을 새 프레임워크로 빌드해야 하는 경우가 많습니다. 즉, 데이터를 다시 변환해야 합니다. 이 작업에 몇 주 또는 몇 개월까지 걸릴 수 있어서 많은 모델이 프로덕션에 적합하지 않습니다.

ML 프로젝트의 가치를 늘리기 위해 머신러닝 작업(MLOps)이 중요해졌으며, MLOps는 이제 데이터 과학 조직에서 중요 기술로서 발전되고 있습니다. 이러한 가치의 이해를 돕기 위해 Google Cloud는 MLOps 개요를 제공하는 실무자를 위한 MLOps 가이드를 공개했습니다.

MLOps 원칙 및 Google Cloud를 사용해서 복잡한 수동 프로세스를 상당 부분 없애주는 자동 프로세스에 따라 모델을 엔드포인트로 푸시할 수 있습니다. 이 문서에 설명된 도구 및 프로세스에서는 파이프라인을 엔드 투 엔드로 소유함으로써 이러한 모델을 프로덕션으로 전환하는 데 도움이 되는 방식에 대해 설명합니다. 앞에서 언급한 실무자 가이드 문서에서는 MLOps 및 Google Cloud를 사용할 때 가능한 것들에 대한 아웃라인과 수평적 솔루션을 제공합니다.

Kubeflow 파이프라인과 Vertex AI는 무엇인가요?

Kubeflow 파이프라인은 파이프라인 빌드를 위해 사용되는 오픈소스 프레임워크입니다.

Kubeflow 파이프라인 프로세스의 각 단계는 아티팩트 형식으로 입력을 받아 들이거나 출력을 생성할 수 있는 독립적인 컨테이너로 구성됩니다. 예를 들어 프로세스의 한 단계에서 데이터 세트를 빌드하는 경우에는 데이터 세트 아티팩트가 출력됩니다. 이 데이터 세트 아티팩트는 다음 단계에서 입력으로 사용될 수 있습니다. 각 구성요소가 별도의 컨테이너이기 때문에 기본 이미지 이름 및 모든 종속 항목 목록 등과 같이 파이프라인의 각 구성요소에 대해 정보를 제공해야 합니다.

Vertex AI 파이프라인의 경우 Kubeflow 파이프라인 또는 TensorFlow Extended(TFX)를 사용하여 빌드된 파이프라인을 실행할 수 있습니다. Vertex AI 없이 이러한 오픈소스 프레임워크를 대규모로 실행하기 위해서는 자체적으로 Kubernetes 클러스터를 설정하고 유지보수해야 합니다. Vertex AI 파이프라인은 이러한 문제를 해결합니다. 관리형 서비스이기 때문에 필요에 따라 확장 또는 축소가 가능하며, 지속적인 유지보수가 필요하지 않습니다.

파이프라인 빌드 프로세스

이 문서에서 설명하는 예시에서는 Juptyer 노트북을 사용하여 파이프라인 구성요소를 만들고 컴파일, 실행, 자동화를 수행합니다. 앞에서 설명한 것처럼 이 노트북은 GitHub 저장소에 있습니다.

인증을 처리하는 Vertex AI Workbench를 사용해서 노트북 코드를 실행할 수 있습니다. Vertex AI Workbench를 사용하면 노트북 작업을 통해 머신 만들기, 노트북 빌드, Git에 연결을 수행할 수 있습니다. (Vertex AI Workbench에는 더 많은 기능들이 포함되어 있지만 이 문서에서는 이에 대해 다루지 않습니다.)

파이프라인 실행이 완료되면 다음과 비슷한 다이어그램이 Vertex AI 파이프라인에서 생성됩니다.

파이프라인에서 실행되는 구성요소를 보여주는 방향성 비순환 그래프입니다.

이전 다이어그램은 방향성 비순환 그래프(DAG)입니다. DAG 빌드 및 검토는 데이터 또는 ML 파이프라인을 이해하기 위한 핵심 단계입니다. DAG의 핵심 속성은 구성요소가 단일 방향(여기에서는 위에서 아래로)으로 이동하고, 순환이 발생하지 않는 즉, 상위 구성요소에 하위 구성요소가 사용되지 않는 것입니다. 일부 구성요소는 병렬로 수행될 수 있고 다른 구성요소는 종속 항목을 가질 수 있으므로, 일련의 흐름으로 수행됩니다.

각 구성요소에서 녹색 체크박스는 해당 코드가 올바르게 실행되었음을 나타냅니다. 오류가 발생했으면 빨간색 느낌표가 표시됩니다. 다이어그램에서 각 구성요소를 클릭하면 해당 작업의 세부정보를 볼 수 있습니다.

DAG 다이어그램은 이 문서 섹션에서 파이프라인으로 빌드된 각 구성요소의 청사진으로 포함되었습니다. 다음 목록에서는 각 구성요소에 대해 설명합니다.

전체 파이프라인은 DAG 다이어그램에 표시된 것처럼 다음 단계를 수행합니다.

  1. create-input-view: 이 구성요소는 BigQuery 보기를 만듭니다. 이 구성요소는 Cloud Storage 버킷에서 SQL을 복사하고 사용자가 제공한 매개변수 값을 입력합니다. 이 BigQuery 보기는 파이프라인에서 이후의 모든 모델에 사용되는 입력 데이터 세트입니다.
  2. build-bqml-logistic: 이 파이프라인은 BigQuery ML을 사용해서 로지스틱 회귀 모델을 만듭니다. 이 구성요소가 완료되면 새 모델이 BigQuery 콘솔에 표시됩니다. 이 모델 객체를 사용하여 모델 성능을 확인하고 나중에 예측을 빌드할 수 있습니다.
  3. evaluate-bqml-logistic: 이 파이프라인은 이 구성요소를 사용해서 로지스틱 회귀에 대한 정밀도/재현율 곡선(DAG 다이어그램의 logistic_data_path)을 만듭니다. 이 아티팩트는 Cloud Storage 버킷에 저장됩니다.
  4. build-bqml-xgboost: 이 구성요소는 BigQuery ML을 사용하여 XGBoost 모델을 만듭니다. 이 구성요소가 완료되면 BigQuery 콘솔에서 새 모델 객체(system.Model)를 볼 수 있습니다. 이 객체를 사용하여 모델 성능을 확인하고 나중에 예측을 빌드할 수 있습니다.
  5. evaluate-bqml-xgboost: 이 구성요소는 XGBoost 모델에 대해 xgboost_data_path라는 정밀도/재현율 곡선을 만듭니다. 이 아티팩트는 Cloud Storage 버킷에 저장됩니다.
  6. build-xgb-xgboost: 이 파이프라인은 XGBoost 모델을 만듭니다. 이 구성요소는 BigQuery ML 대신 Python을 사용하므로 다른 모델 만들기 방법을 참조할 수 있습니다. 이 구성요소가 완료되면 Cloud Storage 버킷에 모델 객체 및 성능 측정항목이 저장됩니다.
  7. deploy-xgb: 이 구성요소는 XGBoost 모델을 배포합니다. 일괄 또는 온라인 예측을 허용하는 엔드포인트를 만듭니다. Vertex AI 콘솔 페이지의 모델 탭에서 엔드포인트를 탐색할 수 있습니다. 엔드포인트가 트래픽과 일치하도록 자동 확장됩니다.
  8. build-bqml-automl: 이 파이프라인은 BigQuery ML을 사용하여 AutoML 모델을 만듭니다. 이 구성요소가 완료되면 새 모델 객체가 BigQuery 콘솔에 표시됩니다. 이 객체를 사용하여 모델 성능을 확인하고 나중에 예측을 빌드할 수 있습니다.
  9. evaluate-bqml-automl: 이 파이프라인은 AutoML 모델의 정밀도/재현율 곡선을 만듭니다. 아티팩트는 Cloud Storage 버킷에 저장됩니다.

이 프로세스는 BigQuery ML 모델을 엔드포인트에 푸시하지 않습니다. BigQuery에 있는 모델 객체로부터 직접 예측을 생성할 수 있기 때문입니다. BigQuery ML 또는 다른 라이브러리 중에서 솔루션에 사용할 항목을 결정할 때는 예측 생성 방법을 고려하세요. 일일 일괄 예측으로 요구가 충족될 경우 BigQuery 환경에 유지하는 것이 워크플로를 단순화할 수 있는 방법입니다. 하지만 실시간 예측이 필요하거나 다른 라이브러리에 있는 기능이 필요한 시나리오의 경우에는 이 문서의 단계에 따라 저장된 모델을 엔드포인트에 푸시합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

이 시나리오의 Jupyter 노트북

파이프라인 만들기 및 빌드 태스크는 GitHub 저장소에 있는 Jupyter 노트북에 포함되어 있습니다.

태스크를 수행하려면 노트북을 가져온 후 노트북에 있는 코드 셀을 순서대로 실행합니다. 이 문서에 설명된 흐름에서는 Vertex AI Workbench에서 노트북을 실행한다고 가정합니다.

Vertex AI Workbench 환경 열기

먼저 GitHub 저장소를 Vertex AI Workbench 환경에 클론합니다.

  1. Google Cloud Console에서 노트북을 만들 프로젝트를 선택합니다.
  2. Vertex AI Workbench 페이지로 이동합니다.

    Vertex AI Workbench 페이지로 이동

  3. 사용자 관리 노트북 탭에서 새 노트북을 클릭합니다.

  4. 노트북 유형 목록에서 Python 3 노트북을 선택합니다.

  5. 새 노트북 대화상자에서 고급 옵션을 클릭한 후 머신 유형 아래에서 사용하려는 머신 유형을 선택합니다. 확실하지 않으면 n1-standard-1(1 cVPU, 3.75GB RAM)을 선택합니다.

  6. 만들기를 클릭합니다.

    노트북 환경이 생성되는 데 잠시 시간이 걸립니다.

  7. 노트북이 생성되었으면 노트북을 선택하고 Jupyterlab 열기를 클릭합니다.

    JupyterLab 환경이 해당 브라우저에서 열립니다.

  8. 터미널 탭을 실행하려면 파일 > 새로 만들기 > 런처를 선택합니다.

  9. 런처 탭에서 터미널 아이콘을 클릭합니다.

  10. 터미널에서 mlops-on-gcp GitHub 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/cloud-for-marketing/
    

    명령어가 완료되면 파일 브라우저에 cloud-for-marketing 폴더가 표시됩니다.

노트북 설정 구성

노트북을 실행하려면 먼저 이를 구성해야 합니다. 노트북을 사용하려면 파이프라인 아티팩트 저장을 위해 Cloud Storage 버킷이 필요하므로 먼저 버킷을 만듭니다.

  1. 노트북이 파이프라인 아티팩트를 저장할 수 있는 Cloud Storage 버킷 만들기를 수행합니다. 버킷 이름은 전역적으로 고유해야 합니다.
  2. cloud-for-marketing/marketing-analytics/predicting/kfp_pipeline/ 폴더에서 Propensity_Pipeline.ipynb 노트북을 엽니다.
  3. 노트북에서 PROJECT_ID 변수 값을 파이프라인을 실행하려는 Google Cloud 프로젝트의 ID로 설정합니다.
  4. BUCKET_NAME 변수 값을 바로 전에 만든 버킷의 이름으로 설정합니다.

이 문서의 나머지 부분에서는 파이프라인 작동 방법을 이해하는 데 중요한 코드 스니펫에 대해 설명합니다. 전체 구현은 GitHub 저장소를 참조하세요.

BigQuery 보기 빌드

파이프라인에서 첫 번째 단계는 각 모델을 빌드하는 데 사용되는 입력 데이터를 생성하는 것입니다. 이 Kubeflow 파이프라인 구성요소는 BigQuery 보기를 생성합니다. 보기 생성 프로세스를 단순화하기 위해 일부 SQL이 이미 생성되어 GitHub에서 텍스트 파일로 저장되어 있습니다.

각 구성요소의 코드는 Kubeflow 파이프라인 구성요소 클래스를 데코레이션(속성을 통해 상위 클래스 또는 함수 수정)하는 것으로 시작됩니다. 그런 후 코드가 파이프라인에서 하나의 단계를 나타내는 create_input_view 함수를 정의합니다.

이 함수에는 여러 입력이 필요합니다. 이러한 값 중 일부는 시작일 및 종료일과 같이 현재 코드에 하드 코딩되어 있습니다. 파이프라인을 자동화할 때는 적합한 값을 사용하도록 코드를 수정하거나(예를 들어 날짜의 경우 CURRENT_DATE 함수 사용), 하드 코딩된 상태로 두는 대신 값을 매개변수로 가져오도록 구성요소를 업데이트할 수 있습니다. 또한 ga_data_ref 값을 GA360 테이블의 이름으로 변경하고 conversion 변수 값을 해당 전환으로 설정해야 합니다. (이 예시에서는 공개 GA360 샘플 데이터가 사용됩니다.)

다음 목록은 create-input-view 구성요소의 코드를 보여줍니다.

@component(
   # this component builds a BigQuery view, which will be the underlying source for model
   packages_to_install=["google-cloud-bigquery", "google-cloud-storage"],
   base_image="python:3.9",
   output_component_file="output_component/create_input_view.yaml",
)
def create_input_view(view_name: str,
                     data_set_id: str,
                     project_id: str,
                     bucket_name: str,
                     blob_path: str

):
   from google.cloud import bigquery
   from google.cloud import storage
   client = bigquery.Client(project=project_id)
   dataset = client.dataset(data_set_id)
   table_ref = dataset.table(view_name)
   ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'
   conversion = "hits.page.pageTitle like '%Shopping Cart%'"
   start_date = '20170101'
   end_date = '20170131'

def get_sql(bucket_name, blob_path):
       from google.cloud import storage
       storage_client = storage.Client()
       bucket = storage_client.get_bucket(bucket_name)
       blob = bucket.get_blob(blob_path)
       content = blob.download_as_string()
       return content
def if_tbl_exists(client, table_ref):

...

   else:
       content = get_sql()
       content = str(content, 'utf-8')
       create_base_feature_set_query = content.
                                   format(start_date = start_date,
                                   end_date = end_date,
                                   ga_data_ref = ga_data_ref,
                                   conversion = conversion)
shared_dataset_ref = client.dataset(data_set_id)
base_feature_set_view_ref = shared_dataset_ref.table(view_name)
base_feature_set_view = bigquery.Table(base_feature_set_view_ref)
base_feature_set_view.view_query = create_base_feature_set_query.format(project_id)
base_feature_set_view = client.create_table(base_feature_set_view)

BigQuery ML 모델 빌드

보기가 생성된 후에는 build_bqml_logistic이라는 구성요소를 실행해서 BigQuery ML 모델을 빌드합니다. 노트북에서 이 블록이 핵심 구성요소입니다. 첫 번째 블록에서 만든 학습 보기를 사용하여 BigQuery ML 모델을 빌드합니다. 이 예시에서는 노트북에 로지스틱 회귀가 사용됩니다.

사용 가능한 모델 유형 및 초매개변수에 대한 자세한 내용은 BigQuery ML 참조 문서를 참조하세요.

다음 목록은 이 구성요소의 코드를 보여줍니다.

@component(
   # this component builds a logistic regression with BigQuery ML
   packages_to_install=["google-cloud-bigquery"],
   base_image="python:3.9",
   output_component_file="output_component/create_bqml_model_logistic.yaml"
)
def build_bqml_logistic(project_id: str,
                       data_set_id: str,
                       model_name: str,
                       training_view: str
):
   from google.cloud import bigquery
   client = bigquery.Client(project=project_id)
   model_name = f"{project_id}.{data_set_id}.{model_name}"
   training_set = f"{project_id}.{data_set_id}.{training_view}"
   build_model_query_bqml_logistic = '''
   CREATE OR REPLACE MODEL `{model_name}`
   OPTIONS(model_type='logistic_reg'
   , INPUT_LABEL_COLS = ['label']
   , L1_REG = 1
   , DATA_SPLIT_METHOD = 'RANDOM'
   , DATA_SPLIT_EVAL_FRACTION = 0.20
   ) AS
       SELECT * EXCEPT (fullVisitorId, label),
       CASE WHEN label is null then 0 ELSE label end as label
   FROM `{training_set}`
   '''.format(model_name = model_name, training_set = training_set)
job_config = bigquery.QueryJobConfig()
client.query(build_model_query_bqml_logistic, job_config=job_config)

BigQuery ML 대신 XGBoost 사용

이전 섹션에서 설명한 구성요소에는 BigQuery ML이 사용됩니다. 노트북의 다음 섹션에서는 BigQuery ML을 사용하는 대신 Python에서 직접 XGBoost를 사용하는 방법을 보여줍니다.

build_bqml_xgboost라는 구성요소를 실행해서 그리드 검색으로 표준 XGBoost 분류 모델을 실행하기 위한 구성요소를 빌드합니다. 그런 후 이 코드는 생성된 Cloud Storage 버킷에 모델을 아티팩트로 저장합니다. 이 함수는 출력 아티팩트에 대해 추가 매개변수(metricsmodel)가 지원됩니다. 이러한 매개변수는 Kubeflow 파이프라인에 필요합니다.

@component(
   # this component builds an xgboost classifier with xgboost
   packages_to_install=["google-cloud-bigquery", "xgboost", "pandas", "sklearn", "joblib", "pyarrow"],
   base_image="python:3.9",
   output_component_file="output_component/create_xgb_model_xgboost.yaml"
)
def build_xgb_xgboost(project_id: str,
                     data_set_id: str,
                     training_view: str,
                     metrics: Output[Metrics],
                     model: Output[Model]
):

...

  data_set = f"{project_id}.{data_set_id}.{training_view}"
  build_df_for_xgboost = '''
                         SELECT * FROM `{data_set}`
                         '''.format(data_set = data_set)

...

  xgb_model = XGBClassifier(n_estimators=50,
                            objective='binary:hinge',
                            silent=True,
                            nthread=1,
                           eval_metric="auc")
   random_search = RandomizedSearchCV(xgb_model,
                                     param_distributions=params,
                                     n_iter=param_comb,
                                     scoring='precision',
                                     n_jobs=4,
                                     cv=skf.split(X_train,y_train),
                                     verbose=3,
                                     random_state=1001 )
  random_search.fit(X_train, y_train)
  xgb_model_best = random_search.best_estimator_
  predictions = xgb_model_best.predict(X_test)
  score = accuracy_score(y_test, predictions)
  auc = roc_auc_score(y_test, predictions)
  precision_recall = precision_recall_curve(y_test, predictions)

  metrics.log_metric("accuracy",(score * 100.0))
  metrics.log_metric("framework", "xgboost")
  metrics.log_metric("dataset_size", len(df))
  metrics.log_metric("AUC", auc)

  dump(xgb_model_best, model.path + ".joblib")

엔드포인트 빌드

deploy_xgb라는 구성요소를 실행해서 이전 섹션의 XGBoost 모델을 사용하여 엔드포인트를 빌드합니다. 이 구성요소는 이전 XGBoost 모델 아티팩트를 가져와서, 컨테이너를 빌드한 후 엔드포인트를 배포합니다. 또한 사용자가 이를 볼 수 있도록 엔드포인트 URL을 아티팩트로 제공합니다. 이 단계가 완료되면 Vertex AI 엔드포인트가 생성되고 Vertex AI의 콘솔 페이지에서 엔드포인트를 볼 수 있습니다.

@component(
   # Deploys xgboost model
   packages_to_install=["google-cloud-aiplatform", "joblib", "sklearn", "xgboost"],
   base_image="python:3.9",
   output_component_file="output_component/xgboost_deploy_component.yaml",
)
def deploy_xgb(
   model: Input[Model],
   project_id: str,
   vertex_endpoint: Output[Artifact],
   vertex_model: Output[Model]
):
   from google.cloud import aiplatform
   aiplatform.init(project=project_id)
   deployed_model = aiplatform.Model.upload(
       display_name="tai-propensity-test-pipeline",
       artifact_uri = model.uri.replace("model", ""),
       serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest"
   )
   endpoint = deployed_model.deploy(machine_type="n1-standard-4")
# Save data to the output params
   vertex_endpoint.uri = endpoint.resource_name
   vertex_model.uri = deployed_model.resource_name

파이프라인 정의

파이프라인을 정의하려면 이전에 만든 구성요소를 기준으로 각 작업을 정의합니다. 그런 후 구성요소에 명시적으로 호출되지 않은 경우 파이프라인 요소의 순서를 지정할 수 있습니다.

예를 들어 노트북의 다음 코드는 파이프라인을 정의합니다. 여기에서 이 코드는 build_bqml_logistic_op 구성요소가 create_input_view_op 구성요소 다음에 실행되어야 합니다.

@dsl.pipeline(
   # Default pipeline root. You can override it when submitting the pipeline.
   pipeline_root=PIPELINE_ROOT,
   # A name for the pipeline.
   name="pipeline-test",
   description='Propensity BigQuery ML Test'
)
def pipeline():

   create_input_view_op = create_input_view(
                          view_name = VIEW_NAME,
                          data_set_id = DATA_SET_ID,
                          project_id = PROJECT_ID,
                          bucket_name = BUCKET_NAME,
                          blob_path = BLOB_PATH
                                            )
    build_bqml_logistic_op = build_bqml_logistic(
                        project_id = PROJECT_ID,
                        data_set_id = DATA_SET_ID,
                        model_name = 'bqml_logistic_model',
                        training_view = VIEW_NAME
                                                  )

 # several components have been deleted for brevity

   build_bqml_logistic_op.after(create_input_view_op)
   build_bqml_xgboost_op.after(create_input_view_op)
   build_bqml_automl_op.after(create_input_view_op)
   build_xgb_xgboost_op.after(create_input_view_op)

   evaluate_bqml_logistic_op.after(build_bqml_logistic_op)
   evaluate_bqml_xgboost_op.after(build_bqml_xgboost_op)
   evaluate_bqml_automl_op.after(build_bqml_automl_op)

파이프라인 컴파일 및 실행

이제 파이프라인을 컴파일하고 실행할 수 있습니다.

노트북에서 다음 코드는 캐싱을 사용 설정하기 위해 enable_caching 값을 true로 설정합니다. 캐싱이 사용 설정되었으면 구성요소가 성공적으로 완료되었던 이전 실행이 다시 실행되지 않습니다. 이 플래그는 캐싱이 사용 설정된 경우 실행이 더 빠르게 완료되고 리소스가 더 적게 사용되기 때문에 파이프라인을 테스트할 때 특히 유용합니다.

compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run = pipeline_jobs.PipelineJob(
   display_name="test-pipeine",
   template_path="pipeline.json",

   job_id="test-{0}".format(TIMESTAMP),
   enable_caching=True
)
run.run()

파이프라인 자동화

이 단계에서는 첫 번째 파이프라인을 실행했습니다. Console의 Vertex AI 파이프라인 페이지에서 이 작업의 상태를 확인할 수 있습니다. 각 컨테이너가 빌드되고 실행되는 것을 확인할 수 있습니다. 또한 각 항목을 클릭하여 이 섹션에서 특정 구성요소의 오류를 추적할 수 있습니다.

파이프라인을 예약하려면 Cloud 함수를 빌드하고 크론 작업과 비슷한 스케줄러를 사용합니다.

노트북 마지막 섹션의 코드는 다음 코드 스니펫에 표시된 것처럼 하루 한 번 실행되도록 파이프라인을 예약합니다.

from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID,
                            region='us-central1'
                            )
api_client.create_schedule_from_job_spec(
   job_spec_path='pipeline.json',
   schedule='0 * * * *',
   enable_caching=False
)

프로덕션에서 완료된 파이프라인 사용

완료된 파이프라인으로 다음 태스크가 수행되었습니다.

  • 입력 데이터 세트를 만들었습니다.
  • BigQuery ML은 물론 Python's XGBoost를 모두 사용해서 일부 모델을 학습했습니다.
  • 모델 결과를 분석했습니다.
  • XGBoost 모델을 배포했습니다.

또한 Cloud Functions 및 Cloud Scheduler를 사용해서 매일 실행되도록 파이프라인을 자동화했습니다.

노트북에 정의된 파이프라인은 여러 모델을 만드는 방법을 보여주기 위해 생성되었습니다. 현재 빌드된 상태로는 파이프라인을 프로덕션 시나리오에서 실행하지 않을 것입니다. 하지만 이 파이프라인을 가이드로 사용해서 요구에 맞게 구성요소를 수정할 수 있습니다. 예를 들어 기능 만들기 프로세스를 수정해서 데이터를 활용하고, 데이터 범위를 수정하고, 대체 모델을 빌드할 수도 있습니다. 또한 설명된 것들 중에서 프로덕션 요구사항에 가장 적합한 모델을 선택할 수도 있습니다.

파이프라인을 프로덕션에 사용할 준비가 되었으면 추가 태스크를 구현할 수 있습니다. 예를 들어 매일 새 모델을 생성하고 새 모델(챌린저)과 기존 모델(챔피언)에 새 데이터에 맞게 점수를 책정하는 챔피언/챌린저 모델을 구현할 수도 있습니다. 성능이 현재 모델의 성능보다 뛰어난 경우에만 새 모델을 프로덕션에 배치합니다. 시스템 진행 상태를 모니터링하기 위해서는 매일 모델 성능을 기록하고 추세 성능을 시각화할 수도 있습니다.

다음 단계

  • MLOps를 사용하여 프로덕션에 사용 가능한 ML 시스템을 만드는 방법은 MLOps 실무자 가이드를 참조하세요.
  • Vertex AI에 대한 자세한 내용은 Vertex AI 문서를 참조하세요.
  • Kubeflow 파이프라인에 대한 자세한 내용은 KFP 문서를 참조하세요.
  • Cloud 아키텍처 센터에서 참조 아키텍처, 다이어그램, 권장사항 자세히 살펴보기