# -*- coding: utf-8 -*-

"""
***************************************************************************
    postgisrestore.py
    ---------------------
    Date                 : November 2012
    Copyright            : (C) 2012 by Victor Olaya
    Email                : volayaf at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""

__author__ = 'wanger'
__date__ = 'November 2024'
__copyright__ = '(C) 2024, wanger'

import os

from osgeo import ogr, gdal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QApplication
from future.moves import sys
from qgis.PyQt import QtWidgets
from qgis.core import (QgsProcessing,
                       QgsProcessingParameterFeatureSource,
                       QgsProcessingParameterString,
                       QgsProcessingParameterFile,
                       QgsProcessingParameterDateTime,
                       QgsProcessingParameterEnum,
                       QgsProcessingParameterCrs,
                       QgsProcessingParameterField,
                       QgsProcessingParameterExtent,
                       QgsProcessingParameterBoolean,
                       QgsProcessingParameterProviderConnection,
                       QgsProcessingParameterDatabaseSchema,
                       QgsProcessingParameterDatabaseTable,
                       QgsProviderRegistry,
                       QgsProcessingException,
                       QgsProcessingParameterDefinition,
                       QgsProviderConnectionException,
                       QgsDataSourceUri)

from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
from processing.algs.gdal.GdalUtils import GdalUtils
from processing.tools.PrintUtils import printStr
from processing.tools.StringUtils import getConnectionStr
from processing.tools.GeoServer.Geoserver import Geoserver
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
from processing.tools.Login.Redis import Redis
from processing.tools.system import isWindows

# Directory two levels above this file's directory; icon() resolves the
# algorithm icon relative to it.
pluginPath = os.path.normpath(os.path.join(
    os.path.split(os.path.dirname(__file__))[0],
    os.pardir))

gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
gdal.SetConfigOption("SHAPE_ENCODING", "GBK")


class Postgisrestore(GdalAlgorithm):
    """Processing algorithm that moves a managed table to another version.

    ``initAlgorithm`` builds the parameter list from the tables returned by
    ``PostgreSQL.getManagerTables`` and from the versions of the table whose
    name is stored under the Redis key ``curRestoreTable``.
    ``restoreVector`` then compares the current and the target version and
    either deletes or inserts records through the ``PostgreSQL`` helper.
    """

    LOGIN_USER = "admin"
    UPDATEFILE = 'UPDATEFILE'
    RESTORETABLE = 'RESTORETABLE'
    RESTOREVERSION = 'RESTOREVERSION'
    CURVERSION = "CURVERSION"
    ALLOWOVERLAP = "ALLOWOVERLAP"
    DATABASE = 'DATABASE'
    INPUTFILE = 'INPUTFILE'
    SCHEMA = 'SCHEMA'
    TABLE = 'TABLE'
    HOST = 'HOST'
    DBNAME = 'DBNAME'
    PORT = 'PORT'
    USER = 'USER'
    PASSWORD = 'PASSWORD'
    selectedValue = "selectedValue"

    # Class-level defaults are kept so that attribute access on the class
    # itself keeps working; __init__ gives every instance its own lists so
    # that state is never shared between instances.
    tables = []
    checklogs = []

    def __init__(self):
        super().__init__()
        self.tables = []
        self.checklogs = []
        # Version values offered by the RESTOREVERSION enum; filled in by
        # initAlgorithm() and indexed by restoreVector().
        self.versionlist = []

    def initAlgorithm(self, config=None):
        """Declare the algorithm parameters.

        Always adds the RESTORETABLE enum. CURVERSION and RESTOREVERSION are
        added only when Redis holds a value for ``curRestoreTable``.
        The database connection is closed even if a lookup fails.
        """
        self.versionlist = []
        pgconn = PostgreSQL(schema='base')
        try:
            self.tables = pgconn.getManagerTables(username=self.LOGIN_USER)
            # Column 2 of each row is shown as the option label.
            tablenames = [row[2] for row in self.tables]
            self.addParameter(QgsProcessingParameterEnum(
                name=self.RESTORETABLE,
                description=self.tr('目标数据源'),
                options=tablenames))

            redis = Redis()
            tablename = redis.get("curRestoreTable")
            print(tablename)
            if tablename is not None:
                # The original code assumed a bytes reply; only decode when
                # that is actually the case so a str reply does not crash.
                if isinstance(tablename, bytes):
                    tablename = tablename.decode('utf-8')
                versions = pgconn.getTableVersions(tablename=tablename)
                curversion = pgconn.getTableCurVersion(tablename=tablename)

                tableversion_param = QgsProcessingParameterString(
                    self.CURVERSION,
                    self.tr('当前版本号'),
                    defaultValue=curversion)
                self.addParameter(tableversion_param)

                # Column 0 of each row is the version value.
                self.versionlist = [row[0] for row in versions]
                self.addParameter(QgsProcessingParameterEnum(
                    name=self.RESTOREVERSION,
                    description=self.tr('目标版本'),
                    options=self.versionlist))
        finally:
            pgconn.close()

    def name(self):
        """Return the unique algorithm id."""
        return 'postgisrestore'

    def icon(self):
        """Return the icon loaded from ``images/dbms/restore.png``."""
        return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'restore.png'))

    def displayName(self):
        """Return the translated, user-visible algorithm name."""
        return self.tr('版本回退')

    def shortDescription(self):
        """Return the translated one-line description."""
        return self.tr('回退数据库版本')

    def tags(self):
        """Return this algorithm's tags followed by the parent class tags."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated group name."""
        return self.tr('数据更新维护')

    def groupId(self):
        """Return the group id."""
        return 'updatedata'

    def setSelectedValue(self, v):
        """Print *v* via ``printStr`` and store it as ``selectedValue``."""
        printStr(v)
        self.selectedValue = v

    def getSelectedValue(self):
        """Return the value last stored by ``setSelectedValue``."""
        return self.selectedValue

    def restoreVector(self, parameters, context, feedback, executing=True):
        """Move the selected table from its current to the target version.

        ``parameters`` is read directly: RESTORETABLE and RESTOREVERSION are
        used as indexes into ``self.tables`` and ``self.versionlist``, and
        CURVERSION is compared with the selected target version.

        Returns:
            dict with the single key ``"状态"`` holding the status message.

        Raises:
            QgsProcessingException: if the version parameters were never
                declared (no current table in Redis) or no versions exist.
        """
        print("版本回退开始啦")

        # CURVERSION / RESTOREVERSION only exist when initAlgorithm() found
        # a current table; fail with a clear message instead of an opaque
        # AttributeError / KeyError / IndexError.
        if (not self.versionlist
                or self.CURVERSION not in parameters
                or self.RESTOREVERSION not in parameters):
            raise QgsProcessingException(
                self.tr('缺少版本信息，无法执行版本回退'))

        tableinfo = self.tables[parameters[self.RESTORETABLE]]
        # Column 1 is the name handed to the PostgreSQL helper.
        tablename = tableinfo[1]

        # Work out how the current and the target version relate.
        # NOTE(review): the two values are compared with ==, > as-is; this
        # assumes both have the same, orderable type - confirm.
        curversion = parameters[self.CURVERSION]
        targetversion = self.versionlist[parameters[self.RESTOREVERSION]]

        message = "版本回退成功"
        pgconn = PostgreSQL(schema='base')
        try:
            if curversion == targetversion:
                message = "版本相同不需要操作"
            elif curversion > targetversion:
                print("需要删除近期版本更新的数据")
                pgconn.deleteVectorRecords(tablename=tablename,
                                           rksj_sw=targetversion)
            else:
                pgconn.insertVectorRecords(tablename=tablename,
                                           curversion=curversion,
                                           targetversion=targetversion)
                print("需要插入近期版本更新的数据")
            # NOTE: a table backup step (pgconn.restoreBackTable) was
            # disabled in the original code and is intentionally not run.
        finally:
            # Release the connection even when the database call fails.
            pgconn.close()

        return {
            "状态": message
        }

    def is_string(self, var):
        """Return True when *var* is a ``str`` instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Return an empty command list."""
        return []

    def contains_keys(self, obj, keys):
        """Return True when every entry of *keys* is present on *obj*.

        For a dict the keys are looked up in the dict itself; for any other
        object that has an instance ``__dict__`` they are looked up there.

        Raises:
            ValueError: if *obj* is neither a dict nor has a ``__dict__``.
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Check the instance, not its type: every type has a __dict__, which
        # made the ValueError branch unreachable and let objects without an
        # instance __dict__ fail with AttributeError instead.
        if hasattr(obj, '__dict__'):
            return all(key in obj.__dict__ for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the underlying GDAL/OGR command."""
        return "ogr2ogr"