|
- # -*- coding: utf-8 -*-
- __author__ = 'wanger'
- __date__ = '2024-11-20'
- __copyright__ = '(C) 2024 by siwei'
- __revision__ = '1.0'
- import os
- import json
- import time
- from typing import Optional, Any, Dict
- import tempfile
- from pathlib import Path
- class StateManager:
- """
- 状态管理器,替代 Redis 的功能
- 使用文件系统和内存缓存来存储状态信息
- """
-
- def __init__(self, cache_dir: Optional[str] = None):
- """
- 初始化状态管理器
-
- Args:
- cache_dir: 缓存目录,如果为 None 则使用系统临时目录
- """
- if cache_dir is None:
- self.cache_dir = Path(tempfile.gettempdir()) / "qgis_processing_cache"
- else:
- self.cache_dir = Path(cache_dir)
-
- # 确保缓存目录存在
- self.cache_dir.mkdir(parents=True, exist_ok=True)
-
- # 内存缓存,提高性能
- self._memory_cache = {}
- self._cache_expiry = {}
-
- # 默认过期时间(30分钟)
- self.default_expire = 60 * 30
-
- def set(self, key: str, value: Any, expire: Optional[int] = None) -> None:
- """
- 设置键值对
-
- Args:
- key: 键名
- value: 值
- expire: 过期时间(秒),如果为 None 则使用默认值
- """
- if expire is None:
- expire = self.default_expire
-
- # 存储到内存缓存
- self._memory_cache[key] = value
- self._cache_expiry[key] = time.time() + expire
-
- # 存储到文件系统(持久化)
- cache_file = self.cache_dir / f"{key}.json"
- try:
- cache_data = {
- 'value': value,
- 'expire': time.time() + expire,
- 'created': time.time()
- }
- with open(cache_file, 'w', encoding='utf-8') as f:
- json.dump(cache_data, f, ensure_ascii=False, indent=2)
- except Exception as e:
- print(f"写入缓存文件失败: {e}")
-
- def get(self, key: str) -> Optional[Any]:
- """
- 获取键值
-
- Args:
- key: 键名
-
- Returns:
- 值,如果不存在或已过期则返回 None
- """
- # 首先检查内存缓存
- if key in self._memory_cache:
- if time.time() < self._cache_expiry.get(key, 0):
- return self._memory_cache[key]
- else:
- # 已过期,从内存缓存中删除
- del self._memory_cache[key]
- del self._cache_expiry[key]
-
- # 检查文件系统缓存
- cache_file = self.cache_dir / f"{key}.json"
- if cache_file.exists():
- try:
- with open(cache_file, 'r', encoding='utf-8') as f:
- cache_data = json.load(f)
-
- # 检查是否过期
- if time.time() < cache_data.get('expire', 0):
- # 未过期,加载到内存缓存
- value = cache_data['value']
- self._memory_cache[key] = value
- self._cache_expiry[key] = cache_data['expire']
- return value
- else:
- # 已过期,删除文件
- cache_file.unlink(missing_ok=True)
- except Exception as e:
- print(f"读取缓存文件失败: {e}")
- cache_file.unlink(missing_ok=True)
-
- return None
-
- def delete(self, key: str) -> bool:
- """
- 删除键值
-
- Args:
- key: 键名
-
- Returns:
- 是否成功删除
- """
- # 从内存缓存中删除
- if key in self._memory_cache:
- del self._memory_cache[key]
- if key in self._cache_expiry:
- del self._cache_expiry[key]
-
- # 从文件系统中删除
- cache_file = self.cache_dir / f"{key}.json"
- try:
- if cache_file.exists():
- cache_file.unlink()
- return True
- except Exception as e:
- print(f"删除缓存文件失败: {e}")
-
- return False
-
- def exists(self, key: str) -> bool:
- """
- 检查键是否存在且未过期
-
- Args:
- key: 键名
-
- Returns:
- 是否存在
- """
- return self.get(key) is not None
-
- def expire(self, key: str, seconds: int) -> bool:
- """
- 设置键的过期时间
-
- Args:
- key: 键名
- seconds: 过期时间(秒)
-
- Returns:
- 是否成功设置
- """
- value = self.get(key)
- if value is not None:
- self.set(key, value, seconds)
- return True
- return False
-
- def clear(self) -> None:
- """清空所有缓存"""
- # 清空内存缓存
- self._memory_cache.clear()
- self._cache_expiry.clear()
-
- # 清空文件系统缓存
- try:
- for cache_file in self.cache_dir.glob("*.json"):
- cache_file.unlink()
- except Exception as e:
- print(f"清空缓存文件失败: {e}")
-
- def get_all_keys(self) -> list:
- """获取所有键名"""
- keys = []
- try:
- for cache_file in self.cache_dir.glob("*.json"):
- key = cache_file.stem
- if self.exists(key):
- keys.append(key)
- except Exception as e:
- print(f"获取键名列表失败: {e}")
-
- return keys
-
- def close(self) -> None:
- """关闭状态管理器"""
- # 清理过期的内存缓存
- current_time = time.time()
- expired_keys = [key for key, expiry in self._cache_expiry.items()
- if current_time >= expiry]
-
- for key in expired_keys:
- del self._memory_cache[key]
- del self._cache_expiry[key]
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- # 全局状态管理器实例
- _global_state_manager = None
- def get_state_manager() -> StateManager:
- """获取全局状态管理器实例"""
- global _global_state_manager
- if _global_state_manager is None:
- _global_state_manager = StateManager()
- return _global_state_manager
- def set_state(key: str, value: Any, expire: Optional[int] = None) -> None:
- """设置全局状态"""
- get_state_manager().set(key, value, expire)
- def get_state(key: str) -> Optional[Any]:
- """获取全局状态"""
- return get_state_manager().get(key)
- def delete_state(key: str) -> bool:
- """删除全局状态"""
- return get_state_manager().delete(key)
- def clear_all_states() -> None:
- """清空所有全局状态"""
- get_state_manager().clear()
|