# -*- coding: utf-8 -*- """ *************************************************************************** TopologyCheck.py --------------------- Date : November 2012 Copyright : (C) 2012 by Victor Olaya Email : volayaf at gmail dot com *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ __author__ = 'wanger' __date__ = 'November 2024' __copyright__ = '(C) 2024, wanger' import os from PyQt5.QtSql import QSqlDatabase, QSqlQuery from osgeo import ogr, gdal from PyQt5.QtGui import QIcon from PyQt5.QtWidgets import QApplication from future.moves import sys from qgis.PyQt import QtWidgets from qgis._core import QgsProcessingParameterVectorDestination, QgsVectorLayer, QgsVectorFileWriter, \ QgsCoordinateTransformContext from qgis.core import (QgsProcessing, QgsProcessingParameterFeatureSource, QgsProcessingParameterString, QgsProcessingParameterFile, QgsProcessingParameterDateTime, QgsProcessingParameterEnum, QgsProcessingParameterCrs, QgsProcessingParameterField, QgsProcessingParameterExtent, QgsProcessingParameterBoolean, QgsProcessingParameterProviderConnection, QgsProcessingParameterDatabaseSchema, QgsProcessingParameterDatabaseTable, QgsProviderRegistry, QgsProcessingException, QgsProcessingParameterDefinition, QgsProviderConnectionException, QgsDataSourceUri) from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm from processing.algs.gdal.GdalUtils import GdalUtils from processing.tools.PrintUtils import printStr from processing.tools.StringUtils import (getConnectionStr, getNow) from processing.tools.GeoServer.Geoserver import Geoserver from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL 
from processing.tools.system import isWindows

# Plugin root: two directory levels above the folder that contains this module.
pluginPath = os.path.normpath(os.path.join(
    os.path.split(os.path.dirname(__file__))[0],
    os.pardir))

# Process-wide GDAL settings applied at import time.
# NOTE(review): shapefile attributes are decoded as GBK here, while the export
# step below asks for UTF-8 -- confirm that this combination is intentional.
gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
gdal.SetConfigOption("SHAPE_ENCODING", "GBK")


