Поделиться через


Устранение неполадок ParallelRunStep

ОБЛАСТЬ ПРИМЕНЕНИЯ: Пакет SDK для Python версии 1

Из этой статьи вы узнаете, как выполнять отладку и устранение неполадок класса ParallelRunStep из пакета SDL решения «Машинное обучение Azure».

Общие советы по устранению неполадок конвейера см. в разделе Устранение неполадок конвейеров машинного обучения.

Локальное тестирование сценариев

ParallelRunStep выполняется в качестве одного из шагов конвейера. Возможно, вы хотите протестировать скрипты локально в качестве первого шага.

Требования начального сценария

Скрипт записи для ParallelRunStep функции должен содержать run() функцию и при необходимости содержит init() функцию:

  • init(): эта функция применяется для всех затратных или повторяющихся операций подготовки к последующей обработке. Например, в ней можно загружать модель в глобальный объект. Эта функция будет вызываться только один раз в начале процесса.

    Примечание.

    Если метод init создает выходной каталог, укажите parents=True и exist_ok=True. Метод init вызывается из каждого рабочего процесса на каждом узле, где выполняется задание.

  • run(mini_batch): функция будет выполняться для каждого mini_batch экземпляра.
    • mini_batch: ParallelRunStep вызывает метод run и передает ему в качестве аргумента список либо Pandas DataFrame. Каждая запись в mini_batch содержит одно из следующих значений: путь к файлу для входных данных в формате FileDataset или Pandas DataFrame для входных данных в формате TabularDataset.
    • response: метод run() должен возвращать Pandas DataFrame или массив. Для append_row output_action эти возвращаемые элементы добавляются в общий выходной файл. Для summary_only содержимое элементов игнорируется. Для всех выходных действий каждый возвращаемый элемент обозначает один успешный запуск входного элемента во входном мини-пакете. Убедитесь в том, что в результат выполнения включено достаточно данных, чтобы сопоставить входные данные с результатом вывода. Выходные данные будут записаны в выходной файл, и для них не гарантируется правильный порядок. Для сопоставления со входными данными нужно использовать какой-либо ключ.

      Примечание.

      Для одного элемента входных данных ожидается один элемент выходных данных.

%%writefile digit_identification.py
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run)
# for the implementation script.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # Pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # Construct a graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session()
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))


def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # Prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # Perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # Find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList

Если у вас есть другой файл или папка в том же каталоге, что и скрипт вывода, можно сослаться на него, найдя текущий рабочий каталог. Если вы хотите импортировать пакеты, можно также добавить папку пакетов в sys.path.

script_dir = os.path.realpath(os.path.join(__file__, '..',))
file_path = os.path.join(script_dir, "<file_name>")

packages_dir = os.path.join(file_path, '<your_package_folder>')
if packages_dir not in sys.path:
    sys.path.append(packages_dir)
from <your_package> import <your_class>

Параметры для ParallelRunConfig

