# -*- coding: utf-8 -*-

__author__ = 'wanger'
__date__ = '2024-08-20'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'

import time
from typing import Optional
import os
import psycopg2
import uuid
import siwei_config
from ..FTP.FtpUitl import FtpOper
from ..FTP.FtpConfig import *
from ..FileUtils import getInputFileName
from ..StringUtils import (getNow, base64str, enbase64str)
from datetime import datetime
from psycopg2 import sql as pgsql


class PostgreSQL:
    """Helper around a single psycopg2 connection and cursor.

    The class bundles the SQL statements used for vector-data metadata,
    version bookkeeping, attachment records and service/catalogue records.
    Write methods commit after each statement; when a statement fails the
    transaction is rolled back and the original exception is re-raised, so
    the connection stays usable for the next call.

    Caller-supplied *values* are rendered as escaped SQL string literals.
    Table and schema *names* are still interpolated verbatim and therefore
    must come from trusted code.
    """

    # Vector data metadata table
    Vector_Storage = "t_vector_storage"
    # Vector data service publication table
    Vector_Server = "t_vector_server"
    # Attachment table
    Vector_FJ = "t_vector_fj"
    # Field dictionary table
    Vector_Field = "t_vector_field"
    # Platform resource catalogue table
    Portal_Zyml = "t_yzt_zyml"
    # Suffix appended to a table name to get its backup table
    BackTable = "_back"
    # Suffix appended to a table name to get its id sequence
    Sequence = "_id_seq"
    # Version control table
    Vector_Version = "t_vector_version"
    # Version control details table
    Vector_Version_Details = "t_vector_version_details"
    # Connection defaults read from the project configuration
    db_config = siwei_config.CONFIG['db']

    def __init__(
            self,
            host: Optional[str] = db_config['host'],
            port: Optional[str] = db_config['port'],
            user: Optional[str] = db_config['user'],
            password: Optional[str] = db_config['password'],
            dbname: Optional[str] = db_config['name'],
            schema: Optional[str] = db_config['schema']
    ):
        """Open the connection and create the shared cursor.

        Every argument defaults to the matching entry of
        ``siwei_config.CONFIG['db']``. When ``schema`` is not None it is
        appended to the ``search_path`` connection option.
        """
        if schema is not None:
            options = "-c search_path=otherSchema," + schema
        else:
            options = None
        self.connparams = {
            "dbname": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
            "options": options,
        }
        self.conn = psycopg2.connect(**self.connparams)
        self.cur = self.conn.cursor()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _literal(self, value) -> str:
        """Return ``value`` as an escaped, quoted SQL string literal.

        The value is stringified first because the statements in this class
        have always embedded values inside quotes; this keeps the stored
        data the same (for example None is stored as the text 'None').
        """
        return pgsql.Literal(str(value)).as_string(self.conn)

    def _literals(self, *values) -> str:
        """Return ``values`` as a comma separated list of SQL literals."""
        return ", ".join(self._literal(value) for value in values)

    @staticmethod
    def _qualified(tablename) -> str:
        """Turn ``schema.table`` into ``schema."table"``."""
        parts = tablename.split(".")
        return f'{parts[0]}."{parts[1]}"'

    def _run(self, sql, params=None):
        """Execute one statement; roll back and re-raise if it fails."""
        try:
            self.cur.execute(sql, params)
        except Exception:
            # A failed statement aborts the transaction; without a rollback
            # every later statement on this connection would fail as well.
            if not self.conn.closed:
                self.conn.rollback()
            raise

    def _write(self, sql):
        """Execute one statement and commit it."""
        self._run(sql)
        self.conn.commit()

    def _query(self, sql, params=None):
        """Execute one statement and return all result rows."""
        self._run(sql, params)
        return self.cur.fetchall()

    # ------------------------------------------------------------------
    # Generic access
    # ------------------------------------------------------------------
    def execute(self, sql, params=None):
        """Run a query and return every row.

        ``params`` is optional and is handed to psycopg2 for placeholder
        substitution; omit it to run ``sql`` exactly as given.
        """
        return self._query(sql, params)

    def close(self):
        """Close the cursor and then the connection."""
        try:
            self.cur.close()
        finally:
            self.conn.close()

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------
    def deleteVectorStorage(self, name):
        """Delete the metadata rows whose ``name`` column equals ``name``."""
        self._write(
            f"delete from {self.Vector_Storage} where name = {self._literal(name)}")

    def insertVectorStorage(self, id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm,
                            table_alias):
        """Insert one metadata row.

        ``year`` must offer ``toString("yyyy")``. ``xmlx`` is a catalogue
        name that is resolved to its ``bsm`` through ``t_vector_zyml``.
        """
        sql = (
            f"insert into {self.Vector_Storage} (id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, "
            f"sjywz, glbm, table_alias) values ("
            f"{self._literals(id, name, year.toString('yyyy'))}, "
            f"(select bsm from t_vector_zyml where name = {self._literal(xmlx)}), "
            f"{self._literals(sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias)})"
        )
        self._write(sql)

    def metadataStorage(self, parameters, ssxzqh, fileliststr, ywlx, glbm, ogrLayer, layername, zyml):
        """Register an imported layer: metadata, version rows and attachments.

        The record id is ``SCHEMA.TABLE`` when ``parameters`` names a table;
        otherwise the lower-cased ``layername`` is used, or failing that the
        lower-cased file name derived from ``ogrLayer``. Existing metadata
        and version rows for that id are replaced. ``fileliststr`` is a
        comma separated list of attachment paths and may be empty or None.
        """
        storage_id = uuid.uuid4().hex
        print(ogrLayer)
        print(layername)
        print(parameters)

        table = parameters.get("TABLE")
        if not table:
            if layername is not None and layername != "":
                record_id = parameters.get("SCHEMA") + "." + layername.lower()
            else:
                record_id = parameters.get("SCHEMA") + "." + getInputFileName(ogrLayer).lower()
        else:
            record_id = parameters.get("SCHEMA") + "." + table

        table_alias = parameters.get("TABLE_ALIAS") or ''
        if table_alias:
            # The alias is stored as the table comment.
            self.setTableAlias(table=self.getTableName(recordid=record_id), alias=table_alias)

        self.deleteVectorStorage(name=record_id)
        self.deleteVectorVersion(tablename=record_id)
        self.insertVectorStorage(id=storage_id, name=record_id, year=parameters.get("VECTOR_YEAR"),
                                 xmlx=zyml,
                                 sjly=parameters.get("VECTOR_SJLY"), xzqh=ssxzqh,
                                 xzqh_field=parameters.get("XZQH_FIELD"), ywlx=ywlx,
                                 sjlx=parameters.get("SOURCE_TYPE"), sjywz=ogrLayer, glbm=glbm,
                                 table_alias=table_alias)
        self.insertVectorVersion(tablename=record_id)
        # Upload attachments, if any were supplied
        if fileliststr is not None and fileliststr != "":
            self.uploadAttachment(layername=record_id, layerid=storage_id, fileliststr=fileliststr)

    # ------------------------------------------------------------------
    # Version control
    # ------------------------------------------------------------------
    def deleteVectorVersion(self, tablename):
        """Delete the version control row of ``tablename``."""
        self._write(
            f"delete from {self.Vector_Version} where tablename = {self._literal(tablename)}")

    def deleteVectorRecords(self, tablename, rksj_sw):
        """Delete rows newer than ``rksj_sw`` and record it as the current version."""
        self._write(
            f"delete from {tablename} t where t.rksj_sw > {self._literal(rksj_sw)}")
        self._write(
            f"update {self.Vector_Version} set version = {self._literal(rksj_sw)} "
            f"where tablename = {self._literal(tablename)}")

    def insertVectorRecords(self, tablename, curversion, targetversion):
        """Replay recorded statements for versions in ``(curversion, targetversion]``.

        The replayed statements and the version update are committed
        together; on any failure the whole replay is rolled back.
        """
        rows = self._query(
            f"select insertsql from {self.Vector_Version_Details} t "
            f"where t.tablename = {self._literal(tablename)} "
            f"and t.version > {self._literal(curversion)} "
            f"and t.version <= {self._literal(targetversion)}")
        for row in rows:
            # enbase64str is a project helper; presumably it decodes the
            # stored text back into SQL - confirm against StringUtils.
            statement = enbase64str(row[0])
            if not statement:
                # An empty recorded statement cannot be executed.
                continue
            self._run(statement)
        self._run(
            f"update {self.Vector_Version} set version = {self._literal(targetversion)} "
            f"where tablename = {self._literal(tablename)}")
        self.conn.commit()

    def insertVectorVersion(self, tablename):
        """Create the base version of ``tablename``.

        Inserts the version row, adds the ``rksj_sw`` timestamp column to
        the table, stamps every row with the current time and records an
        empty details entry for the base version.
        """
        formatted_time = getNow()
        print("当前时间:{}".format(formatted_time))
        self._write(
            f"insert into {self.Vector_Version} (tablename, version, type) values "
            f"({self._literals(tablename, formatted_time, 'base')})")
        self._write(f"alter table {tablename} add column rksj_sw timestamp")
        self._write(
            f"update {tablename} SET rksj_sw = "
            f"to_timestamp({self._literal(formatted_time)}, 'YYYY-MM-DD HH24:MI:SS')")
        details = (
            f"insert into {self.Vector_Version_Details} (tablename, version, insertsql) values "
            f"({self._literals(tablename, formatted_time, '')})")
        print(details)
        self._write(details)

    def getInputFileName(self, file):
        """Return the last ``/`` separated component of ``file`` up to its first dot.

        A name without a dot is returned whole.
        """
        filename = file.split("/")[-1]
        return filename.partition(".")[0]

    # ------------------------------------------------------------------
    # Attachments
    # ------------------------------------------------------------------
    def uploadAttachment(self, layername, layerid, fileliststr):
        """Upload attachment files to FTP and record each one.

        ``fileliststr`` is a comma separated list of local paths; paths that
        do not exist are skipped. An existing FTP directory for the layer is
        deleted and recreated first. The FTP connection is closed even when
        an upload or insert fails.
        """
        print("开始上传附件啦!!")
        self.ftpOper = FtpOper()
        self.ftpOper.ftp.connect(ftpHost, ftpPort)
        try:
            self.ftpOper.ftp.login(ftpUsername, ftpPassword)
            ftp_path = '/三亚数管/' + layername  # FTP directory of this layer
            if not self.ftpOper.is_exist('/三亚数管/'):
                self.ftpOper.makedir('三亚数管', '/', '/三亚数管/')
            if self.ftpOper.is_exist(ftp_path):
                # The layer directory already exists: remove it first
                self.ftpOper.deletedir(ftp_path)
            self.ftpOper.makedir(layername, '/三亚数管/', ftp_path)  # recreate the layer directory

            server = self.ftpOper.ftp.host + ':' + str(self.ftpOper.ftp.port)
            for file_path in fileliststr.split(","):
                if not os.path.exists(file_path):
                    continue
                filename = os.path.basename(file_path)
                fileextension = os.path.splitext(filename)[1]
                self.ftpOper.uploadfile(file_path, ftp_path)
                self.insertVectorFJ(str(uuid.uuid4()), layerid, layername, filename, fileextension,
                                    ftp_path + '/' + filename, file_path, server,
                                    str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
        finally:
            self.ftpOper.ftp.close()

    def insertVectorFJ(self, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj):
        """Insert one attachment row."""
        self._write(
            f"insert into {self.Vector_FJ} (id,layerid, layername, name, fjlx, fjdz, fjwz,dldz,cjsj) values "
            f"({self._literals(id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj)})")

    # ------------------------------------------------------------------
    # Services and catalogue
    # ------------------------------------------------------------------
    def insertVectorServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup,
                           zoommin, zoommax, layername):
        """Insert one row into the service publication table."""
        values = self._literals(id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm,
                                layergroup, zoommin, zoommax, layername)
        self._write(
            f"insert into {self.Vector_Server} (id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, "
            f"zymlbsm, layergroup, zoommin, zoommax, layername) values ({values})")

    def insertPortalZymlServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm,
                               layergroup, zoommin, zoommax, layername):
        """Insert one service row into the platform resource catalogue table.

        ``state`` is written as '1', ``parent`` as '0' and ``format`` as
        'image/png'. ``sjy`` is accepted for signature parity with
        ``insertVectorServer`` but is not stored.
        """
        values = self._literals(id, fwmc, fwlx, zymlbsm, fwdz, '1', '0', fwqlx, layername, fwgzkj,
                                fwys, qpfa, layergroup, 'image/png', zoommax, zoommin)
        self._write(
            f"insert into {self.Portal_Zyml} (bsm, name, type, pbsm, url, state, parent, server_type, fwmc, "
            f"fwgzkj, fwys, qpfa, layergroup, format, maximumlevel, minimumlevel) values ({values})")

    def getZyml(self):
        """Return ``(bsm, name, pbsm)`` rows of catalogue entries with parent = '1'."""
        return self._query(
            "select t.bsm, t.name, t.pbsm from t_yzt_zyml t where t.parent = '1' order by t.lev,t.pbsm , t.sort")

    def getVectorZyml(self):
        """Return ``(bsm, name)`` rows of the vector catalogue."""
        return self._query(
            "select bsm,name from t_vector_zyml t order by t.sort,t.name")

    def getXzqh(self):
        """Return ``(id, name, pid)`` rows of administrative divisions; pid '0' is returned as ''."""
        return self._query(
            "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")

    def getVectorYwlx(self):
        """Return the distinct ``ywlx`` values of the field table."""
        return self._query(
            "select distinct(t.ywlx) from t_vector_field t ")

    def getDeptList(self):
        """Return the names of departments whose ``del_flag`` is '0'."""
        return self._query(
            "select dept_name from (select distinct(dept_name),dept_id from sys_dept t where t.del_flag = '0' order by t.dept_id) a")

    def getManagerTables(self, username):
        """Return the vector tables managed by the department of ``username``.

        Each row is ``(id, name, alias, ywlx)``; ``alias`` falls back to the
        table name when no alias is stored.
        """
        sql = (
            f'select t.id "id",name "name", case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", t.ywlx "ywlx" '
            f'from {self.Vector_Storage} t where t.glbm = (select dept_name from sys_dept d where d.dept_id = '
            f'(select dept_id from sys_user u where u.user_name = {self._literal(username)} )) '
            f'and t.sjlx = \'vector\''
        )
        return self._query(sql)

    # ------------------------------------------------------------------
    # Table fields
    # ------------------------------------------------------------------
    def getMustRequireField(self, tablename, ywlx):
        """Return lower-cased field names of ``ywlx`` flagged nullable = '1' that exist in ``tablename``.

        ``tablename`` has the form ``schema.table``.
        """
        arr = tablename.split(".")
        sql = (
            f'select lower(f.name) from {self.Vector_Field} f where f.ywlx = {self._literal(ywlx)} '
            f'and lower(f.name) in (select column_name from information_schema.columns '
            f'where table_schema = {self._literal(arr[0])} and table_name = {self._literal(arr[1])}) '
            f'and f.nullable = \'1\''
        )
        return [row[0] for row in self._query(sql)]

    def getAllTableField(self, tablename):
        """Return the column names of ``schema.table`` except id, geom, shape_leng and shape_area."""
        arr = tablename.split(".")
        sql = (
            f'select column_name from information_schema.columns '
            f'where table_schema = {self._literal(arr[0])} and table_name = {self._literal(arr[1])} '
            f'and lower(column_name) not in (\'id\',\'geom\', \'shape_leng\', \'shape_area\')'
        )
        return [row[0] for row in self._query(sql)]

    # ------------------------------------------------------------------
    # Features
    # ------------------------------------------------------------------
    def insertVectorFeature(self, tablename, fields, values, wkt, wktsrid, geomtype, rksj):
        """Insert one feature and record the statement for version replay.

        ``fields`` and ``values`` are parallel sequences. String values are
        written as escaped literals, None as ``null`` and anything else
        verbatim. When ``wkt`` does not start with ``geomtype`` it is
        converted between the single and MULTI form by text substitution.
        The executed statement is stored, passed through ``base64str``, in
        the version details table under version ``rksj``.
        """
        rendered = []
        for value in values:
            # NOTE(review): equality rather than identity is kept on purpose
            # in case callers pass null-like wrappers that compare equal to
            # None - confirm before changing to "is None".
            if value == None:
                rendered.append("null")
            elif self.is_string(value):
                rendered.append(self._literal(value))
            else:
                # Non-string values are written verbatim; a stray None
                # inside them is mapped to null.
                rendered.append(str(value).replace("None", "null"))

        # Make the WKT geometry type agree with the table geometry type
        if not wkt.startswith(geomtype):
            if geomtype.startswith("MULTI"):
                # Promote to the MULTI form
                wkt = 'MULTI' + wkt
                wkt = wkt.replace("((", "(((").replace("))", ")))")
            else:
                wkt = wkt.replace("(((", "((").replace(")))", "))").replace("MULTI", "")

        srid = str(wktsrid).replace("None", "null")
        sql = (
            f'insert into {self._qualified(tablename)} ({",".join(fields)} , geom) '
            f'values ({",".join(rendered)} , public.st_geomfromtext( {self._literal(wkt)}, {srid}))'
        )
        self._write(sql)

        # Record the statement in the version details table
        self._write(
            f"insert into {self.Vector_Version_Details} (tablename, version, insertsql) values "
            f"({self._literals(tablename, rksj, base64str(sql))})")

    def updateVectorVersion(self, tablename, version):
        """Stamp unstamped rows with ``version`` and mark the table as updated."""
        self._write(
            f"update {tablename} set rksj_sw = "
            f"to_timestamp({self._literal(version)}, 'YYYY-MM-DD HH24:MI:SS') where rksj_sw is null")
        self._write(
            f"update {self.Vector_Version} set version = {self._literal(version)}, type = 'update' "
            f"where tablename = {self._literal(tablename)}")

    def getTableVersions(self, tablename):
        """Return the distinct versions recorded for ``tablename``, newest first."""
        sql = (
            f"select distinct(version) from {self.Vector_Version_Details} t "
            f"where t.tablename = {self._literal(tablename)} order by version desc")
        print(sql)
        return self._query(sql)

    def getTableCurVersion(self, tablename):
        """Return the current version of ``tablename``.

        Raises:
            IndexError: when no version row exists for the table.
        """
        sql = (
            f"select version from {self.Vector_Version} t "
            f"where t.tablename = {self._literal(tablename)}")
        print(sql)
        rows = self._query(sql)
        return rows[0][0]

    def getVectorTableSrid(self, tablename):
        """Return the SRID of the first geometry in ``schema.table``, or None for an empty table."""
        rows = self._query(
            f'select public.st_srid(geom) from {self._qualified(tablename)} limit 1')
        return rows[0][0] if rows else None

    def getVectorTableGeomType(self, tablename):
        """Return the upper-cased geometry type (without ``ST_``) of the first row, or None."""
        rows = self._query(
            f'select replace(upper(public.ST_GeometryType(geom)),\'ST_\',\'\') '
            f'from {self._qualified(tablename)} limit 1')
        return rows[0][0] if rows else None

    def getInstersectsCount(self, tablename, tablesrid, wkts, wktsrid):
        """Count rows of ``schema.table`` that intersect any geometry in ``wkts``.

        Each WKT is read with SRID ``wktsrid`` and transformed to
        ``tablesrid`` before the test. An empty ``wkts`` yields 0.
        """
        if not wkts:
            # Without geometries there is no condition to build.
            return 0
        conditions = ' or '.join(
            f' public.st_intersects(t.geom,public.st_transform( public.st_geomfromtext( '
            f'{self._literal(wkt)}, {wktsrid}), {tablesrid} )) '
            for wkt in wkts
        )
        rows = self._query(
            f'select count(1) from {self._qualified(tablename)} t where {conditions}')
        return rows[0][0] if rows else None

    # ------------------------------------------------------------------
    # Table maintenance
    # ------------------------------------------------------------------
    def setTableAlias(self, table, alias):
        """Store ``alias`` as the PostgreSQL comment of ``table``."""
        self._write(f'COMMENT ON TABLE {table} IS {self._literal(alias)}')

    def restoreBackTable(self, tablename):
        """Replace ``schema.table`` with a copy of its backup table.

        The table and its id sequence are dropped, the table is recreated
        from the backup, and a new sequence starting at ``max(id) + 1``
        (1 for an empty table) is attached as the default of ``id``.
        """
        target = self._qualified(tablename)
        arr = tablename.split(".")
        sequence = f'{tablename}{self.Sequence}'

        # Drop the current table and its sequence
        self._write(f'drop table if exists {target}')
        self._write(f'drop sequence if exists {sequence}')
        # Recreate the table from the backup
        self._write(
            f'create table {target} as table {arr[0]}."{arr[1]}{self.BackTable}"')

        # max(id) + 1 is NULL for an empty table; start the sequence at 1 then
        rows = self._query(f'select (max(id) + 1) from {target}')
        next_id = rows[0][0] if rows and rows[0][0] is not None else 1

        self._write(
            f'create sequence {sequence} start with {next_id} increment by 1 no minvalue no maxvalue cache 1')
        self._write(f'alter table {target} alter column id set not null')
        self._write(
            f'alter table {target} alter column id set default nextval({self._literal(sequence)})')

    def resetBackTable(self, tablename):
        """Drop the backup table of ``schema.table`` and recreate it from the current table."""
        arr = tablename.split(".")
        backup = f'{arr[0]}."{arr[1]}{self.BackTable}"'
        self._write(f'drop table if exists {backup}')
        self._write(f'create table {backup} as table {self._qualified(tablename)}')

    def getTableName(self, recordid):
        """Return ``"schema"."table"`` for ``schema.table`` with the table part lower-cased."""
        sp = recordid.split(".")
        return f'"{sp[0]}"."{sp[1].lower()}"'

    def checkTableName(self, schemaname, tablename):
        """Return True when NO table named ``tablename`` exists in ``schemaname``."""
        rows = self._query(
            f'select tablename from pg_tables t where t.schemaname = {self._literal(schemaname)} '
            f'and t.tablename = {self._literal(tablename)}')
        return len(rows) == 0

    def is_string(self, var):
        """Return True when ``var`` is a ``str``."""
        return isinstance(var, str)