Add test for config storage and add cycle check by GitHub Copilot

This commit is contained in:
shenjack 2023-11-26 22:53:53 +08:00
parent e8d499775b
commit 7fe27d7034
Signed by: shenjack
GPG Key ID: 7B1134A979775551
2 changed files with 63 additions and 18 deletions

View File

@ -4,7 +4,7 @@
# All rights reserved # All rights reserved
# ------------------------------- # -------------------------------
from typing import List, Union, Optional, TYPE_CHECKING, Dict from typing import List, Set, Dict
from lib_not_dr.logger.logger import Logger from lib_not_dr.logger.logger import Logger
from lib_not_dr.types.options import Options, OptionNameNotDefined from lib_not_dr.types.options import Options, OptionNameNotDefined
@ -54,21 +54,31 @@ class ConfigStorage(Options):
self.fail_formatters.update(other_storage.fail_formatters) self.fail_formatters.update(other_storage.fail_formatters)
self.fail_outputs.update(other_storage.fail_outputs) self.fail_outputs.update(other_storage.fail_outputs)
@staticmethod # by GitHub Copilot
def circle_require_check(require_dict: Dict[str, List[str]]) -> List[str]: @classmethod
""" def detect_cycle(cls, graph: Dict[str, List[str]], start: str, visited: Set[str], path: List[str]) -> List[str]:
Check circle require visited.add(start) # 将当前节点添加到已访问的节点集合中
:param require_dict: path.append(start) # 将当前节点添加到当前路径中
:return: for neighbour in graph[start]: # 遍历当前节点的所有邻居
""" if neighbour in visited: # 如果邻居节点已经被访问过,那么我们找到了一个循环
circle_require = [] return path + [neighbour] # 返回包含循环的路径
for key, value in require_dict.items(): cycle_path = cls.detect_cycle(graph, neighbour, visited, path) # 递归地在邻居节点上调用函数
for require in value: if cycle_path: # 如果在邻居节点上找到了循环,那么返回包含循环的路径
if require in require_dict and key in require_dict[require]: return cycle_path
circle_require.append(key) visited.remove(start) # 从已访问的节点集合中移除当前节点
return circle_require path.remove(start) # 从当前路径中移除当前节点
return [] # 如果没有找到循环,那么返回一个空列表
def parse_formatter(self, formatter_config: Dict[str]) -> 'ConfigStorage': @classmethod
def find_cycles(cls, graph: Dict[str, List[str]]) -> List[str]:
cycles_set = set() # 创建一个集合来存储所有的循环
for node in graph: # 遍历图中的所有节点
cycle = cls.detect_cycle(graph, node, set(), []) # 在每个节点上调用detect_cycle函数
if cycle: # 如果找到了循环,那么将循环添加到集合中
cycles_set.update(cycle)
return sorted(cycles_set) # 返回排序后的循环列表
def parse_formatter(self, formatter_config: Dict[str, str]) -> 'ConfigStorage':
""" """
Parse formatter config Parse formatter config
:param formatter_config: :param formatter_config:
@ -84,9 +94,9 @@ class ConfigStorage(Options):
formatter_require[key] = value['sub_formatter'] formatter_require[key] = value['sub_formatter']
else: else:
formatter_require[key] = [] formatter_require[key] = []
circle_require = self.circle_require_check(formatter_require) cycles_require = self.find_cycles(formatter_require)
if len(circle_require) > 0: if cycles_require:
for formatter_name in circle_require: for formatter_name in cycles_require:
self.log.error(f'Formatter {formatter_name} require circle, ignored') self.log.error(f'Formatter {formatter_name} require circle, ignored')
env.fail_formatters[formatter_name] = formatter_wait[formatter_name] env.fail_formatters[formatter_name] = formatter_wait[formatter_name]
formatter_wait.pop(formatter_name) formatter_wait.pop(formatter_name)

35
test/logger/config.py Normal file
View File

@ -0,0 +1,35 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import unittest
from lib_not_dr.logger.config import ConfigStorage
class TestConfigStorage(unittest.TestCase):
def test_detect_circle_require(self):
test_dict = {
'a': ['b', 'c'],
'b': ['c'],
'c': ['a'],
'd': ['e'],
'e': ['f'],
'f': ['d']
}
self.assertEqual(['a', 'b', 'c', 'd', 'e', 'f'], ConfigStorage.circle_require_check(test_dict))
def test_detect_circle_require_2(self):
# large circle
for i in range(100):
test_dict = {}
for j in range(100):
test_dict[str(j)] = [str((j + 1) % 100)]
self.assertEqual(list(map(str, range(100))), ConfigStorage.circle_require_check(test_dict))
def test_detect_circle_require_3(self):
# normal
test_dict = {'a': [], 'b': ['a'], 'c': ['b'], 'd': ['c']}
self.assertEqual([], ConfigStorage.circle_require_check(test_dict))