Partilhar via


Resolver problemas de ParallelRunStep

APLICA-SE A: Python SDK azureml v1

Neste artigo, você aprenderá a solucionar problemas quando receber erros usando a classe ParallelRunStep do SDK do Azure Machine Learning.

Para obter dicas gerais sobre como solucionar problemas de um pipeline, consulte Solução de problemas de pipelines de aprendizado de máquina.

Testando scripts localmente

Seu ParallelRunStep é executado como uma etapa em pipelines de ML. Você pode querer testar seus scripts localmente como uma primeira etapa.

Requisitos do script de entrada

O script de entrada para um ParallelRunStep deve conter uma run() função e, opcionalmente, contém uma init() função:

  • init(): Use esta função para qualquer preparação dispendiosa ou comum para processamento posterior. Por exemplo, use-o para carregar o modelo em um objeto global. Esta função será chamada apenas uma vez no início do processo.

    Nota

    Se o seu init método cria um diretório de saída, especifique isso parents=True e exist_ok=True. O init método é chamado a partir de cada processo de trabalho em cada nó no qual o trabalho está sendo executado.

  • run(mini_batch): A função será executada para cada mini_batch instância.
    • mini_batch: ParallelRunStep invocará o método run e passará uma lista ou pandas DataFrame como um argumento para o método. Cada entrada no mini_batch será um caminho de arquivo se a entrada for um FileDataset ou um pandas DataFrame se a entrada for um TabularDatasetarquivo .
    • response: run() método deve retornar um pandas DataFrame ou uma matriz. Por append_row output_action, esses elementos retornados são anexados ao arquivo de saída comum. Por summary_only, o conteúdo dos elementos é ignorado. Para todas as ações de saída, cada elemento de saída retornado indica uma execução bem-sucedida do elemento de entrada no minilote de entrada. Certifique-se de que dados suficientes estão incluídos no resultado da execução para mapear a entrada para executar o resultado da saída. A saída de execução será escrita no arquivo de saída e não é garantido que esteja em ordem, você deve usar alguma chave na saída para mapeá-la para a entrada.

      Nota

      Um elemento de saída é esperado para um elemento de entrada.

%%writefile digit_identification.py
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run)
# for the implementation script.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # Pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # Construct a graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session()
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))


def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # Prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # Perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # Find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList

Se você tiver outro arquivo ou pasta no mesmo diretório do script de inferência, poderá fazer referência a ele localizando o diretório de trabalho atual. Se quiser importar seus pacotes, você também pode anexar sua pasta de pacotes ao sys.path.

script_dir = os.path.realpath(os.path.join(__file__, '..',))
file_path = os.path.join(script_dir, "<file_name>")

packages_dir = os.path.join(file_path, '<your_package_folder>')
if packages_dir not in sys.path:
    sys.path.append(packages_dir)
from <your_package> import <your_class>

Parâmetros para ParallelRunConfig

