Partager via


Inférence ONNX sur Spark

Dans cet exemple, vous formez un modèle LightGBM et convertissez le modèle au format ONNX. Une fois converti, vous utilisez le modèle pour déduire des données de test sur Spark.

Cet exemple utilise les packages et versions Python suivants :

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Prérequis

  • Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter une maison de lac existante ou créer une maison de lac.
  • Vous devrez peut-être installer onnxmltools en ajoutant !pip install onnxmltools==1.7.0 une cellule de code, puis en exécutant la cellule.

Charger les données d’exemple

Pour charger les exemples de données, ajoutez les exemples de code suivants aux cellules de votre bloc-notes, puis exécutez les cellules :

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

Le résultat doit ressembler au tableau suivant, bien que les valeurs et le nombre de lignes puissent différer :

Intérêt ratio de couverture Indicateur de revenu net Équité à la responsabilité
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Utiliser LightGBM pour former un modèle

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Convertir le modèle au format ONNX

Le code suivant exporte le modèle formé vers un booster LightGBM, puis le convertit au format ONNX :

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Après la conversion, chargez la charge utile ONNX dans un ONNXModel et inspectez les entrées et les sorties du modèle :

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Mappez l’entrée du modèle au nom de colonne de la trame de données d’entrée (FeedDict) et mappez les noms de colonne de la trame de données de sortie aux sorties du modèle (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Utiliser le modèle pour l’inférence

Pour effectuer une inférence avec le modèle, le code suivant crée des données de test et transforme les données via le modèle ONNX.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

Le résultat doit ressembler au tableau suivant, bien que les valeurs et le nombre de lignes puissent différer :

Index Fonctionnalités Prédiction Probabilité
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...