ParallelRunConfig — это основная конфигурация для экземпляра ParallelRunStep в конвейере Машинного обучения Azure. Он пригодится вам как оболочка скрипта для настройки необходимых параметров, включая перечисленные ниже записи:

  • entry_script: пользовательский скрипт в виде локального пути к файлу, который будет выполняться параллельно на нескольких узлах. Если присутствует source_directory, используйте относительный путь. В противном случае используйте любой путь, доступный на компьютере.

  • mini_batch_size: размер мини-пакета, переданного одному run() вызову. (Необязательный параметр; по умолчанию заданы 10 файлов для FileDataset и 1MB для TabularDataset.)

    • Для FileDataset здесь указывается количество файлов; минимальное допустимое значение — 1. Несколько файлов можно объединить в один мини-пакет.
    • Для TabularDataset здесь указывается размер данных. Примеры допустимых значений: 1024, 1024KB, 10MB и 1GB. Мы рекомендуем использовать значение 1MB. Мини-пакет из TabularDataset никогда не пересекает границы файлов. Предположим, что у вас есть CSV-файлы с разными размерами в пределах от 100 КБ до 10 МБ. Если задать mini_batch_size = 1MB, все файлы с размером меньше 1 МБ будут рассматриваться как один мини-пакет. Файлы с размером, превышающим 1 МБ, будут разбиты на несколько мини-пакетов.

      Примечание.

      Параметр TabularDatasets, поддерживаемый SQL, не может быть секционирован. Параметр TabularDatasets из одного файла Parquet и одной группы строк не может быть секционирован.

  • error_threshold: количество сбоев записей и TabularDataset сбоев файлов для FileDataset этого следует игнорировать во время обработки. Если общее количество ошибок для всего объема входных данных превысит это значение, задание будет прервано. Пороговое количество ошибок применяется к общему объему входных данных, а не к отдельному мини-пакету, которые передаются в метод run(). Используется диапазон [-1, int.max]. Часть -1 указывает на то, что следует игнорировать все сбои во время обработки.

  • output_action: одно из следующих значений указывает, как будут упорядочены выходные данные:

    • summary_only: сценарий пользователя будет хранить выходные данные. ParallelRunStep использует выходные данные только для вычисления порога ошибок.
    • append_row: для всех входных данных в выходной папке будет создан только один файл, чтобы добавить все выходные данные, разделенные строкой.
  • append_row_file_name: чтобы настроить имя выходного файла для append_row output_action (необязательно; значение parallel_run_step.txtпо умолчанию — ).

  • source_directory: пути к папкам, содержащим все файлы для выполнения на целевом объекте вычислений (необязательно).

  • compute_target: поддерживается только AmlCompute .

  • node_count: количество вычислительных узлов, используемых для выполнения пользовательского скрипта.

  • process_count_per_node: количество рабочих процессов на узел для параллельного выполнения начального сценария. Для компьютера GPU значение по умолчанию равно 1. Для компьютера с ЦП значение по умолчанию равно количеству ядер на узел. Рабочий процесс будет многократно вызывать run(), передавая полученный мини-пакет. Общее число рабочих процессов в задании равно process_count_per_node * node_count, что определяет максимальное число run() для параллельного выполнения.

  • environment: определение среды Python. Вы можете настроить использование существующей среды Python или временной среды. Также это определение может задавать необходимые зависимости приложения (необязательно).

  • logging_level: детализация журнала. Значения уровня детализации в порядке увеличения: WARNING, INFO и DEBUG. (необязательный параметр; по умолчанию используется значение INFO.)

  • run_invocation_timeoutrun(): время ожидания вызова метода в секундах. (необязательный параметр, значение по умолчанию — 60)

  • run_max_try: максимальное количество run() попыток для мини-пакета. Сбой run() при возникновении исключения или если при достижении run_invocation_timeout ничего не возвращается (необязательно; значение по умолчанию — 3).

Можно указать mini_batch_size, node_count, process_count_per_node, logging_level, run_invocation_timeout и run_max_try как PipelineParameter, чтобы при повторной отправке запуска конвейера можно было точно настроить значения параметров. В этом примере PipelineParameter используется для mini_batch_size и Process_count_per_node, и эти значения будут изменены при повторной отправке.

Видимость устройств CUDA

Для целевых объектов вычислений, оснащенных GPU, переменная среды CUDA_VISIBLE_DEVICES будет задана в рабочих процессах. В AmlCompute можно найти общее число устройств GPU в переменной среды AZ_BATCHAI_GPU_COUNT_FOUND, которая задается автоматически. Если требуется, чтобы каждый рабочий процесс имел выделенный GPU, задайте значение process_count_per_node, равное количеству устройств GPU на компьютере. Каждый рабочий процесс назначит уникальный индекс для CUDA_VISIBLE_DEVICES. Если рабочий процесс останавливается по какой-либо причине, следующий запущенный рабочий процесс будет использовать освободившийся индекс GPU.

Если общее число устройств GPU меньше process_count_per_node, индекс GPU будет назначаться рабочим процессам до тех пор, пока не будут использованы все, что есть.

Представив, что для примера общее число устройств GPU равно 2, а process_count_per_node = 4, процесс 0 и процесс 1 будут иметь индексы 0 и 1. Процессы 2 и 3 не будут иметь переменную среды. Для библиотеки, использующей эту переменную среды для назначения GPU, процессы 2 и 3 не будут иметь GPU и не будут пытаться получить устройства GPU. Если процесс 0 останавливается, он освобождает индекс GPU 0. Следующему процессу, которым является процесс 4, будет назначен индекс GPU 0.

Дополнительные сведения см. в разделе Советы по CUDA для профессионалов: управление видимостью GPU с помощью CUDA_VISIBLE_DEVICES.

Параметры для создания ParallelRunStep

