Solucionar problemas de ingestão de manifesto usando logs de tarefas de fluxo de ar
Este artigo ajuda você a solucionar problemas de fluxo de trabalho com ingestão de manifesto no Azure Data Manager for Energy usando logs de tarefas Airflow.
Tipos de fluxo de trabalho do DAG de ingestão de manifesto
Existem dois tipos de fluxos de trabalho de gráfico acíclico direcionado (DAG) para ingestão de manifesto: manifesto único e carregamento em lote.
Manifesto único
Um único arquivo de manifesto é usado para disparar o fluxo de trabalho de ingestão de manifesto.
Valor DagTaskName | Description |
---|---|
update_status_running_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados. |
check_payload_type |
Valida se o tipo de ingestão é lote ou manifesto único. |
validate_manifest_schema_task |
Garante que todos os tipos de esquema mencionados no manifesto estejam presentes e que haja integridade referencial do esquema. Todos os valores inválidos são removidos do manifesto. |
provide_manifest_integrity_task |
Valida referências dentro do manifesto OSDU® R3 e remove entidades inválidas. Este operador é responsável pela validação pai/filho. Todas as entidades órfãs são registradas e excluídas do manifesto validado. Todos os registros referenciados externos são pesquisados. Se nenhum for encontrado, a entidade de manifesto será descartada. Todas as referências de chave substituta também são resolvidas. |
process_single_manifest_file_task |
Realiza a ingestão das entidades de manifesto final obtidas na etapa anterior. Os registos de dados são ingeridos através do serviço de armazenamento. |
update_status_finished_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados. |
Carregamento em lote
Vários arquivos de manifesto fazem parte da mesma solicitação de serviço de fluxo de trabalho. A seção manifesto na carga útil da solicitação é uma lista em vez de um dicionário de itens.
Valor DagTaskName | Description |
---|---|
update_status_running_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como running no banco de dados. |
check_payload_type |
Valida se o tipo de ingestão é lote ou manifesto único. |
batch_upload |
Divide a lista de manifestos em três lotes a serem processados em paralelo. (Nenhum log de tarefas é emitido.) |
process_manifest_task_(1 / 2 / 3) |
Divide a lista de manifestos em grupos de três e processa-os. Todas as etapas executadas em validate_manifest_schema_task , provide_manifest_integrity_task e process_single_manifest_file_task são condensadas e executadas sequencialmente nessas tarefas. |
update_status_finished_task |
Chama o serviço de fluxo de trabalho e marca o status do DAG como finished ou failed no banco de dados. |
Com base no tipo de carga (única ou em lote), a tarefa escolhe check_payload_type
a ramificação apropriada e ignora as tarefas na outra ramificação.
Pré-requisitos
Você deve ter integrado os logs de tarefas do Airflow com o Azure Monitor. Consulte Integrar logs de fluxo de ar com o Azure Monitor.
As colunas a seguir são expostas nos logs de tarefas do Airflow para você depurar o problema:
Nome do parâmetro | Description |
---|---|
RunID |
ID de execução exclusiva da execução do DAG acionada. |
CorrelationID |
ID de correlação exclusiva da execução do DAG (igual à ID de execução). |
DagName |
Nome do fluxo de trabalho do DAG. Por exemplo, Osdu_ingest é o nome do fluxo de trabalho para ingestão de manifesto. |
DagTaskName |
Nome da tarefa para o fluxo de trabalho do DAG. Por exemplo, update_status_running_task é o nome da tarefa para ingestão de manifesto. |
Content |
Mensagens de log de erros (erros ou exceções) que o Airflow emite durante a execução da tarefa. |
LogTimeStamp |
Intervalo de tempo de execução do DAG. |
LogLevel |
Nível do erro. Os valores são DEBUG , INFO , WARNING , e ERROR . Você pode ver a maioria das mensagens de exceção e erro filtrando no ERROR nível. |
Falha na execução do DAG
A execução do fluxo de trabalho falhou em Update_status_running_task
ou Update_status_finished_task
e os registros de dados não foram ingeridos.
Motivos possíveis
- A API de chamada para partição não foi autenticada, pois o ID da partição de dados está incorreto.
- Um nome de chave no contexto de execução do corpo da solicitação está incorreto.
- O serviço de fluxo de trabalho não está em execução ou está lançando erros 5xx.
estado do fluxo de trabalho
O status do fluxo de trabalho é marcado como failed
.
Solução
Verifique os logs de tarefas do fluxo de ar para update_status_running_task
ou update_status_finished_task
. Corrija a carga passando o ID de partição de dados correto ou o nome da chave.
Consulta Kusto de exemplo:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Exemplo de saída de vestígios:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Falha na validação do esquema
Os registros não foram ingeridos porque a validação do esquema falhou.
Motivos possíveis
- O serviço de esquema está lançando erros "Esquema não encontrado".
- O corpo do manifesto não está de acordo com o tipo de esquema.
- As referências de esquema estão incorretas.
- O serviço de esquema está lançando erros 5xx.
estado do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão é continuada.
Solução
Verifique os logs de tarefas do fluxo de ar para validate_manifest_schema_task
ou process_manifest_task
. Corrija a carga passando o ID de partição de dados correto ou o nome da chave.
Consulta Kusto de exemplo:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Exemplo de saída de vestígios:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Verificações de referência com falha
Os registros não foram ingeridos porque as verificações de referência falharam.
Motivos possíveis
- Registros referenciados não foram encontrados.
- Os registros dos pais não foram encontrados.
- O serviço de pesquisa está lançando 5xx erros.
estado do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho porque as entidades inválidas são ignoradas e a ingestão é continuada.
Solução
Verifique os logs de tarefas do fluxo de ar para provide_manifest_integrity_task
ou process_manifest_task
.
Consulta Kusto de exemplo:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Como não há logs de erro especificamente para tarefas de integridade referencial, verifique as instruções de log de depuração para ver se todos os registros externos foram buscados por meio do serviço de pesquisa.
Por exemplo, a saída de rastreamento de exemplo a seguir mostra um registro consultado por meio do serviço de pesquisa para integridade referencial:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
A saída mostra os registros que foram recuperados e estavam no sistema. O objeto de manifesto relacionado que fazia referência a um registro seria descartado e não seria mais ingerido se você notasse que alguns dos registros não estavam presentes.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Tags legais ou ACLs inválidas no manifesto
Os registros não foram ingeridos porque o manifesto contém tags legais inválidas ou listas de controle de acesso (ACLs).
Motivos possíveis
- As ACLs estão incorretas.
- As etiquetas legais estão incorretas.
- O serviço de armazenamento está lançando erros 5xx.
estado do fluxo de trabalho
O status do fluxo de trabalho é marcado como finished
. Você não observa uma falha no status do fluxo de trabalho.
Solução
Verifique os logs de tarefas do fluxo de ar para process_single_manifest_file_task
ou process_manifest_task
.
Consulta Kusto de exemplo:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Exemplo de saída de vestígios:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
A saída indica os registros que foram recuperados. Os registros de entidade de manifesto que correspondem a registros de pesquisa ausentes são descartados e não ingeridos.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Problemas conhecidos
- Como não há logs de erro específicos para tarefas de integridade referencial, você deve procurar manualmente as instruções de log de depuração para ver se todos os registros externos foram recuperados por meio do serviço de pesquisa.
Próximos passos
Avance para o tutorial a seguir e aprenda a executar uma ingestão de arquivo baseada em manifesto: