utils.py 9.2 KB


  1. import json
  2. from multiprocessing.dummy import Pool as ThreadPool
  3. from pathlib import Path
  4. from random import choice
  5. import requests
  6. import yaml
  7. from qgis.core import QgsSettings
  8. TIANDITU_HOME_URL = "https://www.tianditu.gov.cn/"
  9. PLUGIN_NAME = "tianditu-tools"
  10. PluginDir = Path(__file__).parent
  11. HEADER = {
  12. "User-Agent": "Mozilla/5.0 QGIS/32400/Windows 10 Version 2009",
  13. "Referer": "https://www.tianditu.gov.cn/",
  14. }
  15. def get_extramap_status():
  16. summary = load_yaml(PluginDir.joinpath("maps/summary.yml"))
  17. default_status = {}
  18. for section in summary:
  19. section_data = load_yaml(PluginDir.joinpath(f"maps/{section}.yml"))
  20. default_status[section] = list(section_data["maps"].keys())
  21. return default_status
  22. class PluginConfig:
  23. def __init__(self):
  24. self.conf = QgsSettings()
  25. self.conf_name = "tianditu-tools"
  26. self.section_tianditu = f"{self.conf_name}/Tianditu"
  27. def init_config(self):
  28. # 初始化配置文件
  29. if not self.conf.contains("tianditu-tools/Tianditu/key"):
  30. print("初始化配置文件")
  31. # 初始化
  32. self.conf.setValue(f"{self.section_tianditu}/key", "")
  33. self.conf.setValue(f"{self.section_tianditu}/keyList", "")
  34. self.conf.setValue(f"{self.section_tianditu}/random", True)
  35. self.conf.setValue(f"{self.section_tianditu}/random_key", False)
  36. self.conf.setValue(f"{self.section_tianditu}/subdomain", "t0")
  37. if not self.conf.contains("tianditu-tools/Other/extramap_status"):
  38. print("初始化 extra map 文件")
  39. self.conf.setValue(
  40. f"{self.conf_name}/Other/extramap_status", str(get_extramap_status())
  41. )
  42. # 升级到保存多个key的版本
  43. if not self.conf.contains(f"{self.section_tianditu}/keyList"):
  44. self.conf.setValue(f"{self.section_tianditu}/random_key", False)
  45. # 保存原来的key
  46. if self.get_key() != "":
  47. self.conf.setValue(f"{self.section_tianditu}/keyList", self.get_key())
  48. else:
  49. self.conf.setValue(f"{self.section_tianditu}/keyList", "")
  50. def get_key_list(self):
  51. data_str = self.get_value("/Tianditu/keyList")
  52. if data_str == "":
  53. return []
  54. return data_str.split(",")
  55. def save_key_list(self, data_list):
  56. self.conf.setValue(f"{self.section_tianditu}/keyList", ",".join(data_list))
  57. def get_extra_maps_status(self):
  58. data = self.get_value("Other/extramap_status")
  59. return json.loads(data.replace("'", '"'))
  60. def set_extra_maps_status(self, data):
  61. self.conf.setValue(f"{self.conf_name}/Other/extramap_status", str(data))
  62. def get_value(self, name):
  63. return self.conf.value(f"{self.conf_name}/{name}")
  64. def get_bool_value(self, name):
  65. return self.conf.value(f"{self.conf_name}/{name}", type=bool)
  66. def set_value(self, name, value):
  67. self.conf.setValue(f"{self.conf_name}/{name}", value)
  68. def get_key(self):
  69. return self.get_value("Tianditu/key")
  70. def get_random_key(self):
  71. key_list = self.get_key_list()
  72. return choice(key_list)
  73. def set_key(self, key):
  74. key_to_set = ""
  75. if key is not None:
  76. key_to_set = key
  77. self.conf.setValue(f"{self.section_tianditu}/key", key_to_set)
  78. def got(url, headers=None, timeout=6):
  79. try:
  80. response = requests.get(url, headers=headers, timeout=timeout)
  81. except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
  82. response = None
  83. return response
  84. def tianditu_map_url(maptype: str, token: str, subdomain: str) -> str:
  85. """
  86. 返回天地图url
  87. Args:
  88. maptype (str): 类型
  89. token (str): 天地图key
  90. subdomain (str): 使用的子域名
  91. Returns:
  92. str: 返回天地图XYZ瓦片地址
  93. """
  94. url = f"https://{subdomain}.tianditu.gov.cn/"
  95. url += (
  96. f"{maptype}_w/wmts?SERVICE=WMTS&REQUEST=GetTile&VERSION=1.0.0&LAYER={maptype}"
  97. )
  98. url += "&STYLE=default&TILEMATRIXSET=w&FORMAT=tiles&TileCol={x}&TileRow={y}&TileMatrix={z}"
  99. url += f"&tk={token}"
  100. return url
  101. def check_url_status(url: str) -> object:
  102. """
  103. 检查url状态
  104. Args:
  105. url (str): url
  106. Returns:
  107. object: {"code": 0}
  108. code:
  109. -1:网络异常
  110. 0: 正常
  111. 1: 非法key
  112. 12: 权限类型错误
  113. 1000: 未知错误
  114. """
  115. res = got(url, headers=HEADER)
  116. msg = {"code": 0}
  117. if res is not None:
  118. if res.status_code == 403:
  119. msg["code"] = res.json()["code"] # 1:非法key 12:权限类型错误
  120. msg["msg"] = res.json()["msg"]
  121. msg["resolve"] = res.json()["resolve"]
  122. elif res.status_code == 200:
  123. msg["code"] = 0
  124. else:
  125. msg["code"] = 1000 # 未知错误
  126. msg["msg"] = "未知错误 "
  127. msg["resolve"] = f"错误代码:{res.status_code}"
  128. else:
  129. msg = {"code": -1, "msg": "网络错误", "resolve": "请检查网络连接"}
  130. return msg
  131. def check_subdomain(url: str) -> int:
  132. """对子域名进行测速
  133. Args:
  134. url (str): 瓦片url
  135. Returns:
  136. int: 子域名对应的延迟数(毫秒), -1 表示连接失败
  137. """
  138. response = got(url, headers=HEADER, timeout=8)
  139. if response:
  140. millisecond = response.elapsed.total_seconds() * 1000
  141. else:
  142. millisecond = -1
  143. return int(millisecond)
  144. def check_subdomains(url_list: list) -> list:
  145. """对子域名列表进行测速
  146. Args:
  147. url_list (list): 由不同子域名组成的瓦片url列表
  148. Returns:
  149. list: 每个子域名对应的延迟数(毫秒)组成的列表
  150. """
  151. pool = ThreadPool(4)
  152. ping_list = pool.map(check_subdomain, url_list)
  153. pool.close()
  154. pool.join()
  155. return ["❌" if x == -1 else f"{x} ms" for x in ping_list]
  156. def load_yaml(file_path: Path):
  157. """
  158. 读取YAML文件
  159. """
  160. with open(file_path, "r", encoding="utf-8") as f:
  161. return yaml.safe_load(f)
  162. class TiandituAPI:
  163. """实现天地图搜索API"""
  164. def __init__(self, token: str):
  165. self.token = token
  166. self.header = HEADER
  167. def get(self, url: str, payload: dict) -> object:
  168. """实现get请求
  169. Args:
  170. url (str): url
  171. payload (dict): 传递参数
  172. Returns:
  173. object: {"code": 1为正常, -1为异常, "data": 请求数据}
  174. """
  175. timeout = 8
  176. try:
  177. res = requests.get(
  178. url, headers=self.header, params=payload, timeout=timeout
  179. )
  180. if res.ok:
  181. return {"code": 1, "data": res.json()}
  182. return {"code": -1, "message": f"请求失败 Status Code:{res.status_code}"}
  183. except TimeoutError as error:
  184. return {"code": -1, "message": str(error)}
  185. def api_search_v2(self, keyword: str, specify: str = None) -> object:
  186. """天地图地名搜索V2接口
  187. API说明: http://lbs.tianditu.gov.cn/server/search2.html
  188. Args:
  189. keyword (str): 搜索关键词
  190. specify (str, optional): 指定行政区的国标码 默认不传入
  191. Returns:
  192. object: 返回
  193. """
  194. #
  195. url = "http://api.tianditu.gov.cn/v2/search"
  196. data = {
  197. "keyWord": keyword, # 搜索的关键字
  198. "mapBound": "-180,-90,180,90", # 查询的地图范围(minx,miny,maxx,maxy) | -180,-90至180,90
  199. "level": 18, # 目前查询的级别 | 1-18级
  200. "queryType": 1, # 搜索类型 | 1:普通搜索(含地铁公交) 7:地名搜索
  201. "start": 0, # 返回结果起始位(用于分页和缓存)默认0 | 0-300,表示返回结果的起始位置。
  202. "count": 10, # 返回的结果数量(用于分页和缓存)| 1-300,返回结果的条数。
  203. "show": 1, # 返回poi结果信息类别 | 取值为1,则返回基本poi信息;取值为2,则返回详细poi信息
  204. }
  205. if specify:
  206. data["specify"] = specify
  207. payload = {"postStr": str(data), "type": "query", "tk": self.token}
  208. return self.get(url, payload)
  209. def api_geocoder(self, keyword: str) -> object:
  210. """天地图地理编码接口
  211. API说明: http://lbs.tianditu.gov.cn/server/geocodinginterface.html
  212. Args:
  213. keyword (str): _description_
  214. Returns:
  215. object: _description_
  216. """
  217. url = "http://api.tianditu.gov.cn/geocoder"
  218. data = {
  219. "keyWord": keyword, # 搜索的关键字
  220. }
  221. payload = {"ds": str(data), "tk": self.token}
  222. return self.get(url, payload)
  223. def api_regeocoder(self, lon: float, lat: float) -> object:
  224. """天地图逆地理编码接口
  225. API说明: http://lbs.tianditu.gov.cn/server/geocoding.html
  226. Args:
  227. lon (float): 纬度值
  228. lat (float): 经度值
  229. Returns:
  230. object: 逆地理编码数据
  231. """
  232. url = "http://api.tianditu.gov.cn/geocoder"
  233. data = {"lon": lon, "lat": lat, "ver": 1}
  234. payload = {"postStr": str(data), "type": "geocode", "tk": self.token}
  235. return self.get(url, payload)