Создайте ParallelRunStep, используя скрипт, конфигурацию среды и параметры. Укажите целевой объект вычислений, который уже подключен к рабочей области, в качестве целевого объекта для выполнения скрипта вывода. Используйте ParallelRunStep, чтобы создать шаг конвейера пакетного вывода, который принимает все следующие параметры.

  • name: Имя шага со следующими ограничениями на именование: уникальность, от 3 до 32 символов, соответствие регулярному выражению ^[a-z]([-a-z0-9]*[a-z0-9])?$.
  • parallel_run_configParallelRunConfig: объект, как определено ранее.
  • inputs: один или несколько однотипных Машинное обучение Azure наборов данных, которые следует секционировать для параллельной обработки.
  • side_inputs: один или несколько ссылочных данных или наборов данных, используемых в качестве побочных входных данных без необходимости секционироваться.
  • output: Объект OutputFileDatasetConfig, представляющий путь к каталогу, в котором будут храниться выходные данные.
  • arguments: список аргументов, переданных в скрипт пользователя. Используйте unknown_args, чтобы получить их в начальном сценарии (необязательно).
  • allow_reuse: следует ли шаг повторно использовать предыдущие результаты при выполнении с теми же параметрами и входными данными. Если этот параметр имеет значение False, то во время выполнения конвейера для этого шага всегда будет создаваться новый запуск. (необязательный параметр; по умолчанию используется значение True.)
from azureml.pipeline.steps import ParallelRunStep

parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[input_mnist_ds_consumption],
    output=output_dir,
    allow_reuse=True
)

Отладка сценариев из удаленного контекста

Переход от локальной отладки сценария оценки к отладке сценария оценки в фактическом конвейере может оказаться сложной задачей. Сведения о поиске журналов на портале см. подраздел о конвейерах машинного обучения раздела сценариев отладки из удаленного контекста. Сведения в этом разделе также применимы к классу ParallelRunStep.

Например, файл журнала 70_driver_log.txt содержит сведения о контроллере, который запускает код ParallelRunStep.

Из-за того, что задания ParallelRunStep имеют распределенный характер, журналы могут поступать из нескольких разных источников. Однако создается два консолидированных файла, которые предоставляют общие сведения.

  • ~/logs/job_progress_overview.txt: этот файл предоставляет высокоуровневую информацию о количестве мини-пакетов (также известных как задачи), созданных до сих пор и количестве мини-пакетов, обработанных до сих пор. В этом случае отображается результат задачи. Если задание завершилось с ошибкой, отобразится сообщение об ошибке, а также рекомендации по началу устранения неполадок.

  • ~/logs/sys/master_role.txt: В этом файле доступно представление главного узла (т. н. оркестратор) выполняемого задания. Включает создание задач, мониторинг хода выполнения, результат выполнения.

Журналы, созданные на основе начального сценария с помощью вспомогательного метода EntryScript и операторов Print, находятся следующих файлах:

  • ~/logs/user/entry_script_log/<node_id>/<process_name>.log.txt: Это журналы, записываемые из entry_script с помощью вспомогательного метода EntryScript.

  • ~/logs/user/stdout/<node_id>/<process_name>.stdout.txt: Эти файлы являются журналами из stdout (например, инструкции Print) в entry_script.

  • ~/logs/user/stderr/<node_id>/<process_name>.stderr.txt: Эти файлы являются журналами из stderr в entry_script.

Следующие признаки позволяют быстро распознать ошибки в сценарии.

  • ~/logs/user/error.txt: этот файл попытается свести итоги ошибок в скрипте.

Дополнительные сведения об ошибках в сценарии:

  • ~/logs/user/error/: Содержит полные трассировки стека исключений, возникших при загрузке и выполнении скрипта записи.

Если необходимо в полной мере оценить, как сценарий оценки выполняется на каждом из узлов, просмотрите отдельные журналы процесса для каждого узла. Журналы процесса можно найти в папке sys/node. Они сгруппированы по рабочим узлам:

  • ~/logs/sys/node/<node_id>/<process_name>.txt: Этот файл содержит подробные сведения о каждом мини-пакете, который выбирается или выполняется рабочей ролью. Для каждого мини-пакета этот файл содержит:

    • IP-адрес и идентификатор рабочего процесса.
    • Общее число элементов, число успешно обработанных элементов и число элементов, обработка которых завершилась сбоем.
    • Время начала, продолжительность, время обработки и время выполнения метода.

Можно также просмотреть результаты периодических проверок использования ресурсов для каждого узла. Файлы журнала и файлы установки находятся в этой папке:

  • ~/logs/perf: Задайте --resource_monitor_interval для изменения интервала проверки в секундах. По умолчанию используется интервал 600, который составляет приблизительно 10 минут. Чтобы остановить наблюдение, задайте значение 0. Каждая папка <node_id> включает:

    • os/: Сведения обо всех выполняющихся в узле процессах. Одна проверка запускает команду операционной системы и сохраняет результат в файл. В системе Linux используется команда ps. В системе Windows используется команда tasklist.
      • %Y%m%d%H: Имя вложенной папки — это время в часах.
        • processes_%M: Файл завершается минутой времени проверки.
    • node_disk_usage.csv: Подробные сведения об использовании диска узлом.
    • node_resource_usage.csv: Общие сведения об использовании ресурсов узла.
    • processes_resource_usage.csv: Отчет об использовании ресурсов для каждого процесса.