ParallelRunConfig é a configuração principal, por ParallelRunStep exemplo, dentro do pipeline do Azure Machine Learning. Você o usa para encapsular seu script e configurar os parâmetros necessários, incluindo todas as seguintes entradas:

  • entry_script: Um script de usuário como um caminho de arquivo local que será executado em paralelo em vários nós. Se source_directory estiver presente, use um caminho relativo. Caso contrário, use qualquer caminho acessível na máquina.

  • mini_batch_size: O tamanho do minilote passado para uma única run() chamada. (opcional; o valor padrão é 10 arquivos para FileDataset e 1MB para TabularDataset.)

    • Para FileDataset, é o número de arquivos com um valor mínimo de 1. Você pode combinar vários arquivos em um minilote.
    • Para TabularDataset, é o tamanho dos dados. Os valores de exemplo são 1024, 1024KB, 10MBe 1GB. O valor recomendado é 1MB. O minilote de nunca cruzará os limites do TabularDataset arquivo. Por exemplo, se você tiver .csv arquivos com vários tamanhos, o menor arquivo é de 100 KB e o maior é de 10 MB. Se você definir mini_batch_size = 1MBo , os arquivos com tamanho menor que 1 MB serão tratados como um minilote. Os ficheiros com um tamanho superior a 1 MB serão divididos em vários mini-lotes.

      Nota

      TabularDatasets apoiados por SQL não podem ser particionados. TabularDatasets de um único arquivo parquet e um único grupo de linhas não podem ser particionados.

  • error_threshold: O número de falhas TabularDataset de registro e falhas de arquivo para FileDataset isso deve ser ignorado durante o processamento. Se a contagem de erros para toda a entrada ultrapassar esse valor, o trabalho será anulado. O limite de erro é para toda a entrada e não para o minilote individual enviado para o run() método. O intervalo é [-1, int.max]. A -1 peça indica ignorar todas as falhas durante o processamento.

  • output_action: Um dos seguintes valores indica como a saída será organizada:

    • summary_only: O script de usuário armazenará a saída. ParallelRunStep usará a saída apenas para o cálculo do limite de erro.
    • append_row: Para todas as entradas, apenas um arquivo será criado na pasta de saída para acrescentar todas as saídas separadas por linha.
  • append_row_file_name: Para personalizar o nome do arquivo de saída para append_row output_action (opcional; o valor padrão é parallel_run_step.txt).

  • source_directory: Caminhos para pastas que contêm todos os arquivos a serem executados no destino de computação (opcional).

  • compute_target: Somente AmlCompute é suportado.

  • node_count: O número de nós de computação a serem usados para executar o script do usuário.

  • process_count_per_node: O número de processos de trabalho por nó para executar o script de entrada em paralelo. Para uma máquina GPU, o valor padrão é 1. Para uma máquina CPU, o valor padrão é o número de núcleos por nó. Um processo de trabalho chamará run() repetidamente passando o mini lote que obtém. O número total de processos de trabalho em seu trabalho é process_count_per_node * node_count, que decide o número máximo de processos a run() serem executados em paralelo.

  • environment: A definição do ambiente Python. Você pode configurá-lo para usar um ambiente Python existente ou para configurar um ambiente temporário. A definição também é responsável por definir as dependências de aplicativo necessárias (opcional).

  • logging_level: Log verbosidade. Os valores no aumento da verbosidade são: WARNING, INFO, e DEBUG. (opcional; o valor padrão é INFO)

  • run_invocation_timeout: O run() tempo limite de invocação do método em segundos. (opcional; o valor padrão é 60)

  • run_max_try: Contagem máxima de tentativas para run() um minilote. A run() falhará se uma exceção for lançada ou nada for retornado quando run_invocation_timeout for atingido (opcional; o valor padrão será 3).

Você pode especificar mini_batch_size, node_count, process_count_per_node, logging_level, run_invocation_timeout, e run_max_try como PipelineParameter, para que, ao reenviar uma execução de pipeline, possa ajustar os valores dos parâmetros. Neste exemplo, você usa PipelineParameter para mini_batch_size e Process_count_per_node e alterará esses valores quando reenviar outra execução.

Visibilidade dos dispositivos CUDA

Para destinos de computação equipados com GPUs, a variável CUDA_VISIBLE_DEVICES de ambiente será definida em processos de trabalho. Em AmlCompute, você pode encontrar o número total de dispositivos GPU na variável AZ_BATCHAI_GPU_COUNT_FOUNDde ambiente , que é definida automaticamente. Se você quiser que cada processo de trabalho tenha uma GPU dedicada, defina process_count_per_node igual ao número de dispositivos GPU em uma máquina. Cada processo de trabalho atribuirá um índice exclusivo ao CUDA_VISIBLE_DEVICES. Se um processo de trabalho parar por qualquer motivo, o próximo processo de trabalho iniciado usará o índice de GPU liberado.

Se o número total de dispositivos GPU for menor que process_count_per_node, os processos de trabalho receberão o índice GPU até que todos tenham sido usados.

Dado que o total de dispositivos GPU é 2 e process_count_per_node = 4 , como exemplo, o processo 0 e o processo 1 terão índice 0 e 1. Os processos 2 e 3 não terão uma variável de ambiente. Para uma biblioteca que usa essa variável de ambiente para atribuição de GPU, os processos 2 e 3 não terão GPUs e não tentarão adquirir dispositivos GPU. Se o processo 0 parar, ele liberará o índice de GPU 0. O próximo processo, que é o processo 4, terá o índice GPU 0 atribuído.

Para obter mais informações, consulte CUDA Pro Tip: Control GPU Visibility with CUDA_VISIBLE_DEVICES.

Parâmetros para criar o ParallelRunStep

