자습서 4부: 일괄 처리 채점 수행 및 레이크하우스에 예측 저장
이 자습서에서는 Microsoft Fabric MLflow 모델 레지스트리를 사용하여 3부에서 학습한 등록된 LightGBMClassifier 모델을 가져오고 레이크하우스에서 로드한 테스트 데이터 세트에 대한 일괄 처리 예측을 수행하는 방법을 알아봅니다.
Microsoft Fabric을 사용하면 모든 컴퓨팅 엔진에서 일괄 처리 채점을 지원하는 확장 가능한 PREDICT라는 함수를 통해 기계 학습 모델을 운영할 수 있습니다. Microsoft Fabric Notebook 또는 주어진 모델의 항목 페이지에서 바로 일괄 처리 예측을 생성할 수 있습니다. PREDICT에 대해 알아봅니다.
테스트 데이터 세트에 대한 일괄 처리 예측을 생성하려면 학습된 모든 기계 학습 모델 중에서 가장 뛰어난 성능을 보인 학습된 LightGBM 모델 버전 1을 사용합니다. 테스트 데이터 세트를 spark DataFrame에 로드하고 MLFlowTransformer 개체를 만들어 일괄 처리 예측을 생성합니다. 그런 다음, 다음 세 가지 방법 중 하나를 사용하여 PREDICT 함수를 호출할 수 있습니다.
- SynapseML의 변환기 API
- Spark SQL API
- PySpark UDF(사용자 정의 함수)
필수 조건
Microsoft Fabric 구독을 구매합니다. 또는 무료 Microsoft Fabric 평가판에 등록합니다.
Microsoft Fabric에 로그인합니다.
홈페이지 왼쪽의 환경 전환기를 사용하여 Synapse 데이터 과학 환경으로 전환합니다.
이 자습서는 총 5부 시리즈의 4부입니다. 이 자습서를 완료하려면 먼저 다음을 완료합니다.
- 1부: Apache Spark를 사용하여 Microsoft Fabric 레이크하우스에 데이터를 수집합니다.
- 2부: Microsoft Fabric Notebook을 사용하여 데이터를 탐색하고 시각화하여 데이터에 대해 자세히 알아봅니다.
- 3부: 기계 학습 모델을 학습시키고 등록합니다.
Notebooks에서 따라 하기
이 자습서에는 4-predict.ipynb Notebook이 함께 제공됩니다.
이 자습서에 함께 제공되는 Notebook을 열려면 데이터 과학 자습서를 위한 시스템 준비의 지침에 따라 Notebook을 작업 영역으로 가져옵니다.
이 페이지에서 코드를 복사하여 붙여넣으려는 경우 새 Notebook을 만들 수 있습니다.
코드 실행을 시작하기 전에 Notebook에 레이크하우스를 연결해야 합니다.
중요 사항
이 시리즈의 다른 부분에서 사용한 것과 동일한 레이크하우스를 연결합니다.
테스트 데이터 로드
3부에 저장한 테스트 데이터를 로드합니다.
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
변환기 API를 통해 PREDICT
SynapseML의 Transformer API를 사용하려면 먼저 MLFlowTransformer 개체를 만들어야 합니다.
MLFlowTransformer 개체 인스턴스화
MLFlowTransformer 개체는 3부에 등록한 MLFlow 모델 주위의 래퍼입니다. 이를 통해 지정된 DataFrame에 대한 일괄 처리 예측을 생성할 수 있습니다. MLFlowTransformer 개체를 인스턴스화하려면 다음 매개 변수를 제공해야 합니다.
- 모델에 대한 입력으로 필요한 테스트 DataFrame의 열입니다(이 경우 모두 필요).
- 새 출력 열의 이름입니다(이 경우 예측).
- 예측을 생성하는 데 필요한 올바른 모델 이름 및 모델 버전입니다(이 경우
lgbm_sm
및 버전 1).
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
이제 MLFlowTransformer 개체가 있으므로 이를 사용하여 일괄 처리 예측을 생성할 수 있습니다.
import pandas
predictions = model.transform(df_test)
display(predictions)
Spark SQL API를 통해 PREDICT
다음 코드는 Spark SQL API를 통해 PREDICT 함수를 호출합니다.
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
UDF(사용자 정의 함수)를 통해 PREDICT
다음 코드는 ySpark UDF를 통해 PREDICT 함수를 호출합니다.
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
모델의 항목 페이지에서 PREDICT 코드를 생성할 수도 있습니다. PREDICT에 대해 알아봅니다.
레이크하우스에 모델 예측 결과 쓰기
일괄 처리 예측을 생성한 후 모델 예측 결과를 레이크하우스에 다시 씁니다.
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
다음 단계
다음을 계속 진행합니다.