# -*- coding: utf-8 -*-
"""Thin psycopg2 wrapper around the vector-storage and resource-catalogue tables."""

__author__ = 'wanger'
__date__ = '2024-08-20'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'

import os
import time
import uuid
from typing import Optional

import psycopg2
from psycopg2 import sql


class PostgreSQL:
    """Hold one psycopg2 connection and one cursor shared by every method.

    The instance can be used as a context manager; leaving the ``with``
    block closes the cursor and the connection.
    """

    # Metadata table for vector data.
    Vector_Storage = "t_vector_storage"

    def __init__(
            self,
            host: Optional[str] = "192.168.60.2",  # default host during installation
            port: Optional[str] = "5432",  # default port during pg installation
            user: Optional[str] = "postgres",  # default user during pg installation
            password: Optional[str] = "postgis",  # default password during pg installation
            dbname: Optional[str] = "datamanager",  # default dbname during pg installation
            schema: Optional[str] = None
    ):
        """Open the connection and create the shared cursor.

        Args:
            host: Database server host.
            port: Database server port.
            user: Login role.
            password: Password for ``user``.
            dbname: Database to connect to.
            schema: Optional schema appended to the session ``search_path``.

        Raises:
            psycopg2.Error: If the connection cannot be established.
        """
        # NOTE(review): "otherSchema" is kept exactly as the original wrote it;
        # confirm that a schema with this name is really intended to come first.
        options = None
        if schema is not None:
            options = "-c search_path=otherSchema," + schema

        # Connection parameters; "options" stays None when no schema is given.
        self.connparams = {
            "dbname": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
            "options": options
        }
        self.conn = psycopg2.connect(**self.connparams)
        # Single cursor reused by all methods of this instance.
        self.cur = self.conn.cursor()

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection; exceptions are not suppressed."""
        self.close()

    def _statement_text(self, statement) -> str:
        """Return ``statement`` as plain text (str, bytes or a psycopg2.sql object)."""
        if isinstance(statement, str):
            return statement
        if isinstance(statement, bytes):
            return statement.decode("utf-8", "replace")
        # psycopg2.sql Composable objects have no str methods; render them.
        return statement.as_string(self.conn)

    def _fetch_all(self, query, params=None) -> list:
        """Run a read query and return all rows, rolling back if it fails.

        Without the rollback a failed statement leaves the shared connection
        in an aborted transaction and every later call fails as well.
        """
        try:
            self.cur.execute(query, params)
            return self.cur.fetchall()
        except Exception:
            self.conn.rollback()
            raise

    def execute(self, sql, params=None):
        """Execute an arbitrary statement.

        Args:
            sql: Statement text, or a ``psycopg2.sql`` composable object.
            params: Optional parameters bound to the placeholders in ``sql``.

        Returns:
            All fetched rows when the statement text starts with ``select``
            (case-insensitive, leading whitespace ignored). Any other
            statement is committed and ``None`` is returned.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back and the error printed before it is re-raised.
        """
        try:
            if params:
                self.cur.execute(sql, params)
            else:
                self.cur.execute(sql)

            # Only SELECT statements have their result fetched.
            if self._statement_text(sql).strip().lower().startswith("select"):
                return self.cur.fetchall()

            self.conn.commit()
            return None
        except Exception as e:
            print(f"SQL执行出错:{e}")
            self.conn.rollback()
            raise

    def close(self):
        """Close the cursor and then the connection."""
        try:
            self.cur.close()
        finally:
            # Release the connection even if closing the cursor failed.
            self.conn.close()

    def fetchone(self):
        """Return the next row of the shared cursor's current result set."""
        return self.cur.fetchone()

    def getManagerTables(self, username='admin') -> list:
        """Return the vector tables whose ``glbm`` equals the user's department name.

        Args:
            username: Value matched against ``sys_user.user_name``.

        Returns:
            Rows of ``(id, name, alias, ywlx)``; ``alias`` falls back to
            ``name`` when ``table_alias`` is the empty string.
        """
        # The user name is bound as a parameter instead of being pasted into
        # the statement, so quotes in it cannot alter the query.
        query = (
            'select t.id "id",name "name", '
            "case when t.table_alias != '' then t.table_alias else t.name end as \"alias\", "
            't.ywlx "ywlx" '
            f"from {self.Vector_Storage} t "
            "where t.glbm = (select dept_name from sys_dept d where d.dept_id = "
            "(select dept_id from sys_user u where u.user_name = %s )) "
            "and t.sjlx = 'vector'"
        )
        return self._fetch_all(query, (username,))

    def deleteVectorStorage(self, sjywz):
        """Delete the metadata rows whose ``sjywz`` column equals ``sjywz``.

        Best effort: on failure the transaction is rolled back and the error
        is printed, not raised.
        """
        try:
            self.cur.execute("DELETE FROM t_vector_storage WHERE sjywz = %s", (sjywz,))
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            print(f"删除元数据记录失败: {e}")

    def dropTable(self, tablename):
        """Drop table ``tablename`` if it exists.

        The name is quoted as a single identifier. Best effort: on failure
        the transaction is rolled back and the error is printed, not raised.
        """
        try:
            drop_sql = sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(tablename))
            self.cur.execute(drop_sql)
            self.conn.commit()  # without the commit the drop does not take effect
        except Exception as e:
            self.conn.rollback()
            print(f"删除表 {tablename} 失败: {e}")

    def getZyml(self) -> list:
        """Return catalogue nodes and stored datasets as a list of dicts.

        Each dict has the keys ``bsm``, ``name``, ``pbsm``, ``type`` and
        ``sort``. Catalogue rows come from ``t_vector_zyml`` (empty ``type``,
        a missing or empty ``sort`` becomes 9999); dataset rows come from
        ``t_vector_storage``. The combined result is ordered by
        ``pbsm, sort``.
        """
        rows = self._fetch_all("""
            SELECT * FROM (
                SELECT
                    t.bsm::text,
                    t.name::text,
                    t.pbsm::text,
                    ''::text AS type,
                    CASE
                        WHEN t.sort IS NULL OR t.sort = '' THEN 9999
                        ELSE CAST(t.sort AS INTEGER)
                    END AS sort
                FROM t_vector_zyml t
                ORDER BY
                    t.pbsm,
                    CASE
                        WHEN t.sort IS NULL OR t.sort = '' THEN 9999
                        ELSE CAST(t.sort AS INTEGER)
                    END
            ) AS a
            UNION ALL
            SELECT
                s.name::text AS bsm,
                CASE
                    WHEN s.table_alias IS NULL OR s.table_alias = '' THEN s.name
                    ELSE s.table_alias
                END::text AS name,
                s.xmlx::text AS pbsm,
                s.sjlx::text AS type,
                row_number() OVER (ORDER BY cnt DESC)::integer AS sort
            FROM (
                SELECT s.*, count(*) OVER (PARTITION BY s.xmlx) AS cnt
                FROM t_vector_storage s
                JOIN t_vector_zyml z ON s.xmlx = z.bsm OR s.xmlx = z.pbsm
            ) s
            ORDER BY pbsm, sort
        """)
        # The cursor description is still available after the rows are fetched.
        columns = [desc[0] for desc in self.cur.description]
        return [dict(zip(columns, row)) for row in rows]

    def addZyml(self, uid, pid, name):
        """Insert a catalogue node as the last child of ``pid`` and commit.

        Args:
            uid: ``bsm`` of the new node.
            pid: ``pbsm`` (parent) of the new node.
            name: Display name of the new node.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back first.
        """
        try:
            # Next sort value among the siblings. Empty-string sort values
            # (which getZyml treats as legal) are ignored so that the integer
            # cast cannot fail on them.
            self.cur.execute(
                "SELECT COALESCE(MAX(CAST(NULLIF(sort, '') AS INTEGER)), -1) + 1 "
                "FROM t_vector_zyml WHERE pbsm = %s",
                (pid,)
            )
            sort_val = self.cur.fetchone()[0]

            # Stored as a string, the same way updateZymlOrder writes it.
            self.cur.execute(
                "INSERT INTO t_vector_zyml (bsm, name, pbsm, sort) VALUES (%s, %s, %s, %s)",
                (uid, name, pid, str(sort_val))
            )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        print(f"添加新节点: {name} (bsm: {uid}, 父节点: {pid}, 排序: {sort_val})")

    def updateZymlOrder(self, bsm, pbsm, sort_index):
        """Set the parent and sort position of catalogue node ``bsm``.

        This method does not commit; the change stays in the open transaction
        until something else commits on ``self.conn``.
        """
        self.cur.execute(
            "UPDATE t_vector_zyml SET pbsm = %s, sort = %s WHERE bsm = %s",
            (pbsm, str(sort_index), bsm)  # the integer is stored as a string
        )

    def deleteZyml(self, bsm):
        """Delete catalogue node ``bsm`` and the rows whose ``pbsm`` is ``bsm``.

        Only direct children are removed; deeper descendants are left as they
        are. The two deletes are committed together and rolled back together
        on failure.

        Raises:
            Exception: Whatever the driver raised, after the rollback.
        """
        try:
            self.cur.execute("DELETE FROM t_vector_zyml WHERE pbsm = %s", (bsm,))
            self.cur.execute("DELETE FROM t_vector_zyml WHERE bsm = %s", (bsm,))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def renameZyml(self, id, name):
        """Set the name of catalogue node ``id`` and commit.

        Args:
            id: ``bsm`` of the node to rename.
            name: New display name.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back first.
        """
        try:
            # Values are bound as parameters instead of being formatted into
            # the statement. The key is passed as text because the original
            # statement always compared bsm with a quoted literal.
            self.cur.execute(
                "update t_vector_zyml set name = %s where bsm = %s",
                (name, str(id))
            )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def getResourceAttr(self, tablename):
        """Return the first ``t_vector_storage`` row whose ``name`` is ``tablename``.

        The row carries every column of the table plus one trailing label
        derived from ``sjlx``.

        Raises:
            IndexError: If no row matches ``tablename``.
        """
        rows = self._fetch_all(
            "select t.*, case when t.sjlx = 'vector' then '矢量数据' else '栅格数据' end "
            "from t_vector_storage t where name = %s",
            (tablename,)
        )
        if not rows:
            raise IndexError(f"no t_vector_storage row with name {tablename!r}")
        return rows[0]

    # Administrative divisions.
    def getXzqh(self) -> list:
        """Return ``(id, name, pid)`` rows from ``vector.xzqh`` ordered by id.

        A ``pid`` of ``'0'`` is returned as the empty string.
        """
        return self._fetch_all(
            "select t.id, t.name, case when t.pid = '0' then '' else t.pid end "
            "from vector.xzqh t order by t.id"
        )

    # Check whether a value is a string.
    def is_string(self, var) -> bool:
        """Return True when ``var`` is a ``str`` instance."""
        return isinstance(var, str)