Crie o ParallelRunStep usando o script, a configuração do ambiente e os parâmetros. Especifique o destino de computação que você já anexou ao seu espaço de trabalho como o destino de execução para seu script de inferência. Use ParallelRunStep para criar a etapa de pipeline de inferência em lote, que usa todos os seguintes parâmetros:

  • name: O nome da etapa, com as seguintes restrições de nomenclatura: exclusivo, 3-32 caracteres e regex ^[a-z]([-a-z0-9]*[a-z0-9])?$.
  • parallel_run_config: Um ParallelRunConfig objeto, conforme definido anteriormente.
  • inputs: Um ou mais conjuntos de dados do Azure Machine Learning de tipo único a serem particionados para processamento paralelo.
  • side_inputs: Um ou mais dados de referência ou conjuntos de dados usados como entradas laterais sem a necessidade de serem particionados.
  • output: Um OutputFileDatasetConfig objeto que representa o caminho do diretório no qual os dados de saída serão armazenados.
  • arguments: Uma lista de argumentos passados para o script de usuário. Use unknown_args para recuperá-los em seu script de entrada (opcional).
  • allow_reuse: Se a etapa deve reutilizar resultados anteriores quando executada com as mesmas configurações/entradas. Se esse parâmetro for False, uma nova execução sempre será gerada para esta etapa durante a execução do pipeline. (opcional; o valor padrão é True.)
from azureml.pipeline.steps import ParallelRunStep

parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[input_mnist_ds_consumption],
    output=output_dir,
    allow_reuse=True
)

Depurando scripts do contexto remoto

A transição da depuração de um script de pontuação localmente para a depuração de um script de pontuação em um pipeline real pode ser um salto difícil. Para obter informações sobre como localizar seus logs no portal, consulte a seção pipelines de aprendizado de máquina sobre depuração de scripts de um contexto remoto. As informações nessa seção também se aplicam a um ParallelRunStep.

Por exemplo, o arquivo 70_driver_log.txt de log contém informações do controlador que inicia o código ParallelRunStep.

Devido à natureza distribuída dos trabalhos ParallelRunStep, há logs de várias fontes diferentes. No entanto, são criados dois ficheiros consolidados que fornecem informações de alto nível:

  • ~/logs/job_progress_overview.txt: Este arquivo fornece informações de alto nível sobre o número de minilotes (também conhecidos como tarefas) criados até agora e o número de minilotes processados até agora. Neste final, mostra o resultado do trabalho. Se o trabalho falhar, ele mostrará a mensagem de erro e onde iniciar a solução de problemas.

  • ~/logs/sys/master_role.txt: Este arquivo fornece a visualização do nó principal (também conhecido como orquestrador) do trabalho em execução. Inclui a criação de tarefas, o monitoramento do progresso, o resultado da execução.

Os logs gerados a partir do script de entrada usando instruções auxiliares e impressas do EntryScript serão encontrados nos seguintes arquivos:

  • ~/logs/user/entry_script_log/<node_id>/<process_name>.log.txt: Esses arquivos são os logs escritos a partir de entry_script usando o auxiliar EntryScript.

  • ~/logs/user/stdout/<node_id>/<process_name>.stdout.txt: Esses arquivos são os logs do stdout (por exemplo, instrução de impressão) de entry_script.

  • ~/logs/user/stderr/<node_id>/<process_name>.stderr.txt: Esses arquivos são os logs do stderr de entry_script.

Para uma compreensão concisa dos erros em seu script há:

  • ~/logs/user/error.txt: Este arquivo tentará resumir os erros em seu script.

Para obter mais informações sobre erros em seu script, há:

  • ~/logs/user/error/: Contém rastreamentos de pilha completa de exceções lançadas durante o carregamento e execução do script de entrada.

Quando você precisar de uma compreensão completa de como cada nó executou o script de pontuação, examine os logs de processo individuais para cada nó. Os logs de sys/node processo podem ser encontrados na pasta, agrupados por nós de trabalho:

  • ~/logs/sys/node/<node_id>/<process_name>.txt: Este arquivo fornece informações detalhadas sobre cada minilote à medida que é coletado ou concluído por um trabalhador. Para cada minilote, este ficheiro inclui:

    • O endereço IP e o PID do processo de trabalho.
    • O número total de itens, a contagem de itens processados com êxito e a contagem de itens com falha.
    • A hora de início, duração, tempo do processo e tempo do método de execução.

Você também pode exibir os resultados de verificações periódicas do uso de recursos para cada nó. Os arquivos de log e os arquivos de instalação estão nesta pasta:

  • ~/logs/perf: Defina --resource_monitor_interval para alterar o intervalo de verificação em segundos. O intervalo padrão é 600, que é de aproximadamente 10 minutos. Para interromper o monitoramento, defina o valor como 0. Cada <node_id> pasta inclui:

    • os/: Informações sobre todos os processos em execução no nó. Uma verificação executa um comando do sistema operacional e salva o resultado em um arquivo. No Linux, o comando é ps. No Windows, use tasklisto .
      • %Y%m%d%H: O nome da subpasta é o tempo até a hora.
        • processes_%M: O ficheiro termina com o minuto da hora de verificação.
    • node_disk_usage.csv: Uso detalhado do disco do nó.
    • node_resource_usage.csv: Visão geral do uso de recursos do nó.
    • processes_resource_usage.csv: Visão geral do uso de recursos de cada processo.

