Partilhar via


Solucionar problemas de ingestão de manifesto usando logs de tarefas de fluxo de ar

Este artigo ajuda você a solucionar problemas de fluxo de trabalho com ingestão de manifesto no Azure Data Manager for Energy usando logs de tarefas Airflow.

Tipos de fluxo de trabalho do DAG de ingestão de manifesto

Existem dois tipos de fluxos de trabalho de gráfico acíclico direcionado (DAG) para ingestão de manifesto: manifesto único e carregamento em lote.

Manifesto único

Um único arquivo de manifesto é usado para disparar o fluxo de trabalho de ingestão de manifesto.

Valor DagTaskName Description
update_status_running_task Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados.
check_payload_type Valida se o tipo de ingestão é lote ou manifesto único.
validate_manifest_schema_task Garante que todos os tipos de esquema mencionados no manifesto estejam presentes e que haja integridade referencial do esquema. Todos os valores inválidos são removidos do manifesto.
provide_manifest_integrity_task Valida referências dentro do manifesto OSDU® R3 e remove entidades inválidas. Este operador é responsável pela validação pai/filho. Todas as entidades órfãs são registradas e excluídas do manifesto validado. Todos os registros referenciados externos são pesquisados. Se nenhum for encontrado, a entidade de manifesto será descartada. Todas as referências de chave substituta também são resolvidas.
process_single_manifest_file_task Realiza a ingestão das entidades de manifesto final obtidas na etapa anterior. Os registos de dados são ingeridos através do serviço de armazenamento.
update_status_finished_task Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados.

Carregamento em lote

Vários arquivos de manifesto fazem parte da mesma solicitação de serviço de fluxo de trabalho. A seção manifesto na carga útil da solicitação é uma lista em vez de um dicionário de itens.

Valor DagTaskName Description
update_status_running_task Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados.
check_payload_type Valida se o tipo de ingestão é lote ou manifesto único.
batch_upload Divide a lista de manifestos em três lotes a serem processados em paralelo. (Nenhum log de tarefas é emitido.)
process_manifest_task_(1 / 2 / 3) Divide a lista de manifestos em grupos de três e processa-os. Todas as etapas executadas em validate_manifest_schema_task, provide_manifest_integrity_taske process_single_manifest_file_task são condensadas e executadas sequencialmente nessas tarefas.
update_status_finished_task Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados.

Com base no tipo de carga (única ou em lote), a tarefa escolhe check_payload_type a ramificação apropriada e ignora as tarefas na outra ramificação.

Pré-requisitos

Você deve ter integrado os logs de tarefas do Airflow com o Azure Monitor. Consulte Integrar logs de fluxo de ar com o Azure Monitor.

As colunas a seguir são expostas nos logs de tarefas do Airflow para você depurar o problema:

Nome do parâmetro Description
RunID ID de execução exclusiva da execução do DAG acionada.
CorrelationID ID de correlação exclusiva da execução do DAG (igual à ID de execução).
DagName Nome do fluxo de trabalho do DAG. Por exemplo, Osdu_ingest é o nome do fluxo de trabalho para ingestão de manifesto.
DagTaskName Nome da tarefa para o fluxo de trabalho do DAG. Por exemplo, update_status_running_task é o nome da tarefa para ingestão de manifesto.
Content Mensagens de log de erros (erros ou exceções) que o Airflow emite durante a execução da tarefa.
LogTimeStamp Intervalo de tempo de execução do DAG.
LogLevel Nível do erro. Os valores são DEBUG, INFO, WARNING, e ERROR. Você pode ver a maioria das mensagens de exceção e erro filtrando no ERROR nível.

Falha na execução do DAG

A execução do fluxo de trabalho falhou em Update_status_running_task ou Update_status_finished_taske os registros de dados não foram ingeridos.

Motivos possíveis

  • A API de chamada para partição não foi autenticada, pois o ID da partição de dados está incorreto.
  • Um nome de chave no contexto de execução do corpo da solicitação está incorreto.
  • O serviço de fluxo de trabalho não está em execução ou está lançando erros 5xx.

estado do fluxo de trabalho

O status do fluxo de trabalho é marcado como failed.

Solução

Verifique os logs de tarefas do fluxo de ar para update_status_running_task ou update_status_finished_task. Corrija a carga passando o ID de partição de dados correto ou o nome da chave.

Consulta Kusto de exemplo:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Exemplo de saída de vestígios:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Falha na validação do esquema

Os registros não foram ingeridos porque a validação do esquema falhou.

Motivos possíveis

  • O serviço de esquema está lançando erros "Esquema não encontrado".
  • O corpo do manifesto não está de acordo com o tipo de esquema.
  • As referências de esquema estão incorretas.
  • O serviço de esquema está lançando erros 5xx.

estado do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão é continuada.

Solução

Verifique os logs de tarefas do fluxo de ar para validate_manifest_schema_task ou process_manifest_task. Corrija a carga passando o ID de partição de dados correto ou o nome da chave.

Consulta Kusto de exemplo:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Exemplo de saída de vestígios:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Verificações de referência com falha

Os registros não foram ingeridos porque as verificações de referência falharam.

Motivos possíveis

  • Registros referenciados não foram encontrados.
  • Os registros dos pais não foram encontrados.
  • O serviço de pesquisa está lançando 5xx erros.

estado do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão é continuada.

Solução

Verifique os logs de tarefas do fluxo de ar para provide_manifest_integrity_task ou process_manifest_task.

Consulta Kusto de exemplo:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Como não há logs de erro especificamente para tarefas de integridade referencial, verifique as instruções de log de depuração para ver se todos os registros externos foram buscados por meio do serviço de pesquisa.

Por exemplo, a saída de rastreamento de exemplo a seguir mostra um registro consultado por meio do serviço de pesquisa para integridade referencial:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

A saída mostra os registros que foram recuperados e estavam no sistema. O objeto de manifesto relacionado que fazia referência a um registro seria descartado e não seria mais ingerido se você notasse que alguns dos registros não estavam presentes.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

Os registros não foram ingeridos porque o manifesto contém tags legais inválidas ou listas de controle de acesso (ACLs).

Motivos possíveis

  • As ACLs estão incorretas.
  • As etiquetas legais estão incorretas.
  • O serviço de armazenamento está lançando erros 5xx.

estado do fluxo de trabalho

O status do fluxo de trabalho é marcado como finished. Você não observa uma falha no status do fluxo de trabalho.

Solução

Verifique os logs de tarefas do fluxo de ar para process_single_manifest_file_task ou process_manifest_task.

Consulta Kusto de exemplo:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Exemplo de saída de vestígios:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

A saída indica os registros que foram recuperados. Os registros de entidade de manifesto que correspondem a registros de pesquisa ausentes são descartados e não ingeridos.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Problemas conhecidos

  • Como não há logs de erro específicos para tarefas de integridade referencial, você deve procurar manualmente as instruções de log de depuração para ver se todos os registros externos foram recuperados por meio do serviço de pesquisa.

Próximos passos

Avance para o tutorial a seguir e aprenda a executar uma ingestão de arquivo baseada em manifesto:

Referências