# -*- coding: utf-8 -*-

"""
***************************************************************************
    AP02.py
    ---------------------
    Date                 : November 2012
    Copyright            : (C) 2012 by Victor Olaya
    Email                : volayaf at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""

__author__ = 'wanger'
__date__ = 'November 2024'
__copyright__ = '(C) 2024, wanger'

import os

from PyQt5.QtSql import QSqlDatabase, QSqlQuery
from osgeo import ogr, gdal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QApplication
from future.moves import sys
from qgis.PyQt import QtWidgets
from qgis._core import (QgsProcessingParameterVectorDestination,
                        QgsVectorLayer,
                        QgsVectorFileWriter,
                        QgsCoordinateTransformContext)
from qgis.core import (QgsProcessing,
                       QgsProcessingParameterFeatureSource,
                       QgsProcessingParameterString,
                       QgsProcessingParameterFile,
                       QgsProcessingParameterDateTime,
                       QgsProcessingParameterEnum,
                       QgsProcessingParameterCrs,
                       QgsProcessingParameterField,
                       QgsProcessingParameterExtent,
                       QgsProcessingParameterBoolean,
                       QgsProcessingParameterProviderConnection,
                       QgsProcessingParameterDatabaseSchema,
                       QgsProcessingParameterDatabaseTable,
                       QgsProviderRegistry,
                       QgsProcessingException,
                       QgsProcessingParameterDefinition,
                       QgsProviderConnectionException,
                       QgsDataSourceUri)

from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
from processing.algs.gdal.GdalUtils import GdalUtils
from processing.tools.PrintUtils import printStr
from processing.tools.StringUtils import (getConnectionStr, getNow)
from processing.tools.GeoServer.Geoserver import Geoserver
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
from processing.tools.StringUtils import getUUID
from processing.tools.FileUtils import getParentFolderPath
from processing.tools.topology.read import (getTopoCheckSQL,
                                            getTopoCheckDescription,
                                            getTopoCheckNote)
from processing.tools.SpatiaLite.SpatiaLite import SpatiaLiteUtils
import sqlite3

# Directory containing this module; used to locate the algorithm icon.
pluginPath = os.path.dirname(os.path.abspath(__file__))

# Process-wide GDAL configuration, applied when this module is imported.
gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
gdal.SetConfigOption("SHAPE_ENCODING", "GBK")


class AP02(GdalAlgorithm):
    """Processing algorithm for the topology check registered as "AP02".

    The check takes a polygon layer and a point layer, imports both into a
    SpatiaLite database, runs the SQL returned by ``getTopoCheckSQL`` for
    this check type, and exports the resulting table as a CSV report plus a
    vector file.

    NOTE(review): ``spatialite_db_path``, ``importLayerToSpatiaLite``,
    ``vectorLayerExport``, ``vectorLayerExportToVector``, ``topologySuccess``
    and ``topologySuccessWithNone`` are not defined in this file; they are
    presumably provided by ``GdalAlgorithm`` -- confirm.
    """

    # Parameter identifiers.
    INPUTVECTOR = "INPUTVECTOR"
    INPUTTARGETVECTOR = "INPUTTARGETVECTOR"
    OUTPUTVECTOR = 'OUTPUTVECTOR'

    # Key used to look up the SQL, description and note of this check.
    TOPOLOGYTYPE = "AP02"

    # Suffix for the temporary table names. It is evaluated once, when the
    # class body executes, so every instance shares the same value.
    # NOTE(review): confirm a per-class (not per-run) suffix is intended.
    uid = getUUID()
    in_table = "topology_in_table"
    in_table_t = "topology_in_table_t"
    out_table = "topology_out_table"

    # Values passed to getTopoCheckSQL() and to executescript().
    TOPOLOGYPARAMS = {
        "intable_s": f"{in_table}_{uid}",
        "intable_t": f"{in_table_t}_{uid}",
        "outtable": f"{out_table}_{uid}"
    }

    def __init__(self):
        """Initialise the algorithm through the parent constructor."""
        super().__init__()

    def initAlgorithm(self, config=None):
        """Declare the two input layers and the output destination."""
        self.addParameter(QgsProcessingParameterFeatureSource(
            self.INPUTVECTOR,
            self.tr('待检查数据'),
            types=[QgsProcessing.TypeVectorPolygon]))
        self.addParameter(QgsProcessingParameterFeatureSource(
            self.INPUTTARGETVECTOR,
            self.tr('目标表'),
            types=[QgsProcessing.TypeVectorPoint]))
        self.addParameter(QgsProcessingParameterVectorDestination(
            self.OUTPUTVECTOR,
            self.tr('输出位置(矢量数据和报告)')))

    def name(self):
        """Return the algorithm identifier (the topology check type)."""
        return self.TOPOLOGYTYPE

    def icon(self):
        """Return the icon loaded from ``topology.png`` next to this module."""
        return QIcon(os.path.join(pluginPath, 'topology.png'))

    def displayName(self):
        """Return the translated note registered for this check type."""
        return self.tr(getTopoCheckNote(self.TOPOLOGYTYPE))

    def shortDescription(self):
        """Return the translated description registered for this check type."""
        return self.tr(getTopoCheckDescription(self.TOPOLOGYTYPE))

    def tags(self):
        """Return this algorithm's tags followed by the parent's tags."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated display name of the algorithm group."""
        return self.tr('拓扑检查')

    def groupId(self):
        """Return the identifier of the algorithm group."""
        return 'topology'

    @staticmethod
    def _failure(message):
        """Build the failure dictionary returned by ``topologycheck``."""
        return {
            "状态": "拓扑检查失败",
            "错误信息": message
        }

    def _import_parameter_layer(self, parameters, context, param_name, table_name):
        """Resolve a layer parameter and import it into SpatiaLite.

        :param parameters: Processing parameter map.
        :param context: Processing context.
        :param param_name: Identifier of the layer parameter to resolve.
        :param table_name: Name of the table the layer is imported into.
        :return: ``None`` on success, otherwise a failure dictionary.
        """
        layer = self.parameterAsVectorLayer(parameters, param_name, context)
        # parameterAsVectorLayer() may yield None when the parameter cannot
        # be resolved to a layer; treat that like an invalid layer instead
        # of failing with AttributeError on None.isValid().
        if layer is None or not layer.isValid():
            return self._failure("图层数据不可用。")

        result = self.importLayerToSpatiaLite(layer=layer, table_name=table_name)
        if result[0] != QgsVectorFileWriter.NoError:
            return self._failure("图层数据导出到SpatiaLite数据库失败。")
        return None

    def topologycheck(self, parameters, context, feedback, executing=True):
        """Run the topology check and export its results.

        Steps: import both input layers into SpatiaLite, execute the check
        SQL, count the rows of the output table and, when there are any,
        export them as ``report.csv`` and as the requested vector file.

        :param parameters: Processing parameter map.
        :param context: Processing context.
        :param feedback: Processing feedback object (not used here).
        :param executing: Unused; kept for interface compatibility.
        :return: A failure dictionary, or the value of
            ``topologySuccessWithNone()`` when the output table is empty,
            or the value of ``topologySuccess()`` otherwise.
        """
        print("拓扑检查开始啦")

        # Parameter setup.
        spatialite_db_path = self.spatialite_db_path  # SpatiaLite database path
        table_name = f"{self.in_table}_{self.uid}"  # table for the checked layer
        table_t_name = f"{self.in_table_t}_{self.uid}"  # table for the target layer
        outtable_name = f"{self.out_table}_{self.uid}"  # table holding the results
        # Path of the exported vector file.
        output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        export_layer_name = outtable_name

        # Import both vector layers into SpatiaLite, checked layer first.
        layer_imports = (
            (self.INPUTVECTOR, table_name),
            (self.INPUTTARGETVECTOR, table_t_name),
        )
        for param_name, target_table in layer_imports:
            failure = self._import_parameter_layer(parameters, context, param_name, target_table)
            if failure is not None:
                return failure

        print(f"file imported to Spatialite as table: {table_name}")

        dbutils = SpatiaLiteUtils(spatialite_db_path)

        # Execute the SQL of this check.
        sql = getTopoCheckSQL(self.TOPOLOGYTYPE, self.TOPOLOGYPARAMS)
        try:
            dbutils.executescript(f"{sql}", self.TOPOLOGYPARAMS)
        except sqlite3.Error as e:
            # Release the connection on this path as well; every other
            # exit after this point already closes it.
            dbutils.closeDb()
            return self._failure(f"SpatiaLite数据库执行SQL语句失败: {e}")

        # Number of rows in the result table.
        feature_count = dbutils.queryCount(outtable_name)
        if feature_count == 0:
            dbutils.resetDb()
            dbutils.closeDb()
            return self.topologySuccessWithNone()

        # Export the results to local files.
        processed_layer = QgsVectorLayer(f"{spatialite_db_path}|layername={export_layer_name}",
                                         "processed_layer", "ogr")
        path = getParentFolderPath(output_vector_path)
        # Use the platform separator instead of a hard-coded backslash.
        csv_path = os.path.join(path, "report.csv")

        if not processed_layer.isValid():
            dbutils.closeDb()
            return self._failure("拓扑检查结果图层加载失败。")

        self.vectorLayerExport(layer=processed_layer, type="CSV", path=csv_path)
        self.vectorLayerExportToVector(layer=processed_layer, path=output_vector_path)
        # os.startfile only exists on Windows; skip opening the folder on
        # other platforms rather than raising AttributeError.
        if hasattr(os, "startfile"):
            os.startfile(path)

        dbutils.resetDb()
        dbutils.closeDb()

        return self.topologySuccess(count=feature_count, reportpath=csv_path)

    def is_string(self, var):
        """Return True when *var* is a ``str`` instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Return an empty command list."""
        return []

    def contains_keys(self, obj, keys):
        """Return True when every entry of *keys* is present in *obj*.

        For a ``dict`` the keys are looked up in the dictionary itself; for
        any other object that has an instance ``__dict__`` they are looked
        up among its instance attributes.

        :raises ValueError: if *obj* is neither a dict nor an object with
            an instance ``__dict__``.
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Test the instance, not its type: every type has a __dict__, so
        # checking type(obj) let unsupported objects such as ints fall
        # through and fail with AttributeError instead of ValueError.
        if hasattr(obj, '__dict__'):
            return all(key in obj.__dict__ for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the underlying command, ``ogr2ogr``."""
        return "ogr2ogr"