| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- import json
- from multiprocessing.dummy import Pool as ThreadPool
- from pathlib import Path
- from random import choice
- import requests
- import yaml
- from qgis.core import QgsSettings
# Home page of the Tianditu service; the same address is sent as the Referer.
TIANDITU_HOME_URL = "https://www.tianditu.gov.cn/"

# Name of this plugin.
PLUGIN_NAME = "tianditu-tools"

# Directory that contains this module.
PluginDir = Path(__file__).parent

# HTTP headers attached to the requests issued from this module.
HEADER = {
    "User-Agent": "Mozilla/5.0 QGIS/32400/Windows 10 Version 2009",
    "Referer": TIANDITU_HOME_URL,
}
def get_extramap_status():
    """Collect the map keys of every extra-map section shipped with the plugin.

    Reads ``maps/summary.yml`` from the plugin directory, then for each
    section it yields reads ``maps/<section>.yml`` and takes the keys of that
    file's ``maps`` entry.

    Returns:
        dict: section name -> list of the map keys declared in that section.
    """
    section_names = load_yaml(PluginDir.joinpath("maps/summary.yml"))
    return {
        name: list(load_yaml(PluginDir.joinpath(f"maps/{name}.yml"))["maps"].keys())
        for name in section_names
    }
class PluginConfig:
    """Read and write the plugin's persistent settings through ``QgsSettings``.

    Every key lives under the ``tianditu-tools`` prefix; the Tianditu specific
    keys live under ``tianditu-tools/Tianditu``.
    """

    def __init__(self):
        self.conf = QgsSettings()
        self.conf_name = "tianditu-tools"
        self.section_tianditu = f"{self.conf_name}/Tianditu"

    def init_config(self):
        """Create any missing settings and migrate older layouts."""
        # First run: no key entry yet, so write the whole default section.
        if not self.conf.contains(f"{self.section_tianditu}/key"):
            print("初始化配置文件")
            self.conf.setValue(f"{self.section_tianditu}/key", "")
            self.conf.setValue(f"{self.section_tianditu}/keyList", "")
            self.conf.setValue(f"{self.section_tianditu}/random", True)
            self.conf.setValue(f"{self.section_tianditu}/random_key", False)
            self.conf.setValue(f"{self.section_tianditu}/subdomain", "t0")
        if not self.conf.contains(f"{self.conf_name}/Other/extramap_status"):
            print("初始化 extra map 文件")
            self.set_extra_maps_status(get_extramap_status())
        # Upgrade from the single-key layout to the multi-key layout,
        # carrying the previously saved key over into the key list.
        if not self.conf.contains(f"{self.section_tianditu}/keyList"):
            self.conf.setValue(f"{self.section_tianditu}/random_key", False)
            self.conf.setValue(
                f"{self.section_tianditu}/keyList", self.get_key() or ""
            )

    def get_key_list(self):
        """Return the saved keys as a list; empty when nothing is stored."""
        stored = self.get_value("Tianditu/keyList")
        if not stored:
            # Covers both the empty string and a missing entry (None).
            return []
        return stored.split(",")

    def save_key_list(self, data_list):
        """Store ``data_list`` as one comma-separated string."""
        self.conf.setValue(f"{self.section_tianditu}/keyList", ",".join(data_list))

    def get_extra_maps_status(self):
        """Return the stored extra-map status mapping.

        Values written by this class are JSON. Values written by earlier
        versions are the ``str()`` of a dict, which is parsed as a Python
        literal instead of being rewritten quote-by-quote (that rewrite broke
        on any name containing an apostrophe).

        Returns:
            dict: the stored mapping, or an empty dict when nothing is stored.
        """
        import ast

        stored = self.get_value("Other/extramap_status")
        if not stored:
            return {}
        try:
            return json.loads(stored)
        except ValueError:
            # Legacy format: repr of a dict (single-quoted strings).
            return ast.literal_eval(stored)

    def set_extra_maps_status(self, data):
        """Persist the extra-map status mapping as JSON text."""
        self.conf.setValue(
            f"{self.conf_name}/Other/extramap_status",
            json.dumps(data, ensure_ascii=False),
        )

    def get_value(self, name):
        """Return the raw value stored under ``tianditu-tools/<name>``."""
        return self.conf.value(f"{self.conf_name}/{name}")

    def get_bool_value(self, name):
        """Return the value stored under ``tianditu-tools/<name>`` as a bool."""
        return self.conf.value(f"{self.conf_name}/{name}", type=bool)

    def set_value(self, name, value):
        """Store ``value`` under ``tianditu-tools/<name>``."""
        self.conf.setValue(f"{self.conf_name}/{name}", value)

    def get_key(self):
        """Return the currently selected Tianditu key."""
        return self.get_value("Tianditu/key")

    def get_random_key(self):
        """Return a random key from the saved key list.

        When the list is empty, fall back to the single selected key (or an
        empty string) instead of letting ``random.choice`` raise IndexError.
        """
        candidates = self.get_key_list()
        if not candidates:
            return self.get_key() or ""
        return choice(candidates)

    def set_key(self, key):
        """Store ``key`` as the selected key; ``None`` is stored as ``""``."""
        self.conf.setValue(
            f"{self.section_tianditu}/key", "" if key is None else key
        )
