# -*- coding: utf-8 -*-

"""
***************************************************************************
    A03.py
    ---------------------
    Date                 : November 2012
    Copyright            : (C) 2012 by Victor Olaya
    Email                : volayaf at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""

__author__ = 'wanger'
__date__ = 'November 2024'
__copyright__ = '(C) 2024, wanger'

import os
import sqlite3

from PyQt5.QtSql import QSqlDatabase, QSqlQuery
from osgeo import ogr, gdal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QApplication
from future.moves import sys
from qgis.PyQt import QtWidgets
from qgis._core import (QgsProcessingParameterVectorDestination,
                        QgsVectorLayer,
                        QgsVectorFileWriter,
                        QgsCoordinateTransformContext)
from qgis.core import (QgsProcessing,
                       QgsProcessingParameterFeatureSource,
                       QgsProcessingParameterString,
                       QgsProcessingParameterFile,
                       QgsProcessingParameterDateTime,
                       QgsProcessingParameterEnum,
                       QgsProcessingParameterCrs,
                       QgsProcessingParameterField,
                       QgsProcessingParameterExtent,
                       QgsProcessingParameterBoolean,
                       QgsProcessingParameterProviderConnection,
                       QgsProcessingParameterDatabaseSchema,
                       QgsProcessingParameterDatabaseTable,
                       QgsProviderRegistry,
                       QgsProcessingException,
                       QgsProcessingParameterDefinition,
                       QgsProviderConnectionException,
                       QgsDataSourceUri)

from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
from processing.algs.gdal.GdalUtils import GdalUtils
from processing.tools.PrintUtils import printStr
from processing.tools.StringUtils import (getConnectionStr, getNow)
from processing.tools.GeoServer.Geoserver import Geoserver
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
from processing.tools.system import isWindows
from processing.tools.FileUtils import getParentFolderPath
from processing.tools.topology.read import (getTopoCheckSQL,
                                            getTopoCheckDescription,
                                            getTopoCheckNote)

pluginPath = os.path.normpath(os.path.join(
    os.path.split(os.path.dirname(__file__))[0], os.pardir))

# Module-level GDAL configuration: treat file names as UTF-8 and read/write
# shapefile attribute data as GBK.
gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
gdal.SetConfigOption("SHAPE_ENCODING", "GBK")


class A03(GdalAlgorithm):
    """Processing algorithm for topology check ``A03``.

    The input polygon layer is copied into a Spatialite database, the SQL
    script returned by ``getTopoCheckSQL`` for this check type is executed
    there, and the resulting table is exported as a CSV report and as a
    shapefile. The start-up message printed by ``topologycheck`` names the
    check as "polygons must not have gaps".
    """

    # Parameter identifiers.
    INPUTVECTOR = "INPUTVECTOR"
    OUTPUTVECTOR = 'OUTPUTVECTOR'

    # Key handed to the getTopoCheck* helpers to select this check.
    TOPOLOGYTYPE = "A03"

    # Table the input layer is written to, and table read back after the SQL
    # script has run.
    in_table = "topology_table"
    out_table = "topology_table_temp"

    # Parameters passed to getTopoCheckSQL together with TOPOLOGYTYPE.
    TOPOLOGYPARAMS = {
        "intable_s": in_table,
        "outtable": out_table
    }

    def __init__(self):
        super().__init__()

    def initAlgorithm(self, config=None):
        """Declare the input polygon layer and the output destination."""
        self.addParameter(QgsProcessingParameterFeatureSource(self.INPUTVECTOR,
                                                              self.tr('待检查数据'),
                                                              types=[QgsProcessing.TypeVectorPolygon]))
        self.addParameter(QgsProcessingParameterVectorDestination(self.OUTPUTVECTOR,
                                                                  self.tr('输出位置(矢量数据和报告)')))

    def name(self):
        """Return the algorithm identifier."""
        return 'A03'

    def icon(self):
        """Return the topology icon from the plugin's image folder."""
        return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'topology.png'))

    def displayName(self):
        """Return the translated note that getTopoCheckNote gives for this check."""
        return self.tr(getTopoCheckNote(self.TOPOLOGYTYPE))

    def shortDescription(self):
        """Return the translated description that getTopoCheckDescription gives."""
        return self.tr(getTopoCheckDescription(self.TOPOLOGYTYPE))

    def tags(self):
        """Return this algorithm's tags followed by the parent class tags."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated group name."""
        return self.tr('拓扑检查')

    def groupId(self):
        """Return the group identifier."""
        return 'topology'

    @staticmethod
    def _failure(message):
        """Build the result dictionary returned when the check cannot complete."""
        return {
            "状态": "拓扑检查失败",
            "错误信息": message
        }

    def _run_check_sql(self, spatialite_db_path):
        """Execute the topology SQL script inside the Spatialite database.

        :param spatialite_db_path: path of the SQLite/Spatialite database file.
        :returns: ``None`` on success, otherwise the error message to report.
        """
        conn = sqlite3.connect(spatialite_db_path)
        try:
            conn.enable_load_extension(True)
            cursor = conn.cursor()
            conn.execute("PRAGMA synchronous = OFF")
            conn.execute("PRAGMA cache_size = -20000")  # In KB
            conn.execute("PRAGMA temp_store = MEMORY")

            try:
                conn.execute("SELECT load_extension('mod_spatialite');")
            except sqlite3.OperationalError as e:
                return f"Failed to load extension: {e}"
            print("mod_spatialite loaded successfully.")

            sql = getTopoCheckSQL(self.TOPOLOGYTYPE, self.TOPOLOGYPARAMS)
            try:
                cursor.executescript(f"{sql}")
            except sqlite3.Error as e:
                return f"An error occurred: {e}"
            print(f"Script executed successfully.{sql}")

            conn.commit()
            return None
        finally:
            # Close on every path; the early returns above previously leaked
            # the connection and could leave the database file locked.
            conn.close()

    def topologycheck(self, parameters, context, feedback, executing=True):
        """Run the topology check and export its results.

        Steps: write the input layer into the Spatialite database as
        ``in_table``, execute the check SQL, load ``out_table`` back, export
        it as ``report.csv`` in the output's parent folder and as a shapefile
        at the requested output path.

        ``self.spatialite_db_path`` is read but not defined in this class;
        NOTE(review): presumably supplied by the base class or the caller --
        confirm.

        :param parameters: processing parameter values.
        :param context: processing context used to resolve the parameters.
        :param feedback: processing feedback object (not used here).
        :param executing: unused; kept for interface compatibility.
        :returns: a dict with the status; on success also the conclusion, the
            number of records in the result layer and the report path; on
            failure an error message.
        """
        print("面不能有空隙检查开始啦")

        # Parameter setup.
        spatialite_db_path = self.spatialite_db_path  # Spatialite database path
        table_name = self.in_table  # table name used for the import
        outtable_name = self.out_table  # table read back after the analysis
        output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)

        # Resolve the input layer; guard against None before calling isValid().
        input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        if input_layer is None or not input_layer.isValid():
            return self._failure("Failed to load SHP file.")

        # Write the layer into the Spatialite database under the given table name.
        options = QgsVectorFileWriter.SaveVectorOptions()
        options.driverName = "SQLite"
        options.layerName = table_name
        options.fileEncoding = "UTF-8"
        options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
        result = QgsVectorFileWriter.writeAsVectorFormatV3(input_layer, spatialite_db_path,
                                                           QgsCoordinateTransformContext(), options)
        if result[0] != QgsVectorFileWriter.NoError:
            return self._failure("Failed to import SHP file to Spatialite.")
        print(f"SHP file imported to Spatialite as table: {table_name}")

        # Execute the SQL script.
        error_message = self._run_check_sql(spatialite_db_path)
        if error_message is not None:
            return self._failure(error_message)
        print("SQL execution completed.")

        # Load the result table and export it to local files.
        processed_layer = QgsVectorLayer(f"{spatialite_db_path}|layername={outtable_name}",
                                         "processed_layer", "ogr")
        path = getParentFolderPath(output_vector_path)
        # os.path.join instead of a hard-coded backslash so the report path is
        # valid on every platform.
        csv_path = os.path.join(path, "report.csv")
        if not processed_layer.isValid():
            return self._failure("Failed to load processed layer.")

        csv_options = QgsVectorFileWriter.SaveVectorOptions()
        csv_options.driverName = "CSV"  # Output format
        csv_options.includeGeometry = True  # Include geometry
        error = QgsVectorFileWriter.writeAsVectorFormatV3(processed_layer, csv_path,
                                                          QgsCoordinateTransformContext(), csv_options)
        if error[0] != QgsVectorFileWriter.NoError:
            return self._failure(f"Failed to export {table_name} to CSV: {error}")
        print(f"Exported {table_name} to {csv_path} successfully!")

        # Export the result layer as a shapefile.
        export_options = QgsVectorFileWriter.SaveVectorOptions()
        export_options.driverName = "ESRI Shapefile"
        export_options.fileEncoding = "UTF-8"
        result = QgsVectorFileWriter.writeAsVectorFormatV3(processed_layer, output_vector_path,
                                                           QgsCoordinateTransformContext(), export_options)
        if result[0] != QgsVectorFileWriter.NoError:
            return self._failure("Failed to export processed layer to SHP file.")

        feature_count = processed_layer.featureCount()
        return {
            "状态": "拓扑检查成功",
            "结论": "不符合" if feature_count > 0 else "符合",
            "错误项": f"{feature_count}条记录",
            "报告输出位置": csv_path
        }

    def is_string(self, var):
        """Return True when ``var`` is a ``str`` instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Print the resolved input layer and output file; return no commands."""
        inputvector = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        print("拓扑检查输入矢量图层:")
        print(inputvector)
        outFile = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        print(f"拓扑检查输出文件:{outFile}")
        return []

    def contains_keys(self, obj, keys):
        """Return True when every entry of ``keys`` is present in ``obj``.

        For a dict the keys are looked up directly; for any other object that
        has an instance ``__dict__`` its attributes are checked.

        :raises ValueError: if ``obj`` is neither a dict nor an object with an
            instance ``__dict__``.
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Test the instance, not its type: every type has a __dict__, which
        # made the ValueError branch below unreachable.
        if hasattr(obj, '__dict__'):
            return all(key in obj.__dict__ for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the underlying GDAL command."""
        return "ogr2ogr"