PostgreSQL.py 21 KB


  1. # -*- coding: utf-8 -*-
  2. __author__ = 'wanger'
  3. __date__ = '2024-08-20'
  4. __copyright__ = '(C) 2024 by siwei'
  5. __revision__ = '1.0'
  6. import time
  7. from typing import Optional
  8. import os
  9. import psycopg2
  10. import uuid
  11. import siwei_config
  12. from ..FTP.FtpUitl import FtpOper
  13. from ..FTP.FtpConfig import *
  14. from ..FileUtils import getInputFileName
  15. from ..StringUtils import (getNow, base64str, enbase64str)
  16. from datetime import datetime
  17. class PostgreSQL:
  18. # 矢量数据元数据表
  19. Vector_Storage = "t_vector_storage"
  20. # 矢量数据服务发布表
  21. Vector_Server = "t_vector_server"
  22. # 附件表
  23. Vector_FJ = "t_vector_fj"
  24. # 字段表
  25. Vector_Field = "t_vector_field"
  26. # 平台资源目录表
  27. Portal_Zyml = "t_yzt_zyml"
  28. # 备份表后缀名
  29. BackTable = "_back"
  30. # 序列名称后缀
  31. Sequence = "_id_seq"
  32. # 版本控制表
  33. Vector_Version = "t_vector_version"
  34. # 版本控制详情表
  35. Vector_Version_Details = "t_vector_version_details"
  36. # 全局参数修改
  37. db_config = siwei_config.CONFIG['db']
  38. def __init__(
  39. self,
  40. host: Optional[str] = db_config['host'], # default host during installation
  41. port: Optional[str] = db_config['port'], # default port during pg installation
  42. user: Optional[str] = db_config['user'], # default user during pg installation
  43. password: Optional[str] = db_config['password'], # default password during pg installation
  44. dbname: Optional[str] = db_config['name'], # default dbname during pg installation
  45. schema: Optional[str] = db_config['schema']
  46. ):
  47. # 配置数据库连接参数并指定schema
  48. self.connparams = {
  49. "dbname": dbname,
  50. "user": user,
  51. "password": password,
  52. "host": host,
  53. "port": port,
  54. "options": "-c search_path=otherSchema," + schema if schema is not None else None
  55. }
  56. self.conn = psycopg2.connect(**self.connparams)
  57. # 创建一个游标对象
  58. self.cur = self.conn.cursor()
  59. # 执行一个查询
  60. # self.cur.execute("SELECT 省,类型 from \"XZQH3857\";")
  61. # 获取查询结果
  62. # rows = self.cur.fetchall()
  63. # 打印结果
  64. # for row in rows:
  65. # for v in row:
  66. # print(v)
  67. def execute(self, sql):
  68. # 执行一个查询
  69. self.cur.execute(sql)
  70. # 获取查询结果
  71. return self.cur.fetchall()
  72. def close(self):
  73. # 关闭游标和连接
  74. self.cur.close()
  75. self.conn.close()
  76. # 根据名称删除元数据信息
  77. def deleteVectorStorage(self, name):
  78. sql = "delete from {} where name = '{}'".format(self.Vector_Storage, name)
  79. self.cur.execute(sql)
  80. self.conn.commit()
  81. # 保存元数据信息
  82. def insertVectorStorage(self, id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias):
  83. sql = "insert into {} (id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias) values ('{}','{}', '{}', (select bsm from t_vector_zyml where name = '{}'), '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  84. self.Vector_Storage, id, name, year.toString("yyyy"), xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm,
  85. table_alias)
  86. self.cur.execute(sql)
  87. self.conn.commit()
  88. # 元数据入库,参数parameters
  89. def metadataStorage(self, parameters, ssxzqh, fileliststr, ywlx, glbm, ogrLayer, layername, zyml):
  90. record_id = ""
  91. id = uuid.uuid4().__str__()
  92. id = id.replace("-", "")
  93. print(ogrLayer)
  94. print(layername)
  95. print(parameters)
  96. if parameters.get("TABLE") is None or parameters.get("TABLE") == "":
  97. if layername is not None and layername != "":
  98. record_id = parameters.get("SCHEMA") + "." + layername.lower()
  99. else:
  100. record_id = parameters.get("SCHEMA") + "." + getInputFileName(ogrLayer).lower()
  101. else:
  102. record_id = parameters.get("SCHEMA") + "." + parameters.get("TABLE")
  103. # if (parameters.get("TABLE") is None or parameters.get("TABLE") == "") and ogrLayer is not None:
  104. # record_id = parameters.get("SCHEMA") + "." + getInputFileName(ogrLayer).lower()
  105. # elif ogrLayer is not None:
  106. # record_id = parameters.get("SCHEMA") + "." + ogrLayer.lower()
  107. # else:
  108. # record_id = parameters.get("SCHEMA") + "." + parameters.get("TABLE")
  109. table_alias = parameters.get("TABLE_ALIAS")
  110. if table_alias is not None and table_alias != '': # 配置表别名
  111. self.setTableAlias(table=self.getTableName(recordid=record_id),
  112. alias=table_alias)
  113. else:
  114. table_alias = ''
  115. self.deleteVectorStorage(name=record_id)
  116. self.deleteVectorVersion(tablename=record_id)
  117. self.insertVectorStorage(id=id, name=record_id, year=parameters.get("VECTOR_YEAR"),
  118. xmlx=zyml, sjly=parameters.get("VECTOR_SJLY"),
  119. xzqh=ssxzqh, xzqh_field=parameters.get("XZQH_FIELD"),
  120. ywlx=ywlx, sjlx=parameters.get("SOURCE_TYPE"), sjywz=ogrLayer,
  121. glbm=glbm, table_alias=table_alias)
  122. self.insertVectorVersion(tablename=record_id)
  123. # 附件上传
  124. if fileliststr is not None and fileliststr != "":
  125. self.uploadAttachment(layername=record_id, layerid=id, fileliststr=fileliststr)
  126. # 删除版本控制表
  127. def deleteVectorVersion(self, tablename):
  128. sql = "delete from {} where tablename = '{}'".format(self.Vector_Version, tablename)
  129. self.cur.execute(sql)
  130. self.conn.commit()
  131. # 删除版本数据
  132. def deleteVectorRecords(self, tablename, rksj_sw):
  133. sql = "delete from {} t where t.rksj_sw > '{}'".format(tablename, rksj_sw)
  134. self.cur.execute(sql)
  135. self.conn.commit()
  136. sql = "update {} set version = '{}' where tablename = '{}'".format(self.Vector_Version, rksj_sw,
  137. tablename)
  138. self.cur.execute(sql)
  139. self.conn.commit()
  140. # 插入版本数据
  141. def insertVectorRecords(self, tablename, curversion, targetversion):
  142. sql = "select insertsql from {} t where t.tablename = '{}' and t.version > '{}' and t.version <= '{}'".format(
  143. self.Vector_Version_Details, tablename, curversion, targetversion)
  144. self.cur.execute(sql)
  145. rows = self.cur.fetchall()
  146. for row in rows:
  147. sql = enbase64str(row[0])
  148. self.cur.execute(sql)
  149. sql = "update {} set version = '{}' where tablename = '{}'".format(self.Vector_Version, targetversion,
  150. tablename)
  151. self.cur.execute(sql)
  152. self.conn.commit()
  153. # 创建版本控制表
  154. def insertVectorVersion(self, tablename):
  155. formatted_time = getNow()
  156. # 输出结果
  157. print("当前时间:{}".format(formatted_time))
  158. sql = "insert into {} (tablename, version, type) values ('{}', '{}', '{}')".format(self.Vector_Version,
  159. tablename, formatted_time,
  160. "base")
  161. self.cur.execute(sql)
  162. self.conn.commit()
  163. sql = "alter table {} add column rksj_sw timestamp".format(tablename)
  164. self.cur.execute(sql)
  165. self.conn.commit()
  166. sql = "update {} SET rksj_sw = to_timestamp('{}', 'YYYY-MM-DD HH24:MI:SS')".format(tablename, formatted_time)
  167. self.cur.execute(sql)
  168. self.conn.commit()
  169. details = "insert into {} (tablename, version, insertsql) values ('{}', '{}', '{}')".format(
  170. self.Vector_Version_Details, tablename, formatted_time,
  171. "")
  172. print(details)
  173. self.cur.execute(details)
  174. self.conn.commit()
  175. # 获取文件名称
  176. def getInputFileName(self, file):
  177. filename = file.split("/")[len(file.split("/")) - 1]
  178. return filename[0: filename.find(".")]
  179. # 附件上传FTP
  180. def uploadAttachment(self, layername, layerid, fileliststr):
  181. print("开始上传附件啦!!")
  182. self.ftpOper = FtpOper()
  183. self.ftpOper.ftp.connect(ftpHost, ftpPort)
  184. self.ftpOper.ftp.login(ftpUsername, ftpPassword)
  185. ftp_path = '/三亚数管/' + layername # 图层FTP目录地址
  186. if not self.ftpOper.is_exist('/三亚数管/'):
  187. self.ftpOper.makedir('三亚数管', '/', '/三亚数管/')
  188. if self.ftpOper.is_exist(ftp_path): # 已有此图层目录,则删除
  189. self.ftpOper.deletedir(ftp_path)
  190. self.ftpOper.makedir(layername, '/三亚数管/', ftp_path) # 重新创建图层目录
  191. file_paths_list = fileliststr.split(",")
  192. if len(file_paths_list):
  193. for file_path in file_paths_list:
  194. if os.path.exists(file_path):
  195. filename = os.path.basename(file_path)
  196. filenamenoexten, fileextension = os.path.splitext(filename)
  197. self.ftpOper.uploadfile(file_path, ftp_path)
  198. self.insertVectorFJ(uuid.uuid4().__str__(), layerid, layername, filename, fileextension,
  199. ftp_path + '/' + filename,
  200. file_path, self.ftpOper.ftp.host + ':' + str(self.ftpOper.ftp.port),
  201. str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
  202. self.ftpOper.ftp.close()
  203. # 保存附件信息
  204. def insertVectorFJ(self, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj):
  205. sql = "insert into {} (id,layerid, layername, name, fjlx, fjdz, fjwz,dldz,cjsj) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  206. self.Vector_FJ, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj)
  207. self.cur.execute(sql)
  208. self.conn.commit()
  209. # 服务发布表插入
  210. def insertVectorServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  211. zoommax, layername):
  212. # 数据库管理系统服务相关表插入
  213. sql = "insert into {} (id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin, zoommax, layername) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  214. self.Vector_Server, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  215. zoommax, layername)
  216. self.cur.execute(sql)
  217. self.conn.commit()
  218. # 平台资源目录表插入
  219. def insertPortalZymlServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
  220. zoommax, layername):
  221. # 平台资源目录服务相关表插入
  222. sql = "insert into {} (bsm, name, type, pbsm, url, state, parent, server_type, fwmc, " \
  223. "fwgzkj, fwys, qpfa, layergroup, format, maximumlevel, minimumlevel) values ('{}','{}','{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')".format(
  224. self.Portal_Zyml, id, fwmc, fwlx, zymlbsm, fwdz, '1', '0', fwqlx, layername, fwgzkj, fwys, qpfa,
  225. layergroup, 'image/png', zoommax, zoommin)
  226. self.cur.execute(sql)
  227. self.conn.commit()
  228. # 获取资源目录
  229. def getZyml(self):
  230. self.cur.execute(
  231. "select t.bsm, t.name, t.pbsm from t_yzt_zyml t where t.parent = '1' order by t.lev,t.pbsm , t.sort")
  232. rows = self.cur.fetchall()
  233. return rows
  234. def getVectorZyml(self):
  235. self.cur.execute(
  236. "select bsm,name from t_vector_zyml t order by t.sort,t.name")
  237. rows = self.cur.fetchall()
  238. return rows
  239. # 获取行政区划
  240. def getXzqh(self):
  241. self.cur.execute(
  242. "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")
  243. rows = self.cur.fetchall()
  244. return rows
  245. # 获取矢量数据字典类型
  246. def getVectorYwlx(self):
  247. self.cur.execute(
  248. "select distinct(t.ywlx) from t_vector_field t ")
  249. rows = self.cur.fetchall()
  250. return rows
  251. # 获取部门列表
  252. def getDeptList(self):
  253. self.cur.execute(
  254. "select dept_name from (select distinct(dept_name),dept_id from sys_dept t where t.del_flag = '0' order by t.dept_id) a")
  255. rows = self.cur.fetchall()
  256. return rows
  257. # 根据用户名称查询有权限维护的数据库表,返回别名或者表名
  258. def getManagerTables(self, username):
  259. sql = f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" ' \
  260. f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = (select dept_id from sys_user u where u.user_name = \'{username}\' )) and t.sjlx = \'vector\''
  261. self.cur.execute(sql)
  262. rows = self.cur.fetchall()
  263. return rows
  264. # 根据业务类型和表名查询必填的字段名
  265. def getMustRequireField(self, tablename, ywlx):
  266. arr = tablename.split(".")
  267. sql = f'select lower(f.name) from {self.Vector_Field} f where f.ywlx = \'{ywlx}\' and lower(f.name) in (' \
  268. f'select column_name from information_schema.columns where table_schema = \'{arr[0]}\' and table_name = \'{arr[1]}\') and f.nullable = \'1\''
  269. self.cur.execute(sql)
  270. rows = self.cur.fetchall()
  271. fields = []
  272. for row in rows:
  273. fields.append(row[0])
  274. return fields
  275. def getAllTableField(self, tablename):
  276. arr = tablename.split(".")
  277. sql = f'select column_name from information_schema.columns where table_schema = \'{arr[0]}\' and table_name = \'{arr[1]}\' and lower(column_name) not in (\'id\',\'geom\', \'shape_leng\', \'shape_area\')'
  278. self.cur.execute(sql)
  279. rows = self.cur.fetchall()
  280. fields = []
  281. for row in rows:
  282. fields.append(row[0])
  283. return fields
  284. # 保存矢量要素
  285. def insertVectorFeature(self, tablename, fields, values, wkt, wktsrid, geomtype, rksj):
  286. arr = tablename.split(".")
  287. insertvalues = ""
  288. # print(values)
  289. for value in values:
  290. if value == None:
  291. insertvalues += "null,"
  292. else:
  293. b = self.is_string(value)
  294. if b == True:
  295. insertvalues += f'\'{value}\','
  296. else:
  297. insertvalues += f'{value},'
  298. # print(insertvalues)
  299. # print(insertvalues[:-1])
  300. # 判断几何类型是否一致
  301. if wkt.startswith(geomtype) == False:
  302. if geomtype.startswith("MULTI"): # 转换为Multi
  303. wkt = 'MULTI' + wkt
  304. wkt = wkt.replace("((", "(((").replace("))", ")))")
  305. else:
  306. wkt = wkt.replace("(((", "((").replace(")))", "))").replace("MULTI", "")
  307. sql = f'insert into {arr[0]}."{arr[1]}" ({",".join(fields)} , geom) values ({insertvalues[:-1]} , public.st_geomfromtext( \'{wkt}\', {wktsrid}))'
  308. sql = sql.replace("None", "null")
  309. # print(sql)
  310. self.cur.execute(sql)
  311. self.conn.commit()
  312. # 插入到更新详情表
  313. details = "insert into {} (tablename, version, insertsql) values ('{}', '{}', '{}')".format(
  314. self.Vector_Version_Details, tablename, rksj,
  315. base64str(sql))
  316. # print(details)
  317. self.cur.execute(details)
  318. self.conn.commit()
  319. def updateVectorVersion(self, tablename, version):
  320. details = "update {} set rksj_sw = to_timestamp('{}', 'YYYY-MM-DD HH24:MI:SS') where rksj_sw is null".format(
  321. tablename, version)
  322. self.cur.execute(details)
  323. self.conn.commit()
  324. details = "update {} set version = '{}', type = 'update' where tablename = '{}'".format(self.Vector_Version,
  325. version, tablename)
  326. self.cur.execute(details)
  327. self.conn.commit()
  328. # 通过表名称获取版本集合
  329. def getTableVersions(self, tablename):
  330. sql = "select distinct(version) from {} t where t.tablename = '{}' order by version desc".format(
  331. self.Vector_Version_Details, tablename)
  332. print(sql)
  333. self.cur.execute(sql)
  334. rows = self.cur.fetchall()
  335. return rows
  336. # 通过表名称获取当前版本
  337. def getTableCurVersion(self, tablename):
  338. sql = "select version from {} t where t.tablename = '{}'".format(
  339. self.Vector_Version, tablename)
  340. print(sql)
  341. self.cur.execute(sql)
  342. rows = self.cur.fetchall()
  343. return rows[0][0]
  344. # 获取矢量数据表的坐标系
  345. def getVectorTableSrid(self, tablename):
  346. arr = tablename.split(".")
  347. sql = f'select public.st_srid(geom) from {arr[0]}."{arr[1]}" limit 1'
  348. self.cur.execute(sql)
  349. rows = self.cur.fetchall()
  350. for row in rows:
  351. return row[0]
  352. # 获取矢量数据表的空间类型
  353. def getVectorTableGeomType(self, tablename):
  354. arr = tablename.split(".")
  355. sql = f'select replace(upper(public.ST_GeometryType(geom)),\'ST_\',\'\') from {arr[0]}."{arr[1]}" limit 1'
  356. self.cur.execute(sql)
  357. rows = self.cur.fetchall()
  358. for row in rows:
  359. return row[0]
  360. def getInstersectsCount(self, tablename, tablesrid, wkts, wktsrid):
  361. arr = tablename.split(".")
  362. wktsql = ""
  363. for i in range(len(wkts)):
  364. wkt = wkts[i]
  365. if i > 0:
  366. wktsql += ' or '
  367. wktsql += f' public.st_intersects(t.geom,public.st_transform( public.st_geomfromtext( \'{wkt}\', {wktsrid}), {tablesrid} )) '
  368. sql = f'select count(1) from {arr[0]}."{arr[1]}" t where {wktsql}'
  369. self.cur.execute(sql)
  370. rows = self.cur.fetchall()
  371. for row in rows:
  372. return row[0]
  373. # 设置表的别名(基于postgresql comment)
  374. def setTableAlias(self, table, alias):
  375. sql = f'COMMENT ON TABLE {table} IS \'{alias}\''
  376. self.cur.execute(sql)
  377. self.conn.commit()
  378. # 删除备份表并创建备份表
  379. def restoreBackTable(self, tablename):
  380. arr = tablename.split(".")
  381. # 删除当前表
  382. deletesql = f'drop table if exists {arr[0]}."{arr[1]}"'
  383. self.cur.execute(deletesql)
  384. self.conn.commit()
  385. # 删除当前表序列
  386. deletesequencesql = f'drop sequence if exists {tablename}{self.Sequence}'
  387. self.cur.execute(deletesequencesql)
  388. self.conn.commit()
  389. # 恢复back表
  390. createsql = f'create table {arr[0]}."{arr[1]}" as table {arr[0]}."{arr[1]}{self.BackTable}"'
  391. self.cur.execute(createsql)
  392. self.conn.commit()
  393. # 查询数据表中id的最大值
  394. sql = f'select (max(id) + 1) from {arr[0]}."{arr[1]}"'
  395. self.cur.execute(sql)
  396. rows = self.cur.fetchall()
  397. max = 1
  398. for row in rows:
  399. max = row[0]
  400. # 创建序列
  401. addsequencesql = f'create sequence {tablename}{self.Sequence} start with {max} increment by 1 no minvalue no maxvalue cache 1'
  402. self.cur.execute(addsequencesql)
  403. self.conn.commit()
  404. # 设置id字段不能为空
  405. notnullsql = f'alter table {arr[0]}."{arr[1]}" alter column id set not null'
  406. self.cur.execute(notnullsql)
  407. self.conn.commit()
  408. # 使用序列
  409. usesequencesql = f'alter table {arr[0]}."{arr[1]}" alter column id set default nextval(\'{tablename}{self.Sequence}\')'
  410. self.cur.execute(usesequencesql)
  411. self.conn.commit()
  412. # 删除备份表并创建备份表
  413. def resetBackTable(self, tablename):
  414. arr = tablename.split(".")
  415. deletesql = f'drop table if exists {arr[0]}."{arr[1]}{self.BackTable}"'
  416. self.cur.execute(deletesql)
  417. self.conn.commit()
  418. createsql = f'create table {arr[0]}."{arr[1]}{self.BackTable}" as table {arr[0]}."{arr[1]}"'
  419. self.cur.execute(createsql)
  420. self.conn.commit()
  421. def getTableName(self, recordid):
  422. sp = recordid.split(".")
  423. res = f'"{sp[0]}"."{sp[1].lower()}"'
  424. return res
  425. # 查询数据库当前模式下有没有同名的表
  426. def checkTableName(self, schemaname, tablename):
  427. sql = f'select tablename from pg_tables t where t.schemaname = \'{schemaname}\' and t.tablename = \'{tablename}\''
  428. self.cur.execute(sql)
  429. rows = self.cur.fetchall()
  430. return len(rows) == 0
  431. # 判断数据是否为字符串
  432. def is_string(self, var):
  433. return isinstance(var, str)