Como faço para registrar a partir do meu script de usuário a partir de um contexto remoto?

ParallelRunStep pode executar vários processos em um nó com base em process_count_per_node. Para organizar os logs de cada processo no nó e combinar a instrução print e log, recomendamos o uso do registrador ParallelRunStep conforme mostrado abaixo. Você obtém um logger do EntryScript e faz com que os logs apareçam na pasta logs/user no portal.

Um script de entrada de exemplo usando o registrador:

from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info("This will show up in files under logs/user on the Azure portal.")


def run(mini_batch):
    """Call once for a mini batch. Accept and return the list back."""
    # This class is in singleton pattern and will return same instance as the one in init()
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info(f"{__file__}: {mini_batch}.")
    ...

    return mini_batch

Para onde vai a mensagem do Python logging ?

ParallelRunStep define um manipulador no registrador raiz, que armazena a mensagem para logs/user/stdout/<node_id>/processNNN.stdout.txt.

logging padrão para INFO o nível. Por padrão, os níveis abaixo INFO não aparecerão, como DEBUG.

Como posso escrever num ficheiro para aparecer no portal?

Os ficheiros na logs pasta serão carregados e apresentados no portal. Você pode obter a pasta logs/user/entry_script_log/<node_id> como abaixo e compor o caminho do arquivo para gravar:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    log_dir = entry_script.log_dir
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.

    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    fil_path = log_dir / f"{proc_name}_<file_name>" # Avoid conflicting among worker processes with proc_name.

Como lidar com novos processos de login?

Você pode gerar novos processos em seu script de entrada com subprocess módulo, conectar-se a seus tubos de entrada/saída/erro e obter seus códigos de retorno.

A abordagem recomendada é usar a run() função com capture_output=True. Os erros aparecerão no logs/user/error/<node_id>/<process_name>.txt.

Se você quiser usar Popen()o , você deve redirecionar stdout/stderr para arquivos, como:

from pathlib import Path
from subprocess import Popen

from azureml_user.parallel_run import EntryScript


def init():
    """Show how to redirect stdout/stderr to files in logs/user/entry_script_log/<node_id>/."""
    entry_script = EntryScript()
    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.
    stdout_file = str(log_dir / f"{proc_name}_demo_stdout.txt")
    stderr_file = str(log_dir / f"{proc_name}_demo_stderr.txt")
    proc = Popen(
        ["...")],
        stdout=open(stdout_file, "w"),
        stderr=open(stderr_file, "w"),
        # ...
    )

Nota

Um processo de trabalho executa o código "system" e o código de script de entrada no mesmo processo.

Se não stdout ou stderr especificado, um subprocesso criado com Popen() seu script de entrada herdará a configuração do processo de trabalho.

stdout escreverá para logs/sys/node/<node_id>/processNNN.stdout.txt e stderr para logs/sys/node/<node_id>/processNNN.stderr.txt.

Como faço para gravar um arquivo no diretório de saída e, em seguida, visualizá-lo no portal?

Você pode obter o diretório de saída da EntryScript classe e gravar nele. Para exibir os arquivos gravados, no modo de exibição Executar etapa no portal do Azure Machine Learning, selecione a guia Saídas + logs . Selecione o link Saídas de dados e conclua as etapas descritas na caixa de diálogo.

Use EntryScript em seu script de entrada como neste exemplo:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def run(mini_batch):
    output_dir = Path(entry_script.output_dir)
    (Path(output_dir) / res1).write...
    (Path(output_dir) / res2).write...

Como posso passar uma entrada lateral, como um arquivo ou arquivos contendo uma tabela de pesquisa, para todos os meus trabalhadores?

O usuário pode passar dados de referência para o script usando side_inputs parâmetro de ParalleRunStep. Todos os conjuntos de dados fornecidos como side_inputs serão montados em cada nó de trabalho. O usuário pode obter a localização da montagem passando argumento.

Construa um Dataset contendo os dados de referência, especifique um caminho de montagem local e registre-o em seu espaço de trabalho. Passe para o side_inputs parâmetro do seu ParallelRunSteparquivo . Além disso, você pode adicionar seu caminho na arguments seção para acessar facilmente seu caminho montado.