class TopologyCheck(GdalAlgorithm):
    """Processing algorithm that runs a topology check on a vector layer.

    The algorithm exposes three parameters: the layer to check, one or more
    topology rules, and the destination of the result layer.  The actual work
    lives in :meth:`topologycheck`; :meth:`getConsoleCommands` only logs its
    inputs and returns no GDAL command.
    """

    INPUTVECTOR = "INPUTVECTOR"
    TOPOLOGYTYPE = "TOPOLOGYTYPE"
    # Rule labels shown to the user, in order: must be disjoint, must not
    # overlap, must not intersect, must not have dangles, must not have pseudo
    # nodes, must not self-overlap, must not self-intersect, must not intersect
    # or touch interior, must be single part, must not have gaps.
    TOPOLOGYTYPES = ["必须不相交", "不能重叠", "不能相交", "不能有悬挂点", "不能有伪结点",
                     "不能自重叠", "不能自相交", "不能相交或内部接触", "必须为单一部件",
                     "不能有空隙"]
    OUTPUTVECTOR = 'OUTPUTVECTOR'

    # Scratch database used as an intermediate store (historical Windows path).
    SPATIALITE_DB_PATH = "D:\\temp\\output.sqlite"
    # Name of the table the input layer is imported into.
    SPATIALITE_TABLE_NAME = "topology_table"

    def __init__(self):
        super().__init__()

    def initAlgorithm(self, config=None):
        """Declare the input layer, the rule selection and the output layer."""
        self.addParameter(QgsProcessingParameterFeatureSource(
            self.INPUTVECTOR,
            self.tr('待检查数据'),
            types=[QgsProcessing.TypeVector]))

        # Several rules may be selected at once.
        EnumOption = QgsProcessingParameterEnum(
            name=self.TOPOLOGYTYPE,
            description=self.tr('拓扑检查类型'),
            options=self.TOPOLOGYTYPES)
        EnumOption.setAllowMultiple(True)
        self.addParameter(EnumOption)

        self.addParameter(QgsProcessingParameterVectorDestination(
            self.OUTPUTVECTOR,
            self.tr('输出位置')))

    def name(self):
        """Return the algorithm identifier."""
        return 'topologycheck'

    def icon(self):
        """Return the algorithm icon loaded from the plugin image folder."""
        return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'topology.png'))

    def displayName(self):
        """Return the translated, user-visible algorithm name."""
        return self.tr('拓扑检查')

    def shortDescription(self):
        """Return the translated one-line description."""
        return self.tr('拓扑检查')

    def tags(self):
        """Return the search tags, extended with the tags of the base class."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated name of the group this algorithm belongs to."""
        return self.tr('拓扑检查')

    def groupId(self):
        """Return the identifier of the group this algorithm belongs to."""
        return 'topology'

    def topologycheck(self, parameters, context, feedback, executing=True):
        """Import the input layer into a scratch database, query it, export it.

        Steps: write the input layer to the scratch SQLite database, execute
        the SQL statement against it, reload the table as an OGR layer and
        write that layer to the requested output location as a shapefile.
        The SQL is currently a plain ``SELECT *`` and the selected rules are
        only logged.

        :param parameters: processing parameter map of the current run
        :param context: processing context used to resolve the parameters
        :param feedback: processing feedback object (not used here)
        :param executing: accepted for interface compatibility (not used here)
        :return: dict with a status text, an error-count placeholder, the
            output vector path and an empty report path
        :raises QgsProcessingException: when the input layer is unusable or
            any import, SQL or export step fails
        """
        print("拓扑检查开始啦")
        # print() returns None, so the value has to be read before it is logged.
        selected_types = parameters[self.TOPOLOGYTYPE]
        print(f"拓扑检查类型:{selected_types}")

        # Settings of the run.
        spatialite_db_path = self._scratch_database_path()
        table_name = self.SPATIALITE_TABLE_NAME
        output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        export_layer_name = table_name

        # Import the vector layer into the scratch database.
        input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        if input_layer is None or not input_layer.isValid():
            # Stop here: every following step needs a usable layer.
            raise QgsProcessingException("图层数据不可用。")

        self._import_layer(input_layer, spatialite_db_path, table_name)
        print(f"SHP file imported to Spatialite as table: {table_name}")

        # Execute the SQL statement.
        sql = f"""
            SELECT *
            FROM {table_name};
        """
        self._run_sql(spatialite_db_path, sql)
        print("SQL execution completed.")

        # Export the data to a local vector file.
        processed_layer = QgsVectorLayer(
            f"{spatialite_db_path}|layername={export_layer_name}",
            "processed_layer",
            "ogr")
        if not processed_layer.isValid():
            raise QgsProcessingException("拓扑检查结果图层加载失败。")

        export_options = QgsVectorFileWriter.SaveVectorOptions()
        export_options.driverName = "ESRI Shapefile"
        export_options.fileEncoding = "UTF-8"
        result = QgsVectorFileWriter.writeAsVectorFormatV2(
            processed_layer,
            output_vector_path,
            QgsCoordinateTransformContext(),
            export_options)
        if result[0] != QgsVectorFileWriter.NoError:
            raise QgsProcessingException("Failed to export processed layer to SHP file.")
        print(f"Processed layer exported to SHP file: {output_vector_path}")

        return {
            "状态": "拓扑检查成功",
            "错误项": "条记录",
            "矢量数据输出位置": output_vector_path,
            "报告输出位置": ""
        }

    def _scratch_database_path(self):
        """Return the scratch database path, creating its folder if needed."""
        if isWindows():
            db_path = self.SPATIALITE_DB_PATH
        else:
            # The historical drive-letter path is meaningless off Windows.
            import tempfile
            db_path = os.path.join(tempfile.gettempdir(), "output.sqlite")

        folder = os.path.dirname(db_path)
        if folder:
            try:
                os.makedirs(folder, exist_ok=True)
            except OSError as err:
                raise QgsProcessingException(
                    f"Cannot create folder for Spatialite database: {folder}") from err
        return db_path

    def _import_layer(self, layer, db_path, table_name):
        """Write *layer* into the SQLite database at *db_path* as *table_name*."""
        options = QgsVectorFileWriter.SaveVectorOptions()
        options.driverName = "SQLite"
        options.layerName = table_name
        options.fileEncoding = "UTF-8"
        # Overwriting a single layer requires an existing data source, so the
        # file itself has to be created on the first run.
        if os.path.exists(db_path):
            options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
        else:
            options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile

        result = QgsVectorFileWriter.writeAsVectorFormatV2(
            layer,
            db_path,
            QgsCoordinateTransformContext(),
            options)
        if result[0] != QgsVectorFileWriter.NoError:
            raise QgsProcessingException("Failed to import SHP file to Spatialite.")

    def _run_sql(self, db_path, sql):
        """Execute *sql* against *db_path* on a private, short-lived connection."""
        import uuid

        # A unique name avoids replacing Qt's default connection on every call.
        connection_name = f"topologycheck_{uuid.uuid4().hex}"
        db = QSqlDatabase.addDatabase("QSQLITE", connection_name)
        query = None
        try:
            db.setDatabaseName(db_path)
            if not db.open():
                raise QgsProcessingException("Failed to open Spatialite database.")
            query = QSqlQuery(db)
            if not query.exec(sql):
                raise QgsProcessingException(
                    "SQL execution failed: " + query.lastError().text())
        finally:
            # Drop every handle before removing the connection, otherwise Qt
            # reports the connection as still being in use.
            query = None
            db.close()
            db = None
            QSqlDatabase.removeDatabase(connection_name)

    def is_string(self, var):
        """Return True when *var* is a ``str`` instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Log the resolved input layer and output file; return no command."""
        inputvector = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        print("拓扑检查输入矢量图层:")
        print(inputvector)
        outFile = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        print(f"拓扑检查输出文件:{outFile}")
        return []

    def contains_keys(self, obj, keys):
        """Return True when every entry of *keys* is present in *obj*.

        For a dict the keys are looked up directly; for any other object that
        carries an instance ``__dict__`` the attribute names are used.

        :raises ValueError: when *obj* is neither a dict nor an object with an
            instance ``__dict__``
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Check the instance itself: every type object has a __dict__, so
        # testing type(obj) can never reject anything.
        if hasattr(obj, '__dict__'):
            attributes = vars(obj)
            return all(key in attributes for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the underlying GDAL command."""
        return "ogr2ogr"