PostgreSQL.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. # -*- coding: utf-8 -*-
  2. __author__ = 'wanger'
  3. __date__ = '2024-08-20'
  4. __copyright__ = '(C) 2024 by siwei'
  5. __revision__ = '1.0'
  6. import time
  7. from typing import Optional
  8. import os
  9. import psycopg2
  10. import uuid
  11. from psycopg2 import sql
  12. class PostgreSQL:
  13. # 矢量数据元数据表
  14. Vector_Storage = "t_vector_storage"
  def __init__(
      self,
      host: Optional[str] = "192.168.60.2",
      port: Optional[str] = "5432",
      user: Optional[str] = "postgres",
      password: Optional[str] = "postgis",
      dbname: Optional[str] = "datamanager",
      schema: Optional[str] = None
  ):
      """Open a PostgreSQL connection and a cursor shared by all methods.

      The connection keyword arguments are kept on ``self.connparams``,
      the connection on ``self.conn`` and the cursor on ``self.cur``.

      Args:
          host: Server address.
          port: Server port.
          user: Login role.
          password: Login password.
          dbname: Database to connect to.
          schema: Optional schema appended to the session ``search_path``
              (after ``otherSchema``). When ``None`` no ``options`` string
              is built.

      NOTE(review): the defaults embed a concrete host and password;
      consider sourcing them from configuration instead.
      """
      # Only build the libpq "options" string when a schema was requested.
      if schema is None:
          search_path_option = None
      else:
          search_path_option = "-c search_path=otherSchema," + schema

      self.connparams = dict(
          dbname=dbname,
          user=user,
          password=password,
          host=host,
          port=port,
          options=search_path_option,
      )
      self.conn = psycopg2.connect(**self.connparams)
      # One cursor is created up front and reused by every method.
      self.cur = self.conn.cursor()
  36. # 执行一个查询
  37. # self.cur.execute("SELECT 省,类型 from \"XZQH3857\";")
  38. # 获取查询结果
  39. # rows = self.cur.fetchall()
  40. # 打印结果
  41. # for row in rows:
  42. # for v in row:
  43. # print(v)
  44. # def execute(self, sql):
  45. # # 执行一个查询
  46. # self.cur.execute(sql)
  47. # # 获取查询结果
  48. # return self.cur.fetchall()
  49. def execute(self, sql, params=None):
  50. try:
  51. if params:
  52. self.cur.execute(sql, params)
  53. else:
  54. self.cur.execute(sql)
  55. # 如果是 SELECT,才 fetch 结果
  56. if sql.strip().lower().startswith("select"):
  57. return self.cur.fetchall()
  58. else:
  59. self.conn.commit()
  60. return None
  61. except Exception as e:
  62. print(f"SQL执行出错:{e}")
  63. self.conn.rollback()
  64. raise
  65. def close(self):
  66. # 关闭游标和连接
  67. self.cur.close()
  68. self.conn.close()
  69. def fetchone(self):
  70. return self.cur.fetchone()
  71. def getManagerTables(self, username='admin'):
  72. sql = f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" ' \
  73. f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = (select dept_id from sys_user u where u.user_name = \'{username}\' )) and t.sjlx = \'vector\''
  74. self.cur.execute(sql)
  75. rows = self.cur.fetchall()
  76. return rows
  77. def deleteVectorStorage(self, sjywz):
  78. try:
  79. self.cur.execute("DELETE FROM t_vector_storage WHERE sjywz = %s", (sjywz,))
  80. self.conn.commit()
  81. except Exception as e:
  82. self.conn.rollback()
  83. print(f"删除元数据记录失败: {e}")
  def dropTable(self, tablename):
      """Drop the table named *tablename* if it exists.

      The name is quoted as an SQL identifier before being placed into
      the statement. The drop is committed on success; on any error the
      transaction is rolled back and the failure is printed without
      re-raising.
      """
      try:
          quoted_name = sql.Identifier(tablename)
          statement = sql.SQL("DROP TABLE IF EXISTS {}").format(quoted_name)
          self.cur.execute(statement)
          # The drop only takes effect once the transaction is committed.
          self.conn.commit()
      except Exception as err:
          self.conn.rollback()
          print(f"删除表 {tablename} 失败: {err}")
  92. def getZyml(self):
  93. """获取资源目录数据,按sort字段排序"""
  94. self.cur.execute("""
  95. SELECT *
  96. FROM (
  97. SELECT
  98. t.bsm::text,
  99. t.name::text,
  100. t.pbsm::text,
  101. ''::text AS type,
  102. CASE
  103. WHEN t.sort IS NULL OR t.sort = '' THEN 9999
  104. ELSE CAST(t.sort AS INTEGER)
  105. END AS sort
  106. FROM t_vector_zyml t
  107. ORDER BY t.pbsm,
  108. CASE
  109. WHEN t.sort IS NULL OR t.sort = '' THEN 9999
  110. ELSE CAST(t.sort AS INTEGER)
  111. END
  112. ) AS a
  113. UNION ALL
  114. SELECT
  115. s.name::text AS bsm,
  116. CASE
  117. WHEN s.table_alias IS NULL OR s.table_alias = '' THEN s.name
  118. ELSE s.table_alias
  119. END::text AS name,
  120. s.xmlx::text AS pbsm,
  121. s.sjlx::text AS type,
  122. row_number() OVER (ORDER BY cnt DESC)::integer AS sort
  123. FROM (
  124. SELECT s.*, count(*) OVER (PARTITION BY s.xmlx) AS cnt
  125. FROM t_vector_storage s
  126. JOIN t_vector_zyml z ON s.xmlx = z.bsm OR s.xmlx = z.pbsm
  127. ) s
  128. ORDER BY pbsm, sort
  129. """)
  130. columns = [desc[0] for desc in self.cur.description]
  131. return [dict(zip(columns, row)) for row in self.cur.fetchall()]
  132. def addZyml(self, uid, pid, name):
  133. """添加新的资源目录节点"""
  134. # 获取同级节点的最大sort值
  135. self.cur.execute("SELECT COALESCE(MAX(CAST(sort AS INTEGER)), -1) + 1 FROM t_vector_zyml WHERE pbsm = %s",
  136. (pid,))
  137. sort_val = self.cur.fetchone()[0]
  138. self.cur.execute(
  139. "INSERT INTO t_vector_zyml (bsm, name, pbsm, sort) VALUES (%s, %s, %s, %s)",
  140. (uid, name, pid, sort_val)
  141. )
  142. self.conn.commit()
  143. print(f"添加新节点: {name} (bsm: {uid}, 父节点: {pid}, 排序: {sort_val})")
  144. def updateZymlOrder(self, bsm, pbsm, sort_index):
  145. """更新资源目录节点的父节点和排序"""
  146. self.cur.execute(
  147. "UPDATE t_vector_zyml SET pbsm = %s, sort = %s WHERE bsm = %s",
  148. (pbsm, str(sort_index), bsm) # 将整数转换为字符串存储
  149. )
  150. def deleteZyml(self, bsm):
  151. """删除资源目录节点"""
  152. try:
  153. self.cur.execute("DELETE FROM t_vector_zyml WHERE pbsm = %s", (bsm,))
  154. self.cur.execute("DELETE FROM t_vector_zyml WHERE bsm = %s", (bsm,))
  155. self.conn.commit() # 添加这一行:提交事务
  156. except Exception as e:
  157. self.conn.rollback()
  158. raise e
  159. def renameZyml(self, id, name):
  160. self.cur.execute("update t_vector_zyml set name = '{}' where bsm = '{}'".format(name, id))
  161. self.conn.commit()
  162. def getResourceAttr(self, tablename):
  163. self.cur.execute(
  164. "select t.*, case when t.sjlx = 'vector' then '矢量数据' else '栅格数据' end from t_vector_storage t where name = '{}'".format(
  165. tablename))
  166. rows = self.cur.fetchall()
  167. return rows[0]
  168. # 获取行政区划
  169. def getXzqh(self):
  170. self.cur.execute(
  171. "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")
  172. rows = self.cur.fetchall()
  173. return rows
  174. # 判断数据是否为字符串
  def is_string(self, var):
      """Return ``True`` when *var* is a ``str`` instance, else ``False``."""
      verdict = isinstance(var, str)
      return verdict