def got(url, headers=None, timeout=6):
    """Issue a GET request and return the response, or None on failure.

    Args:
        url: address to request.
        headers: optional HTTP headers passed to ``requests.get``.
        timeout: timeout in seconds passed to ``requests.get``.

    Returns:
        The ``requests`` response, or ``None`` when the request raised a
        connection error or a timeout.
    """
    try:
        return requests.get(url, headers=headers, timeout=timeout)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        return None
def tianditu_map_url(maptype: str, token: str, subdomain: str) -> str:
    """Build the Tianditu tile URL template for one layer.

    Args:
        maptype (str): layer type, used both in the path and as LAYER.
        token (str): Tianditu key, appended as the ``tk`` parameter.
        subdomain (str): subdomain to request the tiles from.

    Returns:
        str: WMTS GetTile URL in which ``{x}``, ``{y}`` and ``{z}`` are left
        as literal placeholders.
    """
    pieces = (
        f"https://{subdomain}.tianditu.gov.cn/",
        f"{maptype}_w/wmts?SERVICE=WMTS&REQUEST=GetTile&VERSION=1.0.0&LAYER={maptype}",
        # Plain string on purpose: the braces must survive as placeholders.
        "&STYLE=default&TILEMATRIXSET=w&FORMAT=tiles&TileCol={x}&TileRow={y}&TileMatrix={z}",
        f"&tk={token}",
    )
    return "".join(pieces)
def check_url_status(url: str) -> dict:
    """Request ``url`` and translate the outcome into a status dict.

    Args:
        url (str): address to check.

    Returns:
        dict: ``{"code": 0}`` on HTTP 200; otherwise a dict with ``code``,
        ``msg`` and ``resolve``. Codes:
            -1: network error (no response)
            0: OK
            1: invalid key (reported by the server on 403)
            12: wrong permission type (reported by the server on 403)
            1000: unknown error
    """
    res = got(url, headers=HEADER)
    if res is None:
        return {"code": -1, "msg": "网络错误", "resolve": "请检查网络连接"}
    if res.status_code == 200:
        return {"code": 0}

    unknown = {
        "code": 1000,
        "msg": "未知错误 ",
        "resolve": f"错误代码:{res.status_code}",
    }
    if res.status_code != 403:
        return unknown

    # Parse the 403 body once. A body that is not JSON, or lacks the expected
    # fields, is reported as an unknown error rather than raising.
    try:
        body = res.json()
        return {
            "code": body["code"],  # 1: invalid key, 12: wrong permission type
            "msg": body["msg"],
            "resolve": body["resolve"],
        }
    except (ValueError, KeyError, TypeError):
        return unknown
def check_subdomain(url: str) -> int:
    """Measure how long one tile request takes.

    Args:
        url (str): tile URL.

    Returns:
        int: elapsed time of the request in milliseconds, or -1 when the
        response is falsy (``None`` from ``got``, or a response object that
        evaluates false).
    """
    reply = got(url, headers=HEADER, timeout=8)
    if not reply:
        return -1
    return int(reply.elapsed.total_seconds() * 1000)