Ведение журнала из пользовательского сценария в удаленном контексте

ParallelRunStep может запускать несколько процессов на одном узле на основе process_count_per_node. Чтобы упорядочить журналы из каждого процесса на узле и объединить инструкции print и log, рекомендуется использовать средство ведения журнала ParallelRunStep представленным ниже образом. Можно получить средство ведения журнала из EntryScript, как показано в приведенном ниже примере кода, чтобы журналы отображались в папке logs/user на портале.

Пример начального сценария с использованием средства ведения журнала:

from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info("This will show up in files under logs/user on the Azure portal.")


def run(mini_batch):
    """Call once for a mini batch. Accept and return the list back."""
    # This class is in singleton pattern and will return same instance as the one in init()
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info(f"{__file__}: {mini_batch}.")
    ...

    return mini_batch

Куда принимается сообщение от logging в Python?

ParallelRunStep задает обработчик для корневого средства ведения журнала, который принимает сообщение в logs/user/stdout/<node_id>/processNNN.stdout.txt.

По умолчанию для logging используется уровень INFO. По умолчанию уровни ниже INFO, например DEBUG, не отображаются.

Как выполнить запись в файл, отображаемый на портале?

Файлы в папке logs будут отправлены и отображены на портале. Вы можете получить папку logs/user/entry_script_log/<node_id>, как показано ниже, и создать путь к файлу для записи:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    log_dir = entry_script.log_dir
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.

    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    fil_path = log_dir / f"{proc_name}_<file_name>" # Avoid conflicting among worker processes with proc_name.

Как обрабатывать журналы в новых процессах?

Вы можете создавать новые процессы в скрипте записи с subprocess помощью модуля, подключаться к каналам ввода и вывода или ошибки и получать их коды возврата.

Рекомендуемый подход заключается в использовании функции run() с параметром capture_output=True. Ошибки будут отображаться в файле logs/user/error/<node_id>/<process_name>.txt.

Если вы хотите использовать Popen(), следует перенаправить потоки stdout и stderr в файлы, например:

from pathlib import Path
from subprocess import Popen

from azureml_user.parallel_run import EntryScript


def init():
    """Show how to redirect stdout/stderr to files in logs/user/entry_script_log/<node_id>/."""
    entry_script = EntryScript()
    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.
    stdout_file = str(log_dir / f"{proc_name}_demo_stdout.txt")
    stderr_file = str(log_dir / f"{proc_name}_demo_stderr.txt")
    proc = Popen(
        ["...")],
        stdout=open(stdout_file, "w"),
        stderr=open(stderr_file, "w"),
        # ...
    )

Примечание.

Рабочий процесс выполняет код "system" и код начального сценария в одном и том же процессе.

Если поток stdout или stderr не указан, подпроцесс, созданный с помощью метода Popen() в начальном сценарии, наследует параметр рабочего процесса.

stdout выполнит запись в logs/sys/node/<node_id>/processNNN.stdout.txt, а stderr — в logs/sys/node/<node_id>/processNNN.stderr.txt.

Как записать файл в выходной каталог, а затем просмотреть его на портале?

Вы можете получить выходной каталог из класса EntryScript и выполнить в него запись. Чтобы просмотреть записанные файлы, в шаге запуска представления на портале Машинного обучения Azure откройте вкладку Выходные данные и журналы. Выберите ссылку Data outputs (Выходные значения данных) и выполните действия, описанные в этом диалоговом окне.

Используйте EntryScript в начальном сценарии, как показано в следующем примере:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def run(mini_batch):
    output_dir = Path(entry_script.output_dir)
    (Path(output_dir) / res1).write...
    (Path(output_dir) / res2).write...

Как передать всем рабочим ролям сторонние входные данные, такие как файлы, содержащие таблицу подстановки.

Пользователь может передать ссылочные данные в скрипт, используя параметр side_inputs в ParallelRunStep. Все наборы данных, предоставленные как side_inputs, будут подключены к каждому рабочему узлу. Пользователь может получить расположение подключенгия, передав аргумент.

