| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- # -*- coding: utf-8 -*-
- __author__ = 'wanger'
- __date__ = '2024-08-20'
- __copyright__ = '(C) 2024 by siwei'
- __revision__ = '1.0'
- import time
- from typing import Optional
- import os
- import psycopg2
- import uuid
- class PostgreSQL:
- # 矢量数据元数据表
- Vector_Storage = "t_vector_storage"
- def __init__(
- self,
- host: Optional[str] = "192.168.60.2", # default host during installation
- port: Optional[str] = "5432", # default port during pg installation
- user: Optional[str] = "postgres", # default user during pg installation
- password: Optional[str] = "postgis", # default password during pg installation
- dbname: Optional[str] = "real3d", # default dbname during pg installation
- schema: Optional[str] = None
- ):
- # 配置数据库连接参数并指定schema
- self.connparams = {
- "dbname": dbname,
- "user": user,
- "password": password,
- "host": host,
- "port": port,
- "options": "-c search_path=otherSchema," + schema if schema is not None else None
- }
- self.conn = psycopg2.connect(**self.connparams)
- # 创建一个游标对象
- self.cur = self.conn.cursor()
- # 执行一个查询
- # self.cur.execute("SELECT 省,类型 from \"XZQH3857\";")
- # 获取查询结果
- # rows = self.cur.fetchall()
- # 打印结果
- # for row in rows:
- # for v in row:
- # print(v)
- # def execute(self, sql):
- # # 执行一个查询
- # self.cur.execute(sql)
- # # 获取查询结果
- # return self.cur.fetchall()
- def execute(self, sql, params=None):
- try:
- if params:
- self.cur.execute(sql, params)
- else:
- self.cur.execute(sql)
- # 如果是 SELECT,才 fetch 结果
- if sql.strip().lower().startswith("select"):
- return self.cur.fetchall()
- else:
- self.conn.commit()
- return None
- except Exception as e:
- print(f"SQL执行出错:{e}")
- self.conn.rollback()
- raise
- def close(self):
- # 关闭游标和连接
- self.cur.close()
- self.conn.close()
- def fetchone(self):
- return self.cur.fetchone()
- def getManagerTables(self, username='admin'):
- sql = f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" ' \
- f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = (select dept_id from sys_user u where u.user_name = \'{username}\' )) and t.sjlx = \'vector\''
- self.cur.execute(sql)
- rows = self.cur.fetchall()
- return rows
- def dropTable(self, tablename):
- try:
- drop_sql = sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(tablename))
- self.cur.execute(drop_sql)
- except Exception as e:
- print(f"删除表 {tablename} 失败: {e}")
- def getZyml(self):
- """获取资源目录数据,按sort字段排序"""
- self.cur.execute("""
- SELECT *
- FROM (
- SELECT
- t.bsm::text,
- t.name::text,
- t.pbsm::text,
- ''::text AS type,
- CASE
- WHEN t.sort IS NULL OR t.sort = '' THEN 9999
- ELSE CAST(t.sort AS INTEGER)
- END AS sort
- FROM t_vector_zyml t
- ORDER BY t.pbsm,
- CASE
- WHEN t.sort IS NULL OR t.sort = '' THEN 9999
- ELSE CAST(t.sort AS INTEGER)
- END
- ) AS a
- UNION ALL
- SELECT
- s.name::text AS bsm,
- CASE
- WHEN s.table_alias IS NULL OR s.table_alias = '' THEN s.name
- ELSE s.table_alias
- END::text AS name,
- s.xmlx::text AS pbsm,
- s.sjlx::text AS type,
- row_number() OVER (ORDER BY cnt DESC)::integer AS sort
- FROM (
- SELECT s.*, count(*) OVER (PARTITION BY s.xmlx) AS cnt
- FROM t_vector_storage s
- JOIN t_vector_zyml z ON s.xmlx = z.bsm OR s.xmlx = z.pbsm
- ) s
- ORDER BY pbsm, sort
- """)
- columns = [desc[0] for desc in self.cur.description]
- return [dict(zip(columns, row)) for row in self.cur.fetchall()]
- def addZyml(self, uid, pid, name):
- """添加新的资源目录节点"""
- # 获取同级节点的最大sort值
- self.cur.execute("SELECT COALESCE(MAX(CAST(sort AS INTEGER)), -1) + 1 FROM t_vector_zyml WHERE pbsm = %s",
- (pid,))
- sort_val = self.cur.fetchone()[0]
- self.cur.execute(
- "INSERT INTO t_vector_zyml (bsm, name, pbsm, sort) VALUES (%s, %s, %s, %s)",
- (uid, name, pid, sort_val)
- )
- self.conn.commit()
- print(f"添加新节点: {name} (bsm: {uid}, 父节点: {pid}, 排序: {sort_val})")
- def updateZymlOrder(self, bsm, pbsm, sort_index):
- """更新资源目录节点的父节点和排序"""
- self.cur.execute(
- "UPDATE t_vector_zyml SET pbsm = %s, sort = %s WHERE bsm = %s",
- (pbsm, str(sort_index), bsm) # 将整数转换为字符串存储
- )
- def deleteZyml(self, bsm):
- """删除资源目录节点"""
- # 先删除所有子节点
- self.cur.execute("DELETE FROM t_vector_zyml WHERE pbsm = %s", (bsm,))
- # 再删除自身
- self.cur.execute("DELETE FROM t_vector_zyml WHERE bsm = %s", (bsm,))
- def renameZyml(self, id, name):
- self.cur.execute("update t_vector_zyml set name = '{}' where bsm = '{}'".format(name, id))
- self.conn.commit()
- def getResourceAttr(self, tablename):
- self.cur.execute(
- "select t.*, case when t.sjlx = 'vector' then '矢量数据' else '栅格数据' end from t_vector_storage t where name = '{}'".format(
- tablename))
- rows = self.cur.fetchall()
- return rows[0]
- # 获取行政区划
- def getXzqh(self):
- self.cur.execute(
- "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")
- rows = self.cur.fetchall()
- return rows
- # 判断数据是否为字符串
- def is_string(self, var):
- return isinstance(var, str)
|