Поделиться через


Распределенное обучение моделей XGBoost с помощью xgboost.spark

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Пакет Python xgboost>=1.7 содержит новый модуль xgboost.spark. Этот модуль включает в себя оценщики xgboost.spark.SparkXGBRegressorxgboost PySpark и xgboost.spark.SparkXGBClassifierxgboost.spark.SparkXGBRanker. Эти новые классы поддерживают включение оценщиков XGBoost в конвейеры SparkML. Дополнительные сведения об API см. в документации по API Spark для XGBoost.

Требования

Databricks Runtime 12.0 ML и более поздних версий.

параметры xgboost.spark

Оценки, определенные в модуле xgboost.spark , поддерживают большинство одинаковых параметров и аргументов, используемых в стандартном XGBoost.

  • Параметры конструктора класса, fit метода и predict метода в значительной степени идентичны параметрам в модуле xgboost.sklearn .
  • Именование, значения и значения по умолчанию в основном идентичны тем, которые описаны в параметрах XGBoost.
  • Исключения — это несколько неподдерживаемых параметров (напримерgpu_id, , sample_weightnthread, eval_set) и pyspark определенных параметров оценки ,которые были добавлены (напримерfeaturesCol, labelCol, , ). validationIndicatorColuse_gpu Дополнительные сведения см. в документации по API Spark для XGBoost.

Распределенное обучение

Оценки PySpark, определенные в модуле xgboost.spark , поддерживают распределенное обучение XGBoost с помощью num_workers параметра. Чтобы использовать распределенное обучение, создайте классификатор или регрессию и задайте num_workers количество параллельных задач Spark во время распределенного обучения. Чтобы использовать все слоты задач Spark, задайте.num_workers=sc.defaultParallelism

Рассмотрим пример.

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Примечание.

  • Нельзя использовать mlflow.xgboost.autolog с распределенным обучением XGBoost. Чтобы регистрировать модель Spark xgboost с помощью MLflow, используйте mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Вы не можете использовать распределенный XGBoost в кластере с включенным автомасштабированием. Новые рабочие узлы, начинающиеся в этой парадигме эластичного масштабирования, не могут получать новые наборы задач и оставаться бездействующими. Инструкции по отключению автомасштабирования см. в разделе "Включение автомасштабирования".

Включение оптимизации для обучения для разреженного набора данных функций

Оценки PySpark, определенные в xgboost.spark оптимизации поддержки модуля для обучения наборов данных с разреженными функциями. Чтобы включить оптимизацию разреженных наборов компонентов, необходимо предоставить набор fit данных методу, содержаму столбец признаков, состоящий из значений типа pyspark.ml.linalg.SparseVector и задать параметр enable_sparse_data_optim оценки.True Кроме того, необходимо задать missing для параметра 0.0значение .

Например:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Обучение GPU

Оценки PySpark, определенные в модуле xgboost.spark , поддерживают обучение на GPU. Задайте параметр use_gpu для True включения обучения GPU.

Примечание.

Для каждой задачи Spark, используемой в распределенном обучении use_gpu XGBoost, при установке Trueаргумента используется только один GPU. Databricks рекомендует использовать значение 1 по умолчанию для конфигурации spark.task.resource.gpu.amountкластера Spark. В противном случае дополнительные графические процессоры, выделенные этой задачей Spark, неактивны.

Например:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Устранение неполадок

Во время обучения с несколькими узлами, если вы столкнулись NCCL failure: remote process exited or there was a network error с сообщением, обычно это указывает на проблему с сетевым обменом данными между gpu. Эта проблема возникает, когда NCCL (библиотека коллективных коммуникаций NVIDIA) не может использовать определенные сетевые интерфейсы для взаимодействия с GPU.

Чтобы устранить эту проблему, задайте для кластера значение spark.executorEnv.NCCL_SOCKET_IFNAME ethsparkConf. Это по сути задает переменную NCCL_SOCKET_IFNAME eth среды для всех рабочих ролей в узле.

Пример записной книжки

В этой записной книжке показано использование пакета xgboost.spark Python с Spark MLlib.

Записная книжка PySpark-XGBoost

Получить записную книжку

Руководство по миграции для устаревшего sparkdl.xgboost модуля

  • Замените и замените from sparkdl.xgboost import XgboostRegressor from sparkdl.xgboost import XgboostClassifier на from xgboost.spark import SparkXGBClassifier.from xgboost.spark import SparkXGBRegressor
  • Измените все имена параметров в конструкторе оценщика из стиля верблюдьего Регистра на snake_case стиле. Например, измените XgboostRegressor(featuresCol=XXX) на SparkXGBRegressor(features_col=XXX).
  • Параметры use_external_storage и external_storage_precision удалены. xgboost.spark Средства оценки используют API итерации данных DMatrix для более эффективного использования памяти. Больше не требуется использовать неэффективный внешний режим хранения. Для очень больших наборов данных Databricks рекомендует увеличить num_workers параметр, что делает каждую задачу обучения секционированием данных на меньшие, более управляемые секции данных. Рассмотрим параметр num_workers = sc.defaultParallelism, который задает num_workers общее количество слотов задач Spark в кластере.
  • Для оценщиков, определенных в xgboost.spark, параметр num_workers=1 выполняет обучение модели с помощью одной задачи Spark. Это использует количество ядер ЦП, указанных параметром spark.task.cpusконфигурации кластера Spark, которое по умолчанию равно 1. Чтобы использовать больше ядер ЦП для обучения модели, увеличения или spark.task.cpusувеличенияnum_workers. Нельзя задать или n_jobs параметр nthread для оценщиков, определенных в xgboost.spark. Это поведение отличается от предыдущего поведения оценщиков, определенных в устаревшем пакете sparkdl.xgboost .

Преобразование sparkdl.xgboost модели в xgboost.spark модель

sparkdl.xgboost модели сохраняются в другом формате, чем xgboost.spark модели, и имеют разные параметры параметров. Используйте следующую служебную функцию для преобразования модели:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Если у вас есть pyspark.ml.PipelineModel модель, sparkdl.xgboost содержащая модель в качестве последней стадии sparkdl.xgboost , можно заменить этап модели преобразованной xgboost.spark моделью.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)