def check_subdomains(url_list: list) -> list:
    """Measure a list of subdomain tile URLs using four worker threads.

    Args:
        url_list (list): tile URLs, one per subdomain.

    Returns:
        list: one string per URL, in input order: ``"<n> ms"`` for a measured
        latency, ``"❌"`` when the measurement returned -1.
    """
    pool = ThreadPool(4)
    try:
        ping_list = pool.map(check_subdomain, url_list)
    finally:
        # Always release the worker threads, even if a worker raised.
        pool.close()
        pool.join()
    return ["❌" if x == -1 else f"{x} ms" for x in ping_list]
def load_yaml(file_path: Path):
    """Parse the UTF-8 YAML file at ``file_path`` with ``yaml.safe_load``."""
    with open(file_path, "r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed
class TiandituAPI:
    """Client for the Tianditu search and geocoding web APIs."""

    def __init__(self, token: str):
        self.token = token
        self.header = HEADER

    def get(self, url: str, payload: dict) -> dict:
        """Send a GET request and wrap the outcome in a result dict.

        Args:
            url (str): endpoint address.
            payload (dict): query parameters.

        Returns:
            dict: ``{"code": 1, "data": <parsed JSON>}`` on success,
            ``{"code": -1, "message": <text>}`` on any failure.
        """
        timeout = 8
        try:
            res = requests.get(
                url, headers=self.header, params=payload, timeout=timeout
            )
        except (requests.exceptions.RequestException, TimeoutError) as error:
            # requests raises its own Timeout/ConnectionError classes, which
            # do not derive from the builtin TimeoutError.
            return {"code": -1, "message": str(error)}
        if not res.ok:
            return {"code": -1, "message": f"请求失败 Status Code:{res.status_code}"}
        try:
            return {"code": 1, "data": res.json()}
        except ValueError as error:
            # The body was not valid JSON.
            return {"code": -1, "message": str(error)}

    def api_search_v2(self, keyword: str, specify: str = None) -> dict:
        """Call the Tianditu place search V2 endpoint.

        API description: http://lbs.tianditu.gov.cn/server/search2.html

        Args:
            keyword (str): search keyword.
            specify (str, optional): national standard code of the
                administrative region to search in; omitted by default.

        Returns:
            dict: result dict as produced by ``get``.
        """
        url = "http://api.tianditu.gov.cn/v2/search"
        data = {
            "keyWord": keyword,  # search keyword
            "mapBound": "-180,-90,180,90",  # map extent (minx,miny,maxx,maxy)
            "level": 18,  # query level, 1-18
            "queryType": 1,  # 1: normal search (incl. metro/bus), 7: place name
            "start": 0,  # index of the first result, 0-300
            "count": 10,  # number of results to return, 1-300
            "show": 1,  # 1: basic POI info, 2: detailed POI info
        }
        if specify:
            data["specify"] = specify
        payload = {
            "postStr": json.dumps(data, ensure_ascii=False),
            "type": "query",
            "tk": self.token,
        }
        return self.get(url, payload)

    def api_geocoder(self, keyword: str) -> dict:
        """Call the Tianditu geocoding endpoint.

        API description: http://lbs.tianditu.gov.cn/server/geocodinginterface.html

        Args:
            keyword (str): text to geocode, sent as ``keyWord``.

        Returns:
            dict: result dict as produced by ``get``.
        """
        url = "http://api.tianditu.gov.cn/geocoder"
        data = {"keyWord": keyword}
        payload = {"ds": json.dumps(data, ensure_ascii=False), "tk": self.token}
        return self.get(url, payload)

    def api_regeocoder(self, lon: float, lat: float) -> dict:
        """Call the Tianditu reverse geocoding endpoint.

        API description: http://lbs.tianditu.gov.cn/server/geocoding.html

        Args:
            lon (float): longitude.
            lat (float): latitude.

        Returns:
            dict: result dict as produced by ``get``.
        """
        url = "http://api.tianditu.gov.cn/geocoder"
        data = {"lon": lon, "lat": lat, "ver": 1}
        payload = {
            "postStr": json.dumps(data, ensure_ascii=False),
            "type": "geocode",
            "tk": self.token,
        }
        return self.get(url, payload)
|