From 7fe27d703448d92767af31a9915f43ce6aa30fe2 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sun, 26 Nov 2023 22:53:53 +0800 Subject: [PATCH] Add test for config storage and add cycle check by GitHub Copilot --- src/lib_not_dr/logger/config.py | 46 ++++++++++++++++++++------------- test/logger/config.py | 35 +++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 18 deletions(-) create mode 100644 test/logger/config.py diff --git a/src/lib_not_dr/logger/config.py b/src/lib_not_dr/logger/config.py index 6c2f591..c704bf6 100644 --- a/src/lib_not_dr/logger/config.py +++ b/src/lib_not_dr/logger/config.py @@ -4,7 +4,7 @@ # All rights reserved # ------------------------------- -from typing import List, Union, Optional, TYPE_CHECKING, Dict +from typing import List, Set, Dict from lib_not_dr.logger.logger import Logger from lib_not_dr.types.options import Options, OptionNameNotDefined @@ -54,21 +54,31 @@ class ConfigStorage(Options): self.fail_formatters.update(other_storage.fail_formatters) self.fail_outputs.update(other_storage.fail_outputs) - @staticmethod - def circle_require_check(require_dict: Dict[str, List[str]]) -> List[str]: - """ - Check circle require - :param require_dict: - :return: - """ - circle_require = [] - for key, value in require_dict.items(): - for require in value: - if require in require_dict and key in require_dict[require]: - circle_require.append(key) - return circle_require + # by GitHub Copilot + @classmethod + def detect_cycle(cls, graph: Dict[str, List[str]], start: str, visited: Set[str], path: List[str]) -> List[str]: + visited.add(start) # 将当前节点添加到已访问的节点集合中 + path.append(start) # 将当前节点添加到当前路径中 + for neighbour in graph[start]: # 遍历当前节点的所有邻居 + if neighbour in visited: # 如果邻居节点已经被访问过,那么我们找到了一个循环 + return path + [neighbour] # 返回包含循环的路径 + cycle_path = cls.detect_cycle(graph, neighbour, visited, path) # 递归地在邻居节点上调用函数 + if cycle_path: # 如果在邻居节点上找到了循环,那么返回包含循环的路径 + return cycle_path + visited.remove(start) # 从已访问的节点集合中移除当前节点 + path.remove(start) # 从当前路径中移除当前节点 + return [] # 如果没有找到循环,那么返回一个空列表 - def parse_formatter(self, formatter_config: Dict[str]) -> 'ConfigStorage': + @classmethod + def find_cycles(cls, graph: Dict[str, List[str]]) -> List[str]: + cycles_set = set() # 创建一个集合来存储所有的循环 + for node in graph: # 遍历图中的所有节点 + cycle = cls.detect_cycle(graph, node, set(), []) # 在每个节点上调用detect_cycle函数 + if cycle: # 如果找到了循环,那么将循环添加到集合中 + cycles_set.update(cycle) + return sorted(cycles_set) # 返回排序后的循环列表 + + def parse_formatter(self, formatter_config: Dict[str, str]) -> 'ConfigStorage': """ Parse formatter config :param formatter_config: @@ -84,9 +94,9 @@ class ConfigStorage(Options): formatter_require[key] = value['sub_formatter'] else: formatter_require[key] = [] - circle_require = self.circle_require_check(formatter_require) - if len(circle_require) > 0: - for formatter_name in circle_require: + cycles_require = self.find_cycles(formatter_require) + if cycles_require: + for formatter_name in cycles_require: self.log.error(f'Formatter {formatter_name} require circle, ignored') env.fail_formatters[formatter_name] = formatter_wait[formatter_name] formatter_wait.pop(formatter_name) diff --git a/test/logger/config.py b/test/logger/config.py new file mode 100644 index 0000000..44b69d9 --- /dev/null +++ b/test/logger/config.py @@ -0,0 +1,35 @@ +# ------------------------------- +# Difficult Rocket +# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com +# All rights reserved +# ------------------------------- + +import unittest + +from lib_not_dr.logger.config import ConfigStorage + + +class TestConfigStorage(unittest.TestCase): + def test_detect_circle_require(self): + test_dict = { + 'a': ['b', 'c'], + 'b': ['c'], + 'c': ['a'], + 'd': ['e'], + 'e': ['f'], + 'f': ['d'] + } + self.assertEqual(['a', 'b', 'c', 'd', 'e', 'f'], ConfigStorage.circle_require_check(test_dict)) + + def test_detect_circle_require_2(self): + # large circle + for i in range(100): + test_dict = {} + for j in range(100): + test_dict[str(j)] = [str((j + 1) % 100)] + self.assertEqual(list(map(str, range(100))), ConfigStorage.circle_require_check(test_dict)) + + def test_detect_circle_require_3(self): + # normal + test_dict = {'a': [], 'b': ['a'], 'c': ['b'], 'd': ['c']} + self.assertEqual([], ConfigStorage.circle_require_check(test_dict))