PostgreSQL.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. # -*- coding: utf-8 -*-
  2. __author__ = 'wanger'
  3. __date__ = '2024-08-20'
  4. __copyright__ = '(C) 2024 by siwei'
  5. __revision__ = '1.0'
  6. import time
  7. from typing import Optional
  8. import os
  9. import psycopg2
  10. import uuid
  11. from psycopg2 import sql
  12. class PostgreSQL:
  13. # 矢量数据元数据表
  14. Vector_Storage = "t_vector_storage"
  def __init__(
      self,
      host: Optional[str] = "192.168.60.2",  # default host during installation
      port: Optional[str] = "5432",  # default port during pg installation
      user: Optional[str] = "postgres",  # default user during pg installation
      password: Optional[str] = "postgis",  # default password during pg installation
      dbname: Optional[str] = "datamanager",  # default dbname during pg installation
      schema: Optional[str] = None
  ):
      """Open a database connection and create the shared cursor.

      The keyword arguments handed to ``psycopg2.connect`` are kept on
      ``self.connparams``. When ``schema`` is given, the ``options`` entry
      sets ``search_path`` to ``otherSchema`` followed by that schema;
      otherwise ``options`` is ``None``.

      Args:
          host: Database server host.
          port: Database server port.
          user: Login user name.
          password: Login password.
          dbname: Name of the database to connect to.
          schema: Optional schema appended to the connection search path.
      """
      # Build the search_path option only when a schema was requested.
      if schema is None:
          search_path_option = None
      else:
          search_path_option = "-c search_path=otherSchema," + schema

      # Connection parameters, kept on the instance.
      self.connparams = dict(
          dbname=dbname,
          user=user,
          password=password,
          host=host,
          port=port,
          options=search_path_option,
      )
      self.conn = psycopg2.connect(**self.connparams)
      # One cursor is created here and reused by every method of the class.
      self.cur = self.conn.cursor()
  49. def execute(self, sql, params=None):
  50. try:
  51. if params:
  52. self.cur.execute(sql, params)
  53. else:
  54. self.cur.execute(sql)
  55. # 如果是 SELECT,才 fetch 结果
  56. if sql.strip().lower().startswith("select"):
  57. return self.cur.fetchall()
  58. else:
  59. self.conn.commit()
  60. return None
  61. except Exception as e:
  62. print(f"SQL执行出错:{e}")
  63. self.conn.rollback()
  64. raise
  65. def close(self):
  66. # 关闭游标和连接
  67. self.cur.close()
  68. self.conn.close()
  69. def fetchone(self):
  70. return self.cur.fetchone()
  71. def getManagerTables(self, username='admin'):
  72. sql = f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" ' \
  73. f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = (select dept_id from sys_user u where u.user_name = \'{username}\' )) and t.sjlx = \'vector\''
  74. self.cur.execute(sql)
  75. rows = self.cur.fetchall()
  76. return rows
  77. def deleteVectorStorage(self, sjywz):
  78. try:
  79. self.cur.execute("DELETE FROM t_vector_storage WHERE sjywz = %s", (sjywz,))
  80. self.conn.commit()
  81. except Exception as e:
  82. self.conn.rollback()
  83. print(f"删除元数据记录失败: {e}")
  def dropTable(self, tablename):
      """Drop a table if it exists, quoting its name as an SQL identifier.

      Best effort: on any error the transaction is rolled back, a message is
      printed, and the method returns normally without re-raising.

      Args:
          tablename: Name of the table to drop.
      """
      try:
          table_identifier = sql.Identifier(tablename)
          statement = sql.SQL("DROP TABLE IF EXISTS {}").format(table_identifier)
          self.cur.execute(statement)
          # The drop only takes effect once the transaction is committed.
          self.conn.commit()
      except Exception as error:
          self.conn.rollback()
          print(f"删除表 {tablename} 失败: {error}")
  92. def getZyml(self):
  93. """获取资源目录数据,合并目录和数据项,提供 layertype 字段"""
  94. self.cur.execute("""
  95. SELECT *
  96. FROM (SELECT t.bsm::text AS bsm, t.name::text AS name, t.pbsm::text AS pbsm, ''::text AS layertype, CASE
  97. WHEN t.sort IS NULL OR t.sort = ''
  98. THEN 9999
  99. ELSE CAST(t.sort AS INTEGER)
  100. END AS sort
  101. FROM t_vector_zyml t
  102. UNION ALL
  103. SELECT s.name::text AS bsm, CASE
  104. WHEN s.table_alias IS NULL OR s.table_alias = ''
  105. THEN s.name
  106. ELSE s.table_alias
  107. END::text AS name, s.xmlx::text AS pbsm, s.sjlx::text AS layertype, row_number() OVER (PARTITION BY s.xmlx ORDER BY s.rksj DESC)::integer AS sort
  108. FROM (SELECT s.*
  109. FROM t_vector_storage s
  110. JOIN t_vector_zyml z ON s.xmlx = z.bsm OR s.xmlx = z.pbsm) AS s) AS all_data
  111. ORDER BY pbsm, sort
  112. """)
  113. columns = [desc[0] for desc in self.cur.description]
  114. return [dict(zip(columns, row)) for row in self.cur.fetchall()]
  115. def addZyml(self, uid, pid, name):
  116. """添加新的资源目录节点"""
  117. # 获取同级节点的最大sort值
  118. self.cur.execute("SELECT COALESCE(MAX(CAST(sort AS INTEGER)), -1) + 1 FROM t_vector_zyml WHERE pbsm = %s",
  119. (pid,))
  120. sort_val = self.cur.fetchone()[0]
  121. self.cur.execute(
  122. "INSERT INTO t_vector_zyml (bsm, name, pbsm, sort) VALUES (%s, %s, %s, %s)",
  123. (uid, name, pid, sort_val)
  124. )
  125. self.conn.commit()
  126. print(f"添加新节点: {name} (bsm: {uid}, 父节点: {pid}, 排序: {sort_val})")
  127. def updateZymlOrder(self, bsm, pbsm, sort_index):
  128. """更新资源目录节点的父节点和排序"""
  129. self.cur.execute(
  130. "UPDATE t_vector_zyml SET pbsm = %s, sort = %s WHERE bsm = %s",
  131. (pbsm, str(sort_index), bsm) # 将整数转换为字符串存储
  132. )
  133. def deleteZyml(self, bsm):
  134. """删除资源目录节点"""
  135. try:
  136. self.cur.execute("DELETE FROM t_vector_zyml WHERE pbsm = %s", (bsm,))
  137. self.cur.execute("DELETE FROM t_vector_zyml WHERE bsm = %s", (bsm,))
  138. self.conn.commit() # 添加这一行:提交事务
  139. except Exception as e:
  140. self.conn.rollback()
  141. raise e
  142. def renameZyml(self, id, name):
  143. self.cur.execute("update t_vector_zyml set name = '{}' where bsm = '{}'".format(name, id))
  144. self.conn.commit()
  145. def renameStorage(self, id, name):
  146. self.cur.execute("update t_vector_storage set table_alias = '{}' where name = '{}'".format(name, id))
  147. self.conn.commit()
  148. def getResourceAttr(self, tablename):
  149. self.cur.execute(
  150. "select t.*, case when t.sjlx = 'vector' then '矢量数据' else '栅格数据' end from t_vector_storage t where name = '{}'".format(
  151. tablename))
  152. rows = self.cur.fetchall()
  153. return rows[0]
  154. # 获取行政区划
  155. def getXzqh(self):
  156. self.cur.execute(
  157. "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")
  158. rows = self.cur.fetchall()
  159. return rows
  160. def getVectorZyml(self):
  161. self.cur.execute(
  162. "select bsm,name from t_vector_zyml t order by t.sort,t.name")
  163. rows = self.cur.fetchall()
  164. return rows
  165. # 判断数据是否为字符串
  166. def is_string(self, var):
  167. return isinstance(var, str)