PostgreSQL.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. # -*- coding: utf-8 -*-
  2. __author__ = 'wanger'
  3. __date__ = '2024-08-20'
  4. __copyright__ = '(C) 2024 by siwei'
  5. __revision__ = '1.0'
  6. import time
  7. from typing import Optional
  8. import os
  9. import psycopg2
  10. import uuid
  11. from ..FTP.FtpUitl import FtpOper
  12. from ..FTP.FtpConfig import *
  13. from processing.tools.FileUtils import getInputFileName
  14. class PostgreSQL:
  15. # 矢量数据元数据表
  16. Vector_Storage = "t_vector_storage"
  17. # 矢量数据服务发布表
  18. Vector_Server = "t_vector_server"
  19. # 附件表
  20. Vector_FJ = "t_vector_fj"
  21. # 字段表
  22. Vector_Field = "t_vector_field"
  23. # 平台资源目录表
  24. Portal_Zyml = "t_yzt_zyml"
  25. # 备份表后缀名
  26. BackTable = "_back"
  27. # 序列名称后缀
  28. Sequence = "_id_seq"
  29. def __init__(
  30. self,
  31. host: Optional[str] = "127.0.0.1", # default host during installation
  32. port: Optional[str] = "5432", # default port during pg installation
  33. user: Optional[str] = "postgres", # default user during pg installation
  34. password: Optional[str] = "postgres", # default password during pg installation
  35. dbname: Optional[str] = "real3d", # default dbname during pg installation
  36. schema: Optional[str] = None
  37. ):
  38. # 配置数据库连接参数并指定schema
  39. self.connparams = {
  40. "dbname": dbname,
  41. "user": user,
  42. "password": password,
  43. "host": host,
  44. "port": port,
  45. "options": "-c search_path=otherSchema," + schema if schema is not None else None
  46. }
  47. self.conn = psycopg2.connect(**self.connparams)
  48. # 创建一个游标对象
  49. self.cur = self.conn.cursor()
  50. # 执行一个查询
  51. # self.cur.execute("SELECT 省,类型 from \"XZQH3857\";")
  52. # 获取查询结果
  53. # rows = self.cur.fetchall()
  54. # 打印结果
  55. # for row in rows:
  56. # for v in row:
  57. # print(v)
  58. def execute(self, sql):
  59. # 执行一个查询
  60. self.cur.execute(sql)
  61. # 获取查询结果
  62. return self.cur.fetchall()
  63. def close(self):
  64. # 关闭游标和连接
  65. self.cur.close()
  66. self.conn.close()
  67. # 根据名称删除元数据信息
  68. def deleteVectorStorage(self, name):
  69. sql = "delete from {} where name = '{}'".format(self.Vector_Storage, name)
  70. self.cur.execute(sql)
  71. self.conn.commit()
  72. # 保存元数据信息
  73. def insertVectorStorage(self, id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias):
  74. sql = "insert into {} (id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias) values ('{}','{}', '{}', (select bsm from t_vector_zyml where name = '{}'), '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  75. self.Vector_Storage, id, name, year.toString("yyyy"), xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm,
  76. table_alias)
  77. self.cur.execute(sql)
  78. self.conn.commit()
  79. # 元数据入库,参数parameters
  80. def metadataStorage(self, parameters, ssxzqh, fileliststr, ywlx, glbm, ogrLayer, zyml):
  81. record_id = ""
  82. id = uuid.uuid4().__str__()
  83. id = id.replace("-", "")
  84. # print(parameters)
  85. if (parameters.get("TABLE") is None or parameters.get("TABLE") == "") and ogrLayer is not None:
  86. record_id = parameters.get("SCHEMA") + "." + getInputFileName(ogrLayer).lower()
  87. else:
  88. record_id = parameters.get("SCHEMA") + "." + parameters.get("TABLE")
  89. table_alias = parameters.get("TABLE_ALIAS")
  90. if table_alias != '' and table_alias is not None: # 配置表别名
  91. self.setTableAlias(table=self.getTableName(recordid=record_id),
  92. alias=table_alias)
  93. else:
  94. table_alias = ''
  95. self.deleteVectorStorage(name=record_id)
  96. self.insertVectorStorage(id=id, name=record_id, year=parameters.get("VECTOR_YEAR"),
  97. xmlx=zyml, sjly=parameters.get("VECTOR_SJLY"),
  98. xzqh=ssxzqh, xzqh_field=parameters.get("XZQH_FIELD"),
  99. ywlx=ywlx, sjlx=parameters.get("SOURCE_TYPE"), sjywz=ogrLayer,
  100. glbm=glbm, table_alias=table_alias)
  101. # 附件上传
  102. if fileliststr is not None and fileliststr != "":
  103. self.uploadAttachment(layername=record_id, layerid=id, fileliststr=fileliststr)
  104. # 获取文件名称
  105. def getInputFileName(self, file):
  106. filename = file.split("/")[len(file.split("/")) - 1]
  107. return filename[0: filename.find(".")]
  108. # 附件上传FTP
  109. def uploadAttachment(self, layername, layerid, fileliststr):
  110. print("开始上传附件啦!!")
  111. self.ftpOper = FtpOper()
  112. self.ftpOper.ftp.connect(ftpHost, ftpPort)
  113. self.ftpOper.ftp.login(ftpUsername, ftpPassword)
  114. ftp_path = '/三亚数管/' + layername # 图层FTP目录地址
  115. if not self.ftpOper.is_exist('/三亚数管/'):
  116. self.ftpOper.makedir('三亚数管', '/', '/三亚数管/')
  117. if self.ftpOper.is_exist(ftp_path): # 已有此图层目录,则删除
  118. self.ftpOper.deletedir(ftp_path)
  119. self.ftpOper.makedir(layername, '/三亚数管/', ftp_path) # 重新创建图层目录
  120. file_paths_list = fileliststr.split(",")
  121. if len(file_paths_list):
  122. for file_path in file_paths_list:
  123. if os.path.exists(file_path):
  124. filename = os.path.basename(file_path)
  125. filenamenoexten, fileextension = os.path.splitext(filename)
  126. self.ftpOper.uploadfile(file_path, ftp_path)
  127. self.insertVectorFJ(uuid.uuid4().__str__(), layerid, layername, filename, fileextension,
  128. ftp_path + '/' + filename,
  129. file_path, self.ftpOper.ftp.host + ':' + str(self.ftpOper.ftp.port),
  130. str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
  131. self.ftpOper.ftp.close()
  132. # 保存附件信息
  133. def insertVectorFJ(self, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj):
  134. sql = "insert into {} (id,layerid, layername, name, fjlx, fjdz, fjwz,dldz,cjsj) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  135. self.Vector_FJ, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj)
  136. self.cur.execute(sql)
  137. self.conn.commit()
  138. # 服务发布表插入
  139. def insertVectorServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  140. zoommax, layername):
  141. # 数据库管理系统服务相关表插入
  142. sql = "insert into {} (id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin, zoommax, layername) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  143. self.Vector_Server, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  144. zoommax, layername)
  145. self.cur.execute(sql)
  146. self.conn.commit()
  147. # 平台资源目录表插入
  148. def insertPortalZymlServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  149. zoommax, layername):
  150. # 平台资源目录服务相关表插入
  151. sql = "insert into {} (bsm, name, type, pbsm, url, state, parent, server_type, fwmc, " \
  152. "fwgzkj, fwys, qpfa, layergroup, format, maximumlevel, minimumlevel) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  153. self.Portal_Zyml, id, fwmc, fwlx, zymlbsm, fwdz, '1', '0', fwqlx, layername, fwgzkj, fwys, qpfa,
  154. layergroup, 'image/png', zoommax, zoommin)
  155. self.cur.execute(sql)
  156. self.conn.commit()
  157. # 获取资源目录
  158. def getZyml(self):
  159. self.cur.execute(
  160. "select t.bsm, t.name, t.pbsm from t_yzt_zyml t where t.parent = '1' order by t.lev,t.pbsm , t.sort")
  161. rows = self.cur.fetchall()
  162. return rows
  163. def getVectorZyml(self):
  164. self.cur.execute(
  165. "select bsm,name from t_vector_zyml t order by t.sort,t.name")
  166. rows = self.cur.fetchall()
  167. return rows
  168. # 获取行政区划
  169. def getXzqh(self):
  170. self.cur.execute(
  171. "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")
  172. rows = self.cur.fetchall()
  173. return rows
  174. # 获取矢量数据字典类型
  175. def getVectorYwlx(self):
  176. self.cur.execute(
  177. "select distinct(t.ywlx) from t_vector_field t ")
  178. rows = self.cur.fetchall()
  179. return rows
  180. # 获取部门列表
  181. def getDeptList(self):
  182. self.cur.execute(
  183. "select dept_name from (select distinct(dept_name),dept_id from sys_dept t where t.del_flag = '0' order by t.dept_id) a")
  184. rows = self.cur.fetchall()
  185. return rows
  186. # 根据用户名称查询有权限维护的数据库表,返回别名或者表名
  187. def getManagerTables(self, username):
  188. sql = f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" ' \
  189. f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = (select dept_id from sys_user u where u.user_name = \'{username}\' )) and t.sjlx = \'vector\''
  190. self.cur.execute(sql)
  191. rows = self.cur.fetchall()
  192. return rows
  193. # 根据业务类型和表名查询必填的字段名
  194. def getMustRequireField(self, tablename, ywlx):
  195. arr = tablename.split(".")
  196. sql = f'select lower(f.name) from {self.Vector_Field} f where f.ywlx = \'{ywlx}\' and lower(f.name) in (' \
  197. f'select column_name from information_schema.columns where table_schema = \'{arr[0]}\' and table_name = \'{arr[1]}\') and f.nullable = \'1\''
  198. self.cur.execute(sql)
  199. rows = self.cur.fetchall()
  200. fields = []
  201. for row in rows:
  202. fields.append(row[0])
  203. return fields
  204. def getAllTableField(self, tablename):
  205. arr = tablename.split(".")
  206. sql = f'select column_name from information_schema.columns where table_schema = \'{arr[0]}\' and table_name = \'{arr[1]}\' and lower(column_name) not in (\'id\',\'geom\', \'shape_leng\', \'shape_area\')'
  207. self.cur.execute(sql)
  208. rows = self.cur.fetchall()
  209. fields = []
  210. for row in rows:
  211. fields.append(row[0])
  212. return fields
  213. # 保存矢量要素
  214. def insertVectorFeature(self, tablename, fields, values, wkt, wktsrid, geomtype):
  215. arr = tablename.split(".")
  216. insertvalues = ""
  217. print(values)
  218. for value in values:
  219. if value == None:
  220. insertvalues += "null,"
  221. else:
  222. b = self.is_string(value)
  223. if b == True:
  224. insertvalues += f'\'{value}\','
  225. else:
  226. insertvalues += f'{value},'
  227. print(insertvalues)
  228. print(insertvalues[:-1])
  229. # 判断几何类型是否一致
  230. if wkt.startswith(geomtype) == False:
  231. if geomtype.startswith("MULTI"): # 转换为Multi
  232. wkt = 'MULTI' + wkt
  233. wkt = wkt.replace("((", "(((").replace("))", ")))")
  234. else:
  235. wkt = wkt.replace("(((", "((").replace(")))", "))").replace("MULTI", "")
  236. sql = f'insert into {arr[0]}."{arr[1]}" ({",".join(fields)} , geom) values ({insertvalues[:-1]} , public.st_geomfromtext( \'{wkt}\', {wktsrid}))'
  237. sql = sql.replace("None", "null")
  238. print(sql)
  239. self.cur.execute(sql)
  240. self.conn.commit()
  241. # 获取矢量数据表的坐标系
  242. def getVectorTableSrid(self, tablename):
  243. arr = tablename.split(".")
  244. sql = f'select public.st_srid(geom) from {arr[0]}."{arr[1]}" limit 1'
  245. self.cur.execute(sql)
  246. rows = self.cur.fetchall()
  247. for row in rows:
  248. return row[0]
  249. # 获取矢量数据表的空间类型
  250. def getVectorTableGeomType(self, tablename):
  251. arr = tablename.split(".")
  252. sql = f'select replace(upper(public.ST_GeometryType(geom)),\'ST_\',\'\') from {arr[0]}."{arr[1]}" limit 1'
  253. self.cur.execute(sql)
  254. rows = self.cur.fetchall()
  255. for row in rows:
  256. return row[0]
  257. def getInstersectsCount(self, tablename, tablesrid, wkts, wktsrid):
  258. arr = tablename.split(".")
  259. wktsql = ""
  260. for i in range(len(wkts)):
  261. wkt = wkts[i]
  262. if i > 0:
  263. wktsql += ' or '
  264. wktsql += f' public.st_intersects(t.geom,public.st_transform( public.st_geomfromtext( \'{wkt}\', {wktsrid}), {tablesrid} )) '
  265. sql = f'select count(1) from {arr[0]}."{arr[1]}" t where {wktsql}'
  266. self.cur.execute(sql)
  267. rows = self.cur.fetchall()
  268. for row in rows:
  269. return row[0]
  270. # 设置表的别名(基于postgresql comment)
  271. def setTableAlias(self, table, alias):
  272. sql = f'COMMENT ON TABLE {table} IS \'{alias}\''
  273. self.cur.execute(sql)
  274. self.conn.commit()
  275. # 删除备份表并创建备份表
  276. def restoreBackTable(self, tablename):
  277. arr = tablename.split(".")
  278. # 删除当前表
  279. deletesql = f'drop table if exists {arr[0]}."{arr[1]}"'
  280. self.cur.execute(deletesql)
  281. self.conn.commit()
  282. # 删除当前表序列
  283. deletesequencesql = f'drop sequence if exists {tablename}{self.Sequence}'
  284. self.cur.execute(deletesequencesql)
  285. self.conn.commit()
  286. # 恢复back表
  287. createsql = f'create table {arr[0]}."{arr[1]}" as table {arr[0]}."{arr[1]}{self.BackTable}"'
  288. self.cur.execute(createsql)
  289. self.conn.commit()
  290. # 查询数据表中id的最大值
  291. sql = f'select (max(id) + 1) from {arr[0]}."{arr[1]}"'
  292. self.cur.execute(sql)
  293. rows = self.cur.fetchall()
  294. max = 1
  295. for row in rows:
  296. max = row[0]
  297. # 创建序列
  298. addsequencesql = f'create sequence {tablename}{self.Sequence} start with {max} increment by 1 no minvalue no maxvalue cache 1'
  299. self.cur.execute(addsequencesql)
  300. self.conn.commit()
  301. # 设置id字段不能为空
  302. notnullsql = f'alter table {arr[0]}."{arr[1]}" alter column id set not null'
  303. self.cur.execute(notnullsql)
  304. self.conn.commit()
  305. # 使用序列
  306. usesequencesql = f'alter table {arr[0]}."{arr[1]}" alter column id set default nextval(\'{tablename}{self.Sequence}\')'
  307. self.cur.execute(usesequencesql)
  308. self.conn.commit()
  309. # 删除备份表并创建备份表
  310. def resetBackTable(self, tablename):
  311. arr = tablename.split(".")
  312. deletesql = f'drop table if exists {arr[0]}."{arr[1]}{self.BackTable}"'
  313. self.cur.execute(deletesql)
  314. self.conn.commit()
  315. createsql = f'create table {arr[0]}."{arr[1]}{self.BackTable}" as table {arr[0]}."{arr[1]}"'
  316. self.cur.execute(createsql)
  317. self.conn.commit()
  318. def getTableName(self, recordid):
  319. sp = recordid.split(".")
  320. res = f'"{sp[0]}"."{sp[1].lower()}"'
  321. return res
  322. # 查询数据库当前模式下有没有同名的表
  323. def checkTableName(self, schemaname, tablename):
  324. sql = f'select tablename from pg_tables t where t.schemaname = \'{schemaname}\' and t.tablename = \'{tablename}\''
  325. self.cur.execute(sql)
  326. rows = self.cur.fetchall()
  327. return len(rows) == 0
  328. # 判断数据是否为字符串
  329. def is_string(self, var):
  330. return isinstance(var, str)