StateManager.py 6.9 KB


  1. # -*- coding: utf-8 -*-
  2. __author__ = 'wanger'
  3. __date__ = '2024-11-20'
  4. __copyright__ = '(C) 2024 by siwei'
  5. __revision__ = '1.0'
  6. import os
  7. import json
  8. import time
  9. from typing import Optional, Any, Dict
  10. import tempfile
  11. from pathlib import Path
  12. class StateManager:
  13. """
  14. 状态管理器,替代 Redis 的功能
  15. 使用文件系统和内存缓存来存储状态信息
  16. """
  17. def __init__(self, cache_dir: Optional[str] = None):
  18. """
  19. 初始化状态管理器
  20. Args:
  21. cache_dir: 缓存目录,如果为 None 则使用系统临时目录
  22. """
  23. if cache_dir is None:
  24. self.cache_dir = Path(tempfile.gettempdir()) / "qgis_processing_cache"
  25. else:
  26. self.cache_dir = Path(cache_dir)
  27. # 确保缓存目录存在
  28. self.cache_dir.mkdir(parents=True, exist_ok=True)
  29. # 内存缓存,提高性能
  30. self._memory_cache = {}
  31. self._cache_expiry = {}
  32. # 默认过期时间(30分钟)
  33. self.default_expire = 60 * 30
  34. def set(self, key: str, value: Any, expire: Optional[int] = None) -> None:
  35. """
  36. 设置键值对
  37. Args:
  38. key: 键名
  39. value: 值
  40. expire: 过期时间(秒),如果为 None 则使用默认值
  41. """
  42. if expire is None:
  43. expire = self.default_expire
  44. # 存储到内存缓存
  45. self._memory_cache[key] = value
  46. self._cache_expiry[key] = time.time() + expire
  47. # 存储到文件系统(持久化)
  48. cache_file = self.cache_dir / f"{key}.json"
  49. try:
  50. cache_data = {
  51. 'value': value,
  52. 'expire': time.time() + expire,
  53. 'created': time.time()
  54. }
  55. with open(cache_file, 'w', encoding='utf-8') as f:
  56. json.dump(cache_data, f, ensure_ascii=False, indent=2)
  57. except Exception as e:
  58. print(f"写入缓存文件失败: {e}")
  59. def get(self, key: str) -> Optional[Any]:
  60. """
  61. 获取键值
  62. Args:
  63. key: 键名
  64. Returns:
  65. 值,如果不存在或已过期则返回 None
  66. """
  67. # 首先检查内存缓存
  68. if key in self._memory_cache:
  69. if time.time() < self._cache_expiry.get(key, 0):
  70. return self._memory_cache[key]
  71. else:
  72. # 已过期,从内存缓存中删除
  73. del self._memory_cache[key]
  74. del self._cache_expiry[key]
  75. # 检查文件系统缓存
  76. cache_file = self.cache_dir / f"{key}.json"
  77. if cache_file.exists():
  78. try:
  79. with open(cache_file, 'r', encoding='utf-8') as f:
  80. cache_data = json.load(f)
  81. # 检查是否过期
  82. if time.time() < cache_data.get('expire', 0):
  83. # 未过期,加载到内存缓存
  84. value = cache_data['value']
  85. self._memory_cache[key] = value
  86. self._cache_expiry[key] = cache_data['expire']
  87. return value
  88. else:
  89. # 已过期,删除文件
  90. cache_file.unlink(missing_ok=True)
  91. except Exception as e:
  92. print(f"读取缓存文件失败: {e}")
  93. cache_file.unlink(missing_ok=True)
  94. return None
  95. def delete(self, key: str) -> bool:
  96. """
  97. 删除键值
  98. Args:
  99. key: 键名
  100. Returns:
  101. 是否成功删除
  102. """
  103. # 从内存缓存中删除
  104. if key in self._memory_cache:
  105. del self._memory_cache[key]
  106. if key in self._cache_expiry:
  107. del self._cache_expiry[key]
  108. # 从文件系统中删除
  109. cache_file = self.cache_dir / f"{key}.json"
  110. try:
  111. if cache_file.exists():
  112. cache_file.unlink()
  113. return True
  114. except Exception as e:
  115. print(f"删除缓存文件失败: {e}")
  116. return False
  117. def exists(self, key: str) -> bool:
  118. """
  119. 检查键是否存在且未过期
  120. Args:
  121. key: 键名
  122. Returns:
  123. 是否存在
  124. """
  125. return self.get(key) is not None
  126. def expire(self, key: str, seconds: int) -> bool:
  127. """
  128. 设置键的过期时间
  129. Args:
  130. key: 键名
  131. seconds: 过期时间(秒)
  132. Returns:
  133. 是否成功设置
  134. """
  135. value = self.get(key)
  136. if value is not None:
  137. self.set(key, value, seconds)
  138. return True
  139. return False
  140. def clear(self) -> None:
  141. """清空所有缓存"""
  142. # 清空内存缓存
  143. self._memory_cache.clear()
  144. self._cache_expiry.clear()
  145. # 清空文件系统缓存
  146. try:
  147. for cache_file in self.cache_dir.glob("*.json"):
  148. cache_file.unlink()
  149. except Exception as e:
  150. print(f"清空缓存文件失败: {e}")
  151. def get_all_keys(self) -> list:
  152. """获取所有键名"""
  153. keys = []
  154. try:
  155. for cache_file in self.cache_dir.glob("*.json"):
  156. key = cache_file.stem
  157. if self.exists(key):
  158. keys.append(key)
  159. except Exception as e:
  160. print(f"获取键名列表失败: {e}")
  161. return keys
  162. def close(self) -> None:
  163. """关闭状态管理器"""
  164. # 清理过期的内存缓存
  165. current_time = time.time()
  166. expired_keys = [key for key, expiry in self._cache_expiry.items()
  167. if current_time >= expiry]
  168. for key in expired_keys:
  169. del self._memory_cache[key]
  170. del self._cache_expiry[key]
  171. def __enter__(self):
  172. return self
  173. def __exit__(self, exc_type, exc_val, exc_tb):
  174. self.close()
  175. # 全局状态管理器实例
  176. _global_state_manager = None
  177. def get_state_manager() -> StateManager:
  178. """获取全局状态管理器实例"""
  179. global _global_state_manager
  180. if _global_state_manager is None:
  181. _global_state_manager = StateManager()
  182. return _global_state_manager
  183. def set_state(key: str, value: Any, expire: Optional[int] = None) -> None:
  184. """设置全局状态"""
  185. get_state_manager().set(key, value, expire)
  186. def get_state(key: str) -> Optional[Any]:
  187. """获取全局状态"""
  188. return get_state_manager().get(key)
  189. def delete_state(key: str) -> bool:
  190. """删除全局状态"""
  191. return get_state_manager().delete(key)
  192. def clear_all_states() -> None:
  193. """清空所有全局状态"""
  194. get_state_manager().clear()