Nota

Use FileDatasets somente para side_inputs.

local_path = "/tmp/{}".format(str(uuid.uuid4()))
label_config = label_ds.as_named_input("labels_input").as_mount(local_path)
batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
)

Depois disso, você pode acessá-lo em seu script de inferência (por exemplo, em seu método init()) da seguinte maneira:

parser = argparse.ArgumentParser()
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()

labels_path = args.labels_dir

Como usar conjuntos de dados de entrada com autenticação de entidade de serviço?

O usuário pode passar conjuntos de dados de entrada com a autenticação da entidade de serviço usada no espaço de trabalho. O uso desse conjunto de dados em ParallelRunStep requer que o conjunto de dados seja registrado para que ele construa a configuração ParallelRunStep.

service_principal = ServicePrincipalAuthentication(
    tenant_id="***",
    service_principal_id="***",
    service_principal_password="***")

ws = Workspace(
    subscription_id="***",
    resource_group="***",
    workspace_name="***",
    auth=service_principal
    )

default_blob_store = ws.get_default_datastore() # or Datastore(ws, '***datastore-name***')
ds = Dataset.File.from_files(default_blob_store, '**path***')
registered_ds = ds.register(ws, '***dataset-name***', create_new_version=True)

Como verificar o progresso e analisá-lo

Esta seção é sobre como verificar o progresso de um trabalho ParallelRunStep e verificar a causa do comportamento inesperado.

Como verificar o progresso do trabalho?

Além de analisar o status geral do StepRun, a contagem de minilotes agendados/processados e o progresso da geração de saída podem ser visualizados em ~/logs/job_progress_overview.<timestamp>.txt. O arquivo gira diariamente, você pode verificar aquele com o maior carimbo de data/hora para obter as informações mais recentes.

O que devo verificar se não houver progresso por um tempo?

Você pode entrar para ~/logs/sys/errror ver se há alguma exceção. Se não houver nenhum, é provável que seu script de entrada esteja demorando muito tempo, você pode imprimir informações de progresso em seu código para localizar a parte demorada ou adicionar "--profiling_module", "cProfile" ao arguments de para gerar um arquivo de ParallelRunStep perfil nomeado como <process_name>.profile em ~/logs/sys/node/<node_id> pasta.

Quando é que um emprego vai parar?

Se não for cancelado, o trabalho será interrompido com o status:

  • Concluída. Se todos os minilotes tiverem sido processados e a saída tiver sido gerada para append_row o modo.
  • Com falhas. Se error_threshold in Parameters for ParallelRunConfig for excedido, ou ocorreu um erro de sistema durante o trabalho.

Onde encontrar a causa raiz da falha?

Você pode seguir o exemplo para ~logs/job_result.txt encontrar a causa e o log de erros detalhado.

A falha do nó afetará o resultado do trabalho?

Não se houver outros nós disponíveis no cluster de computação designado. O orquestrador iniciará um novo nó como substituto, e o ParallelRunStep é resiliente a essa operação.

O que acontece se init a função no script de entrada falhar?

ParallelRunStep tem mecanismo para tentar novamente por um certo tempo para dar chance de recuperação de problemas transitórios sem atrasar a falha de trabalho por muito tempo, o mecanismo é o seguinte:

  1. Se depois que um nó for iniciado, init em todos os agentes continuar falhando, vamos parar de tentar após 3 * process_count_per_node falhas.
  2. Se após o início do trabalho, init em todos os agentes de todos os nós continuar falhando, vamos parar de tentar se o trabalho for executado mais de 2 minutos e houver 2 * node_count * process_count_per_node falhas.
  3. Se todos os agentes ficarem init presos por mais de 3 * run_invocation_timeout + 30 segundos, o trabalho falhará devido à ausência de progresso por muito tempo.

O que acontecerá no OutOfMemory? Como posso verificar a causa?

ParallelRunStep definirá a tentativa atual de processar o minilote para o status de falha e tentará reiniciar o processo com falha. Você pode verificar ~logs/perf/<node_id> para encontrar o processo de consumo de memória.

Por que eu tenho muitos arquivos processNNN?

O ParallelRunStep iniciará novos processos de trabalho em substituição aos que saíram anormalmente e cada processo gerará um processNNN arquivo como log. No entanto, se o processo falhou devido a exceção durante a init função de script de usuário e que o erro repetido continuamente por 3 * process_count_per_node vezes, nenhum novo processo de trabalho será iniciado.

Próximos passos