123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- # -*- coding: utf-8 -*-
- __author__ = 'wanger'
- __date__ = '2024-08-20'
- __copyright__ = '(C) 2024 by siwei'
- __revision__ = '1.0'
- import time
- from typing import Optional
- import os
- import psycopg2
- import uuid
- import siwei_config
- from ..FTP.FtpUitl import FtpOper
- from ..FTP.FtpConfig import *
- from ..FileUtils import getInputFileName
- from ..StringUtils import (getNow, base64str, enbase64str)
- from datetime import datetime
class PostgreSQL:
    """psycopg2 wrapper around the vector-data management tables.

    An instance owns one connection and one cursor, both opened in
    ``__init__`` and released by ``close()`` (or by leaving a ``with`` block).

    Values are passed to psycopg2 as bound parameters wherever possible.
    Identifiers (schema / table / column names) cannot be bound, so they are
    still interpolated into the SQL text and must come from trusted code.
    Every write either commits completely or is rolled back before the
    original exception is re-raised, so the shared connection never stays in
    an aborted transaction.
    """

    # Metadata table for vector data sets.
    Vector_Storage = "t_vector_storage"
    # Table of published vector services.
    Vector_Server = "t_vector_server"
    # Attachment table.
    Vector_FJ = "t_vector_fj"
    # Field table.
    Vector_Field = "t_vector_field"
    # Resource catalogue table of the platform.
    Portal_Zyml = "t_yzt_zyml"
    # Suffix appended to a table name to form its backup table name.
    BackTable = "_back"
    # Suffix appended to a table name to form its id sequence name.
    Sequence = "_id_seq"
    # Version-control table.
    Vector_Version = "t_vector_version"
    # Version-control detail table.
    Vector_Version_Details = "t_vector_version_details"
    # Connection defaults taken from the project configuration.
    db_config = siwei_config.CONFIG['db']

    def __init__(
            self,
            host: Optional[str] = db_config['host'],
            port: Optional[str] = db_config['port'],
            user: Optional[str] = db_config['user'],
            password: Optional[str] = db_config['password'],
            dbname: Optional[str] = db_config['name'],
            schema: Optional[str] = db_config['schema']
    ):
        """Open the connection and create the cursor.

        Every argument defaults to the matching entry of ``db_config``.
        When *schema* is not ``None`` the session ``search_path`` is set to
        ``otherSchema,<schema>``; otherwise no ``options`` value is supplied.
        """
        options = None
        if schema is not None:
            options = "-c search_path=otherSchema," + schema
        self.connparams = {
            "dbname": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
            "options": options,
        }
        self.conn = psycopg2.connect(**self.connparams)
        self.cur = self.conn.cursor()

    def __enter__(self):
        """Allow ``with PostgreSQL() as db:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection when the ``with`` block ends."""
        self.close()
        return False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_text(*values):
        """Return *values* as a tuple of ``str`` for use as bound parameters.

        ``str(v)`` yields the same text the previous ``'{}'.format(v)`` code
        produced, so the data sent to the server is unchanged; the driver now
        takes care of quoting.
        NOTE(review): as before, ``None`` is therefore sent as the text
        'None', not SQL NULL - confirm whether that is wanted.
        """
        return tuple(str(value) for value in values)

    def _query(self, sql, params=None):
        """Run a SELECT and return all rows; roll back if it fails."""
        try:
            self.cur.execute(sql, params)
            return self.cur.fetchall()
        except Exception:
            self.conn.rollback()
            raise

    def _run(self, *statements):
        """Execute ``(sql, params)`` pairs as ONE transaction.

        All statements are committed together; on any error everything is
        rolled back and the exception is re-raised.
        """
        try:
            for sql, params in statements:
                self.cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    # ------------------------------------------------------------------
    # Generic access
    # ------------------------------------------------------------------
    def execute(self, sql, params=None):
        """Execute *sql* and return ``cursor.fetchall()``.

        *params* (optional, new) is handed to psycopg2 for safe value
        binding. Nothing is committed here. Intended for statements that
        return rows: ``fetchall()`` is always called.
        """
        self.cur.execute(sql, params)
        return self.cur.fetchall()

    def close(self):
        """Close the cursor and then the connection."""
        self.cur.close()
        self.conn.close()

    # ------------------------------------------------------------------
    # Metadata storage
    # ------------------------------------------------------------------
    def deleteVectorStorage(self, name):
        """Delete the metadata rows whose ``name`` column equals *name*."""
        sql = f"delete from {self.Vector_Storage} where name = %s"
        self._run((sql, self._as_text(name)))

    def insertVectorStorage(self, id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias):
        """Insert one metadata row.

        *year* must provide ``toString("yyyy")`` (presumably a Qt date -
        TODO confirm). *xmlx* is not stored directly: the row receives the
        ``bsm`` of the ``t_vector_zyml`` entry whose name equals *xmlx*.
        """
        sql = (
            f"insert into {self.Vector_Storage} "
            "(id, name, year, xmlx, sjly, xzqh, xzqh_field, ywlx, sjlx, sjywz, glbm, table_alias) "
            "values (%s, %s, %s, (select bsm from t_vector_zyml where name = %s), "
            "%s, %s, %s, %s, %s, %s, %s, %s)"
        )
        params = self._as_text(
            id, name, year.toString("yyyy"), xmlx, sjly, xzqh, xzqh_field,
            ywlx, sjlx, sjywz, glbm, table_alias)
        self._run((sql, params))

    def metadataStorage(self, parameters, ssxzqh, fileliststr, ywlx, glbm, ogrLayer, layername, zyml):
        """Register (or re-register) a data set in the metadata tables.

        Steps: derive the record id ``<schema>.<table>``, optionally set the
        table alias, delete the previous storage and version records, insert
        fresh ones and finally upload attachments when *fileliststr* is
        non-empty.

        *parameters* is read with ``.get`` for the keys SCHEMA, TABLE,
        TABLE_ALIAS, VECTOR_YEAR, VECTOR_SJLY, XZQH_FIELD and SOURCE_TYPE.
        """
        # Same value as str(uuid4()) with the dashes removed.
        layer_id = uuid.uuid4().hex
        print(ogrLayer)
        print(layername)
        print(parameters)

        schema = parameters.get("SCHEMA")
        table = parameters.get("TABLE")
        if table is None or table == "":
            if layername is not None and layername != "":
                record_id = schema + "." + layername.lower()
            else:
                # Module-level getInputFileName (imported from FileUtils),
                # not the method of the same name on this class.
                record_id = schema + "." + getInputFileName(ogrLayer).lower()
        else:
            # An explicit table name is used as given (not lower-cased).
            record_id = schema + "." + table

        table_alias = parameters.get("TABLE_ALIAS")
        if table_alias is not None and table_alias != '':
            self.setTableAlias(table=self.getTableName(recordid=record_id),
                               alias=table_alias)
        else:
            table_alias = ''

        self.deleteVectorStorage(name=record_id)
        self.deleteVectorVersion(tablename=record_id)
        self.insertVectorStorage(id=layer_id, name=record_id, year=parameters.get("VECTOR_YEAR"),
                                 xmlx=zyml, sjly=parameters.get("VECTOR_SJLY"),
                                 xzqh=ssxzqh, xzqh_field=parameters.get("XZQH_FIELD"),
                                 ywlx=ywlx, sjlx=parameters.get("SOURCE_TYPE"), sjywz=ogrLayer,
                                 glbm=glbm, table_alias=table_alias)
        self.insertVectorVersion(tablename=record_id)

        if fileliststr is not None and fileliststr != "":
            self.uploadAttachment(layername=record_id, layerid=layer_id, fileliststr=fileliststr)

    # ------------------------------------------------------------------
    # Version control
    # ------------------------------------------------------------------
    def deleteVectorVersion(self, tablename):
        """Delete the version-control row(s) recorded for *tablename*."""
        sql = f"delete from {self.Vector_Version} where tablename = %s"
        self._run((sql, self._as_text(tablename)))

    def deleteVectorRecords(self, tablename, rksj_sw):
        """Delete rows newer than *rksj_sw* and record it as the version.

        Rows of *tablename* with ``rksj_sw`` greater than the given value are
        deleted and the version row is updated in the same transaction.
        """
        delete_sql = f"delete from {tablename} t where t.rksj_sw > %s"
        update_sql = f"update {self.Vector_Version} set version = %s where tablename = %s"
        self._run(
            (delete_sql, self._as_text(rksj_sw)),
            (update_sql, self._as_text(rksj_sw, tablename)),
        )

    def insertVectorRecords(self, tablename, curversion, targetversion):
        """Replay stored statements to move *tablename* forward.

        Statements recorded with ``curversion < version <= targetversion``
        are decoded with ``enbase64str`` and executed, then the version row
        is set to *targetversion*. Everything happens in one transaction.
        """
        select_sql = (
            f"select insertsql from {self.Vector_Version_Details} t "
            "where t.tablename = %s and t.version > %s and t.version <= %s"
        )
        update_sql = f"update {self.Vector_Version} set version = %s where tablename = %s"
        try:
            self.cur.execute(select_sql, self._as_text(tablename, curversion, targetversion))
            for row in self.cur.fetchall():
                if not row[0]:
                    # insertVectorVersion stores an empty statement for the
                    # base version; there is nothing to replay for it.
                    continue
                # SECURITY NOTE: this executes SQL text read back from the
                # details table; that table must only be writable by
                # trusted code.
                statement = enbase64str(row[0])
                if statement:
                    self.cur.execute(statement)
            self.cur.execute(update_sql, self._as_text(targetversion, tablename))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def insertVectorVersion(self, tablename):
        """Create the base version for *tablename*.

        In one transaction: insert the version row (type 'base'), add the
        ``rksj_sw`` timestamp column to the table, stamp every existing row
        with the current time from ``getNow()`` and insert a details row
        with an empty statement.
        """
        formatted_time = getNow()
        print("当前时间:{}".format(formatted_time))

        version_sql = (
            f"insert into {self.Vector_Version} (tablename, version, type) "
            "values (%s, %s, %s)"
        )
        add_column_sql = f"alter table {tablename} add column rksj_sw timestamp"
        stamp_sql = f"update {tablename} SET rksj_sw = to_timestamp(%s, 'YYYY-MM-DD HH24:MI:SS')"
        details = (
            f"insert into {self.Vector_Version_Details} (tablename, version, insertsql) "
            "values (%s, %s, %s)"
        )
        print(details)
        self._run(
            (version_sql, self._as_text(tablename, formatted_time, "base")),
            (add_column_sql, None),
            (stamp_sql, self._as_text(formatted_time)),
            (details, self._as_text(tablename, formatted_time, "")),
        )

    def updateVectorVersion(self, tablename, version):
        """Stamp unversioned rows with *version* and record it as current.

        Rows of *tablename* whose ``rksj_sw`` is null receive *version*
        (parsed as 'YYYY-MM-DD HH24:MI:SS'); the version row is set to
        *version* with type 'update'. Both run in one transaction.
        """
        stamp_sql = (
            f"update {tablename} set rksj_sw = to_timestamp(%s, 'YYYY-MM-DD HH24:MI:SS') "
            "where rksj_sw is null"
        )
        version_sql = (
            f"update {self.Vector_Version} set version = %s, type = 'update' "
            "where tablename = %s"
        )
        self._run(
            (stamp_sql, self._as_text(version)),
            (version_sql, self._as_text(version, tablename)),
        )

    def getTableVersions(self, tablename):
        """Return the distinct versions recorded for *tablename*, newest first."""
        sql = (
            f"select distinct(version) from {self.Vector_Version_Details} t "
            "where t.tablename = %s order by version desc"
        )
        print(sql)
        return self._query(sql, self._as_text(tablename))

    def getTableCurVersion(self, tablename):
        """Return the current version of *tablename*, or ``None`` if unknown.

        Previously a missing version row raised ``IndexError``; ``None`` is
        returned instead, like the other single-value getters of this class.
        """
        sql = f"select version from {self.Vector_Version} t where t.tablename = %s"
        print(sql)
        rows = self._query(sql, self._as_text(tablename))
        return rows[0][0] if rows else None

    # ------------------------------------------------------------------
    # Attachments
    # ------------------------------------------------------------------
    def getInputFileName(self, file):
        """Return the base name of *file* up to its first dot.

        The path is split on "/" only. A name without any dot is returned
        unchanged (the previous slice dropped its last character).
        """
        filename = file.split("/")[-1]
        return filename.partition(".")[0]

    def uploadAttachment(self, layername, layerid, fileliststr):
        """Upload attachment files to FTP and record them in the database.

        *fileliststr* is a comma-separated list of local paths; paths that
        do not exist are skipped. The layer directory under '/三亚数管/' is
        deleted and recreated first. The FTP connection is closed even when
        an upload or database insert fails.
        """
        print("开始上传附件啦!!")
        self.ftpOper = FtpOper()
        self.ftpOper.ftp.connect(ftpHost, ftpPort)
        try:
            self.ftpOper.ftp.login(ftpUsername, ftpPassword)
            ftp_path = '/三亚数管/' + layername  # FTP directory of this layer
            if not self.ftpOper.is_exist('/三亚数管/'):
                self.ftpOper.makedir('三亚数管', '/', '/三亚数管/')
            if self.ftpOper.is_exist(ftp_path):
                # Start from an empty directory for this layer.
                self.ftpOper.deletedir(ftp_path)
            self.ftpOper.makedir(layername, '/三亚数管/', ftp_path)

            server_address = self.ftpOper.ftp.host + ':' + str(self.ftpOper.ftp.port)
            for file_path in fileliststr.split(","):
                if not os.path.exists(file_path):
                    continue
                filename = os.path.basename(file_path)
                fileextension = os.path.splitext(filename)[1]
                self.ftpOper.uploadfile(file_path, ftp_path)
                self.insertVectorFJ(str(uuid.uuid4()), layerid, layername, filename, fileextension,
                                    ftp_path + '/' + filename,
                                    file_path, server_address,
                                    str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
        finally:
            self.ftpOper.ftp.close()

    def insertVectorFJ(self, id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj):
        """Insert one attachment row with the given column values."""
        sql = (
            f"insert into {self.Vector_FJ} "
            "(id,layerid, layername, name, fjlx, fjdz, fjwz,dldz,cjsj) "
            "values (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        params = self._as_text(id, layerid, layername, name, fjlx, fjdz, fjwz, dldz, cjsj)
        self._run((sql, params))

    # ------------------------------------------------------------------
    # Service publishing
    # ------------------------------------------------------------------
    def insertVectorServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
                           zoommax, layername):
        """Insert one row into the published-service table."""
        sql = (
            f"insert into {self.Vector_Server} "
            "(id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, "
            "layergroup, zoommin, zoommax, layername) "
            "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        params = self._as_text(id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm,
                               layergroup, zoommin, zoommax, layername)
        self._run((sql, params))

    def insertPortalZymlServer(self, id, fwqlx, sjy, fwlx, fwdz, fwmc, fwgzkj, fwys, qpfa, zymlbsm, layergroup, zoommin,
                               zoommax, layername):
        """Insert one service row into the platform resource catalogue.

        Column mapping: bsm=id, name=fwmc, type=fwlx, pbsm=zymlbsm, url=fwdz,
        state='1', parent='0', server_type=fwqlx, fwmc=layername,
        format='image/png', maximumlevel=zoommax, minimumlevel=zoommin; the
        remaining columns take the argument of the same name. *sjy* is
        accepted for signature parity with insertVectorServer but not stored.
        """
        sql = (
            f"insert into {self.Portal_Zyml} "
            "(bsm, name, type, pbsm, url, state, parent, server_type, fwmc, "
            "fwgzkj, fwys, qpfa, layergroup, format, maximumlevel, minimumlevel) "
            "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        params = self._as_text(id, fwmc, fwlx, zymlbsm, fwdz, '1', '0', fwqlx, layername,
                               fwgzkj, fwys, qpfa, layergroup, 'image/png', zoommax, zoommin)
        self._run((sql, params))

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------
    def getZyml(self):
        """Return (bsm, name, pbsm) rows of t_yzt_zyml where parent = '1'."""
        return self._query(
            "select t.bsm, t.name, t.pbsm from t_yzt_zyml t where t.parent = '1' order by t.lev,t.pbsm , t.sort")

    def getVectorZyml(self):
        """Return (bsm, name) rows of t_vector_zyml ordered by sort, name."""
        return self._query(
            "select bsm,name from t_vector_zyml t order by t.sort,t.name")

    def getXzqh(self):
        """Return (id, name, pid) rows of vector.xzqh; a pid of '0' is returned as ''."""
        return self._query(
            "select t.id, t.name, case when t.pid = '0' then '' else t.pid end from vector.xzqh t order by t.id")

    def getVectorYwlx(self):
        """Return the distinct ``ywlx`` values of t_vector_field."""
        return self._query(
            "select distinct(t.ywlx) from t_vector_field t ")

    def getDeptList(self):
        """Return the names of departments in sys_dept whose del_flag is '0'."""
        return self._query(
            "select dept_name from (select distinct(dept_name),dept_id from sys_dept t where t.del_flag = '0' order by t.dept_id) a")

    def getManagerTables(self, username):
        """Return the vector tables managed by the department of *username*.

        Each row is (id, name, alias, ywlx); alias is ``table_alias`` when it
        is not the empty string and ``name`` otherwise.
        """
        sql = (
            'select t.id "id",name "name", '
            'case when t.table_alias != \'\' then t.table_alias else t.name end as "alias", '
            't.ywlx "ywlx" '
            f'from {self.Vector_Storage} t where t.glbm = ('
            'select dept_name from sys_dept d where d.dept_id = ('
            'select dept_id from sys_user u where u.user_name = %s )) '
            'and t.sjlx = \'vector\''
        )
        return self._query(sql, self._as_text(username))

    def getMustRequireField(self, tablename, ywlx):
        """Return lower-cased names of the required fields of a table.

        *tablename* is ``schema.table``. A field qualifies when it exists as
        a column of that table and its row in the field table for *ywlx* has
        ``nullable = '1'`` (the value this query selects as "required").
        """
        arr = tablename.split(".")
        params = self._as_text(ywlx, arr[0], arr[1])
        sql = (
            f'select lower(f.name) from {self.Vector_Field} f '
            'where f.ywlx = %s and lower(f.name) in ('
            'select column_name from information_schema.columns '
            'where table_schema = %s and table_name = %s) and f.nullable = \'1\''
        )
        return [row[0] for row in self._query(sql, params)]

    def getAllTableField(self, tablename):
        """Return the column names of ``schema.table`` except id, geom, shape_leng and shape_area."""
        arr = tablename.split(".")
        params = self._as_text(arr[0], arr[1])
        sql = (
            'select column_name from information_schema.columns '
            'where table_schema = %s and table_name = %s '
            'and lower(column_name) not in (\'id\',\'geom\', \'shape_leng\', \'shape_area\')'
        )
        return [row[0] for row in self._query(sql, params)]

    # ------------------------------------------------------------------
    # Features
    # ------------------------------------------------------------------
    def insertVectorFeature(self, tablename, fields, values, wkt, wktsrid, geomtype, rksj):
        """Insert one feature and record the statement for version replay.

        The INSERT is built as literal SQL because its text is also stored
        (base64-encoded via ``base64str``) in the details table under version
        *rksj*, from where insertVectorRecords replays it. Feature insert and
        details insert share one transaction.

        *fields* and *values* are parallel sequences; *wkt* is the geometry
        text in SRID *wktsrid*; *geomtype* is the geometry type of the target
        table and triggers a textual single/multi conversion of *wkt* when
        the two differ.
        """
        arr = tablename.split(".")

        literals = []
        for value in values:
            # '== None' is kept on purpose so objects that merely compare
            # equal to None are written as SQL NULL too (presumably Qt/QGIS
            # NULL values - TODO confirm).
            if value == None:  # noqa: E711
                literals.append("null")
            elif self.is_string(value):
                # Double embedded quotes so the value stays one SQL string.
                literals.append("'" + value.replace("'", "''") + "'")
            else:
                literals.append(f'{value}')

        # Compare case-insensitively: a mixed-case WKT prefix must not be
        # mistaken for a different geometry type.
        if not wkt.upper().startswith(geomtype.upper()):
            if geomtype.startswith("MULTI"):
                # Promote to the multi variant by adding one nesting level.
                wkt = 'MULTI' + wkt
                wkt = wkt.replace("((", "(((").replace("))", ")))")
            else:
                # Demote to the single variant by removing one nesting level.
                wkt = wkt.replace("(((", "((").replace(")))", "))").replace("MULTI", "")

        # The former global replace of "None" by "null" corrupted field names
        # and text values containing "None"; only a missing SRID still needs
        # mapping now that NULL values are handled above.
        srid_literal = "null" if wktsrid is None else wktsrid
        sql = f'insert into {arr[0]}."{arr[1]}" ({",".join(fields)} , geom) values ({",".join(literals)} , public.st_geomfromtext( \'{wkt}\', {srid_literal}))'

        details = (
            f"insert into {self.Vector_Version_Details} (tablename, version, insertsql) "
            "values (%s, %s, %s)"
        )
        self._run(
            (sql, None),
            (details, self._as_text(tablename, rksj, base64str(sql))),
        )

    def getVectorTableSrid(self, tablename):
        """Return the SRID of the first geometry in ``schema.table``, or ``None`` if it has no rows."""
        arr = tablename.split(".")
        rows = self._query(f'select public.st_srid(geom) from {arr[0]}."{arr[1]}" limit 1')
        return rows[0][0] if rows else None

    def getVectorTableGeomType(self, tablename):
        """Return the upper-cased geometry type (without 'ST_') of the first row, or ``None`` if the table has no rows."""
        arr = tablename.split(".")
        rows = self._query(
            f'select replace(upper(public.ST_GeometryType(geom)),\'ST_\',\'\') from {arr[0]}."{arr[1]}" limit 1')
        return rows[0][0] if rows else None

    def getInstersectsCount(self, tablename, tablesrid, wkts, wktsrid):
        """Count the rows of ``schema.table`` intersecting any geometry in *wkts*.

        Each WKT is read in SRID *wktsrid* and transformed to *tablesrid*
        before the test. An empty *wkts* returns 0 without querying (it used
        to produce an invalid statement).
        """
        arr = tablename.split(".")
        if len(wkts) == 0:
            return 0
        # The SRIDs stay interpolated so they remain numeric literals; only
        # the WKT texts are bound as parameters.
        condition = (
            f' public.st_intersects(t.geom,public.st_transform( '
            f'public.st_geomfromtext( %s, {wktsrid}), {tablesrid} )) '
        )
        where = ' or '.join(condition for _ in wkts)
        rows = self._query(
            f'select count(1) from {arr[0]}."{arr[1]}" t where {where}',
            self._as_text(*wkts))
        return rows[0][0] if rows else None

    # ------------------------------------------------------------------
    # Table maintenance
    # ------------------------------------------------------------------
    def setTableAlias(self, table, alias):
        """Store *alias* as the PostgreSQL comment of *table*.

        *table* is inserted into the statement as given, so it must already
        be a valid (quoted if necessary) identifier, e.g. from getTableName.
        """
        self._run((f'COMMENT ON TABLE {table} IS %s', self._as_text(alias)))

    def restoreBackTable(self, tablename):
        """Replace ``schema.table`` with a copy of its backup table.

        Drops the current table and its id sequence, recreates the table from
        ``<table>_back``, creates a new sequence starting at ``max(id) + 1``
        (1 for an empty table), makes ``id`` NOT NULL and sets the sequence
        as its default.

        All steps run in one transaction: if anything fails - for example
        because the backup table is missing - the drop is rolled back and
        the current table is kept.
        """
        arr = tablename.split(".")
        target = f'{arr[0]}."{arr[1]}"'
        backup = f'{arr[0]}."{arr[1]}{self.BackTable}"'
        sequence = f'{tablename}{self.Sequence}'
        try:
            self.cur.execute(f'drop table if exists {target}')
            self.cur.execute(f'drop sequence if exists {sequence}')
            self.cur.execute(f'create table {target} as table {backup}')

            self.cur.execute(f'select (max(id) + 1) from {target}')
            row = self.cur.fetchone()
            # max(id) is NULL for an empty table; start the sequence at 1.
            next_id = row[0] if row and row[0] is not None else 1

            self.cur.execute(
                f'create sequence {sequence} start with {next_id} increment by 1 no minvalue no maxvalue cache 1')
            self.cur.execute(f'alter table {target} alter column id set not null')
            self.cur.execute(
                f'alter table {target} alter column id set default nextval(\'{sequence}\')')
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def resetBackTable(self, tablename):
        """Drop the backup table and recreate it from the current table.

        Both steps run in one transaction, so a failed copy does not leave
        the table without a backup.
        """
        arr = tablename.split(".")
        target = f'{arr[0]}."{arr[1]}"'
        backup = f'{arr[0]}."{arr[1]}{self.BackTable}"'
        self._run(
            (f'drop table if exists {backup}', None),
            (f'create table {backup} as table {target}', None),
        )

    def getTableName(self, recordid):
        """Turn ``schema.table`` into ``"schema"."table"`` with the table part lower-cased."""
        sp = recordid.split(".")
        return f'"{sp[0]}"."{sp[1].lower()}"'

    def checkTableName(self, schemaname, tablename):
        """Return ``True`` when NO table *tablename* exists in *schemaname*.

        In other words ``True`` means the name is still free; ``False`` means
        a table with that name is already listed in ``pg_tables``.
        """
        sql = 'select tablename from pg_tables t where t.schemaname = %s and t.tablename = %s'
        rows = self._query(sql, self._as_text(schemaname, tablename))
        return len(rows) == 0

    def is_string(self, var):
        """Return ``True`` when *var* is a ``str`` instance."""
        return isinstance(var, str)
|