Compartilhar via


Crie um modelo de aprendizado de máquina com o Apache Spark MLlib

Neste artigo, você vai saber como usar o Apache Spark MLlib para criar um aplicativo de aprendizado de máquina que faz uma análise preditiva simples em um Conjunto de Dados em Aberto do Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa classificação por meio de regressão logística.

As bibliotecas básicas do Spark SparkML e MLlib fornecem muitos utilitários úteis para tarefas de aprendizado de máquina. Estes utilitários são adequados para:

  • Classificação
  • Clustering
  • Teste de hipótese e cálculo de estatísticas de exemplo
  • Regressão
  • Decomposição de valor singular (SVD) e análise de componente principal (PCA)
  • Modelagem de tópico

Compreender a classificação e regressão logística

Classificação, uma tarefa popular de aprendizado de máquina que envolve a classificação de dados de entrada em categorias. Um algoritmo de classificação deve descobrir como atribuir rótulos aos dados de entrada fornecidos. Por exemplo, um algoritmo de aprendizado de máquina que poderia aceitar informações sobre estoque como entrada e divida o estoque em duas categorias: estoque que você deve vender e estoque que deve ser mantido.

A regressão logística é o algoritmo usado para classificação. A API de regressão logística do Spark é útil para classificação binária de dados de entrada em um dos dois grupos. Para obter mais informações sobre a regressão logística, confira a Wikipédia.

A regressão logística produz uma função logística que pode prever a probabilidade de um vetor de entrada pertencer a um grupo ou outro.

Exemplo de análise preditiva de dados de táxi de Nova York

Primeiro, instale o azureml-opendatasets. Os dados estão disponíveis por meio do recurso Azure Open Datasets. Esse subconjunto do conjunto de dados hospeda informações sobre as corridas de táxi amarelo, incluindo as horas de início e de término, locais de partida e destino e o custo das corridas.

%pip install azureml-opendatasets

No restante deste artigo, usaremos o Apache Spark para executar algumas análises sobre os dados de gorjeta de corrida de táxi de Nova Iorque e, em seguida, desenvolver um modelo para prever se uma viagem específica inclui uma dica ou não.

Criar um modelo de machine learning do Apache Spark

  1. Crie um notebook do PySpark. Para obter mais informações, visite Criar um notebook.

  2. Importe os tipos necessários para este notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Usaremos o MLflow para acompanhar nossos experimentos de machine learning e execuções correspondentes. Se o Log Automático do Microsoft Fabric estiver habilitado, as métricas e os parâmetros correspondentes serão capturados automaticamente.

    import mlflow
    

Construir o DataFrame de entrada

Nesse exemplo, carregaremos os dados em um dataframe do Pandas e os converteremos em um dataframe do Apache Spark. Usando esse formato, podemos aplicar outras operações do Apache Spark para limpar e filtrar o conjunto de dados.

  1. Cole essas linhas em uma nova célula e execute-as para criar um DataFrame do Spark. Essa etapa recupera os dados por meio da API do Conjunto de Dados em Aberto no Azure. Podemos filtrar esses dados para examinar uma janela específica de dados. O exemplo de código usa start_date e end_date para aplicar um filtro que retorna um mês de dados.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Este código reduz o conjunto de dados para cerca de 10.000 linhas. Para acelerar o desenvolvimento e o treinamento, vamos examinar nosso conjunto de dados por enquanto.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Queremos dar uma olhada em nossos dados usando o comando interno display(). Esse comando nos permite exibir facilmente um exemplo dos dados ou explorar as tendências nos dados graficamente.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparar os dados

A preparação de dados é uma etapa crucial no processo de aprendizado de máquina. Ela envolve a limpeza, a transformação e a organização de dados brutos para torná-los adequados para análise e modelagem. No código a seguir, você executa várias etapas de preparação de dados:

  • Filtrar o conjunto de dados para remover o exceções e valores incorretos
  • Remover colunas que não são necessárias para treinamento de modelo
  • Criar novas colunas com base nos dados brutos
  • Gerar um rótulo para determinar se uma determinada viagem de táxi envolve ou não uma gorjeta
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Em seguida, uma segunda passagem é feita sobre os dados para adicionar os recursos finais.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Criar um modelo de regressão logística

A tarefa final é converter os dados rotulados para um formato que possa ser analisado pela regressão logística. A entrada para um algoritmo de regressão logística precisa ser uma estrutura de pares de vetor de recurso de rótulo, em que o vetor de recurso é um vetor de números que representa o ponto de entrada.

Com base nos requisitos finais da tarefa, devemos converter as colunas categóricas em números. Especificamente, as colunas trafficTimeBins e weekdayString precisam ser convertidas em representações de inteiros. Temos muitas opções disponíveis para lidar com este requisito. Este exemplo envolve a abordagem OneHotEncoder:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Essa ação resulta em um novo DataFrame com todas as colunas no formato certo para treinar um modelo.

Treinar um modelo de regressão logística

A primeira tarefa é dividir o conjunto de dados em um conjunto de treinamento e um conjunto de teste ou validação.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Assim que tivermos os dois DataFrames, devemos criar a fórmula do modelo e executá-la no DataFrame de treinamento. Em seguida, podemos validar em relação ao DataFrame de teste. Faça experiências com versões diferentes da fórmula do modelo para ver o impacto de diferentes combinações.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

As saídas da célula:

Area under ROC = 0.9749430523917996

Criar uma representação visual da previsão

Agora podemos construir uma visualização final para interpretar os resultados do modelo. Uma curva ROC certamente pode apresentar o resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafo que mostra a curva ROC para regressão logística no modelo de gorjetas.