# -*- coding: utf-8 -*- """ *************************************************************************** postgisupdate.py --------------------- Date : November 2012 Copyright : (C) 2012 by Victor Olaya Email : volayaf at gmail dot com *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ __author__ = 'wanger' __date__ = 'November 2024' __copyright__ = '(C) 2024, wanger' import os from osgeo import ogr, gdal from PyQt5.QtGui import QIcon from PyQt5.QtWidgets import QApplication from future.moves import sys from qgis.PyQt import QtWidgets from qgis.core import (QgsProcessing, QgsProcessingParameterFeatureSource, QgsProcessingParameterString, QgsProcessingParameterFile, QgsProcessingParameterDateTime, QgsProcessingParameterEnum, QgsProcessingParameterCrs, QgsProcessingParameterField, QgsProcessingParameterExtent, QgsProcessingParameterBoolean, QgsProcessingParameterProviderConnection, QgsProcessingParameterDatabaseSchema, QgsProcessingParameterDatabaseTable, QgsProviderRegistry, QgsProcessingException, QgsProcessingParameterDefinition, QgsProviderConnectionException, QgsDataSourceUri) from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm from processing.algs.gdal.GdalUtils import GdalUtils from processing.tools.PrintUtils import printStr from processing.tools.StringUtils import (getConnectionStr, getNow) from processing.tools.GeoServer.Geoserver import Geoserver from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL from processing.tools.system import isWindows pluginPath = os.path.normpath(os.path.join( os.path.split(os.path.dirname(__file__))[0], os.pardir)) gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES") gdal.SetConfigOption("SHAPE_ENCODING", "GBK") class Postgisupdate(GdalAlgorithm): LOGIN_USER = "admin" UPDATEFILE = 'UPDATEFILE' TARGET = 'TARGET' ALLOWOVERLAP = "ALLOWOVERLAP" DATABASE = 'DATABASE' INPUTFILE = 'INPUTFILE' SCHEMA = 'SCHEMA' TABLE = 'TABLE' HOST = 'HOST' DBNAME = 'DBNAME' PORT = 'PORT' USER = 'USER' PASSWORD = 'PASSWORD' selectedValue = "selectedValue" tables = [] checklogs = [] def __init__(self): super().__init__() def initAlgorithm(self, config=None): self.addParameter(QgsProcessingParameterFeatureSource(self.UPDATEFILE, self.tr('更新数据包'), types=[QgsProcessing.TypeVector])) pgconn = PostgreSQL(schema='base') self.tables = pgconn.getManagerTables(username=self.LOGIN_USER) pgconn.close() tablenames = [] for row in self.tables: tablenames.append(row[2]) self.addParameter(QgsProcessingParameterEnum(name=self.TARGET, description=self.tr('目标数据源'), options=tablenames)) self.addParameter(QgsProcessingParameterBoolean(self.ALLOWOVERLAP, self.tr('是否允许更新重叠数据'), defaultValue=False)) def name(self): return 'Fpostgisupdate' def icon(self): return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'update.png')) def displayName(self): return self.tr('数据更新') def shortDescription(self): return self.tr('更新数据库目标') def tags(self): t = self.tr('import,into,postgis,database,vector').split(',') t.extend(super().tags()) return t def group(self): return self.tr('数据更新维护') def groupId(self): return 'updatedata' def setSelectedValue(self, v): printStr(v) self.selectedValue = v def getSelectedValue(self): return self.selectedValue def updateVector(self, parameters, context, feedback, executing=True): print("增量更新开始啦") print("===========进行数据更新开始===========") #当前时间 now = getNow() ogrLayer, layername = self.getOgrCompatibleSource(self.UPDATEFILE, parameters, context, feedback, executing) print(ogrLayer) allowoverlap = self.parameterAsBoolean(parameters, self.ALLOWOVERLAP, context) tableinfo = self.tables[parameters[self.TARGET]] tablename = tableinfo[1] ywlx = tableinfo[3] pgconn = PostgreSQL(schema='base') print("===========备份表===========") # pgconn.resetBackTable(tablename=tablename) srid = pgconn.getVectorTableSrid(tablename=tablename) geomtype = pgconn.getVectorTableGeomType(tablename=tablename) print("===========打开矢量数据===========") ds = ogr.Open(ogrLayer, 0) layer = ds.GetLayer() layer_spatial_ref = layer.GetSpatialRef() layer_def = layer.GetLayerDefn() vectorepsg = layer_spatial_ref.GetAttrValue('AUTHORITY', 1) alltablefiled = pgconn.getAllTableField(tablename=tablename) insertcount = 0 misscount = 0 for feature in layer: geom_wkt = feature.GetGeometryRef().ExportToWkt() # 判断是否允许更新重叠数据 updatebool = True if allowoverlap == False: instersectsCount = pgconn.getInstersectsCount(tablename=tablename, tablesrid=srid, wkts=[geom_wkt], wktsrid=vectorepsg) count = int(instersectsCount) if count > 0: updatebool = False misscount += 1 if updatebool == True: # 遍历当前数据的属性信息 attributes = {} for i in range(layer_def.GetFieldCount()): field_def = layer_def.GetFieldDefn(i) field_name = field_def.GetName() field_value = feature.GetField(i) attributes[field_name] = field_value # print(field_value) # if self.is_string(field_value) == True: # print(field_value.encode('utf-8')) # 拼接insert into语句 insertvalus = [] for field in alltablefiled: b = self.contains_keys(attributes, [field.lower()]) if b == True: insertvalus.append(attributes[field.lower()]) else: b = self.contains_keys(attributes, [field.upper()]) if b == True: insertvalus.append(attributes[field.upper()]) else: insertvalus.append(None) pgconn.insertVectorFeature(tablename=tablename, fields=alltablefiled, values=insertvalus, wkt=geom_wkt, wktsrid=vectorepsg, geomtype=geomtype, rksj=now) insertcount += 1 pgconn.updateVectorVersion(tablename=tablename, version=now) pgconn.close() return { "状态": "更新成功", "插入": str(insertcount) + "条记录", "忽略": str(misscount) + "条记录,存在空间叠加。" } # 判断数据是否为字符串 def is_string(self, var): return isinstance(var, str) def getConsoleCommands(self, parameters, context, feedback, executing=True): self.checklogs = [] ogrLayer, layername = self.getOgrCompatibleSource(self.UPDATEFILE, parameters, context, feedback, executing) print(ogrLayer) allowoverlap = self.parameterAsBoolean(parameters, self.ALLOWOVERLAP, context) print("===========进行数据检查开始===========") print("===========获取选择的数据库表===========") tableinfo = self.tables[parameters[self.TARGET]] tablename = tableinfo[1] ywlx = tableinfo[3] pgconn = PostgreSQL(schema='base') srid = pgconn.getVectorTableSrid(tablename=tablename) print("===========打开矢量数据===========") ds = ogr.Open(ogrLayer, 0) layer = ds.GetLayer() layer_spatial_ref = layer.GetSpatialRef() print("===========坐标系检查===========") vectorepsg = layer_spatial_ref.GetAttrValue('AUTHORITY', 1) # 坐标系不一致 if vectorepsg != str(srid): self.checklogs.append(f"与目标数据坐标系不一致,请先重投影到EPSG:{srid}!") print("===========必要字段检查===========") layer_def = layer.GetLayerDefn() vectorfields = [] for i in range(layer_def.GetFieldCount()): field_def = layer_def.GetFieldDefn(i) vectorfields.append(field_def.GetName().lower()) requirefields = pgconn.getMustRequireField(tablename=tablename, ywlx=ywlx) missfields = [] for requirefield in requirefields: contain = False for vectorfield in vectorfields: if vectorfield == requirefield: contain = True break if contain == False: missfields.append(requirefield) if len(missfields) > 0: miss = ','.join(missfields) self.checklogs.append(f'更新包中{miss}字段不存在,请先检查并修复再进行更新!') print("===========空间叠加检查===========") # geom_wkt = [] # for feature in layer: # geom_wkt.append(feature.GetGeometryRef().ExportToWkt()) # instersectsCount = pgconn.getInstersectsCount(tablename=tablename, tablesrid=srid, wkts=geom_wkt, # wktsrid=vectorepsg) # count = int(instersectsCount) # if allowoverlap == False and count > 0: # self.checklogs.append(f"更新包与目标数据有{instersectsCount}块图斑空间重叠!") pgconn.close() return self.checklogs def contains_keys(self, obj, keys): if isinstance(obj, dict): return all(key in obj.keys() for key in keys) elif hasattr(type(obj), '__dict__'): return all(key in obj.__dict__ for key in keys) else: raise ValueError("Invalid object type") def commandName(self): return "ogr2ogr"