Создайте набор данных, содержащий ссылочные данные, затем укажите локальный путь подключения и зарегистрируйте его в рабочей области. Передайте его в параметр side_inputs вашего ParallelRunStep. Кроме того, можно добавить путь к нему в раздел arguments, чтобы легко получить доступ к подключенному пути.

Примечание.

Используйте FileDatasets только для side_inputs.

local_path = "/tmp/{}".format(str(uuid.uuid4()))
label_config = label_ds.as_named_input("labels_input").as_mount(local_path)
batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
)

После этого доступ к нему можно будет получить в сценарии вывода (например, в методе init ()) следующим образом:

parser = argparse.ArgumentParser()
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()

labels_path = args.labels_dir

Как использовать входные наборы данных с проверкой подлинности субъекта-службы?

Пользователи могут передавать входные наборы данных с проверкой подлинности субъекта-службы в рабочей области. Использование такого набора данных в ParallelRunStep требует регистрации набора данных для создания конфигурации ParallelRunStep.

service_principal = ServicePrincipalAuthentication(
    tenant_id="***",
    service_principal_id="***",
    service_principal_password="***")

ws = Workspace(
    subscription_id="***",
    resource_group="***",
    workspace_name="***",
    auth=service_principal
    )

default_blob_store = ws.get_default_datastore() # or Datastore(ws, '***datastore-name***')
ds = Dataset.File.from_files(default_blob_store, '**path***')
registered_ds = ds.register(ws, '***dataset-name***', create_new_version=True)

Проверка хода выполнения и его анализ

В этом разделе описано, как проверить ход выполнения задания ParallelRunStep и узнать причину непредвиденного поведения.

Как проверить ход выполнения задания?

Помимо общего состояния StepRun, в ~/logs/job_progress_overview.<timestamp>.txt можно просмотреть количество запланированных/обработанных мини-пакетов и ход создания выходных данных. Файл сменяется на ежедневной основе, актуальные сведения находятся в файле с наибольшей меткой времени.

Что следует проверить при отсутствии прогресса в течение некоторого времени?

Вы можете зайти в ~/logs/sys/errror, чтобы узнать о наличии каких-либо исключений. Если их нет, вполне вероятно, что ваш начальный сценарий занимает много времени. Вы можете вывести сведения о ходе выполнения в коде, чтобы найти часть, на которую уходит много времени, или добавить "--profiling_module", "cProfile" в arguments класса ParallelRunStep, чтобы создать файл профиля с именем <process_name>.profile в папке ~/logs/sys/node/<node_id>.

Когда остановится задание?

Если задание не отменено, оно остановится с состоянием:

  • Завершено. Если все мини-пакеты были обработаны, а для режима append_row были созданы выходные данные.
  • сбой. Если параметр error_threshold в Parameters for ParallelRunConfig превышен или во время задания произошла системная ошибка.

Где найти основную причину сбоя?

Чтобы найти причину и подробный журнал ошибок, можно обратиться к ~logs/job_result.txt.

Будет ли ошибка узла влиять на результат задания?

Нет, если в назначенном вычислительном кластере есть другие доступные узлы. Оркестратор запустит новый узел в качестве замены, а ParallelRunStep устойчив к такой операции.

Что произойдет при сбое функции init в начальном сценарии?

ParallelRunStep имеет механизм повтора в течение определенного времени, чтобы дать возможность восстановления от временных проблем без задержки сбоя задания слишком долго, механизм выглядит следующим образом:

  1. Когда после запуска узла init продолжает завершаться со сбоем на всех агентах, после неудач 3 * process_count_per_node дальнейшие попытки будут прекращены.
  2. Когда после запуска задания init продолжает завершаться со сбоем на всех агентах всех узлов, мы прекратим повторные попытки, если задание выполняется более 2 минут и число сбоев составляет 2 * node_count * process_count_per_node.
  3. Если все агенты задерживаются на init более 3 * run_invocation_timeout + 30 секунд, задание завершится сбоем, так как прогресс будет отсутствовать слишком долго.

Что произойдет при OutOfMemory? Как проверить причину?

ParallelRunStep установит текущую попытку обработать мини-пакет в состояние сбоя и попытается перезапустить завершившийся сбоем процесс. Чтобы найти процесс, потребляющий много памяти, можно проверить ~logs/perf/<node_id>.

Почему у меня много файлов processNNN?

ParallelRunStep запустит новые рабочие процессы на замену завершившимся аварийно, и каждый процесс создаст файл processNNN в качестве журнала. Однако если процесс завершился сбоем из-за исключения во время выполнения функции init пользовательского сценария и ошибка повторяется несколько раз (3 * process_count_per_node), новый рабочий процесс запущен не будет.

Следующие шаги