|
@@ -0,0 +1,209 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+***************************************************************************
|
|
|
+ AA01.py
|
|
|
+ ---------------------
|
|
|
+ Date : November 2012
|
|
|
+ Copyright : (C) 2012 by Victor Olaya
|
|
|
+ Email : volayaf at gmail dot com
|
|
|
+***************************************************************************
|
|
|
+* *
|
|
|
+* This program is free software; you can redistribute it and/or modify *
|
|
|
+* it under the terms of the GNU General Public License as published by *
|
|
|
+* the Free Software Foundation; either version 2 of the License, or *
|
|
|
+* (at your option) any later version. *
|
|
|
+* *
|
|
|
+***************************************************************************
|
|
|
+"""
|
|
|
+
|
|
|
+__author__ = 'wanger'
|
|
|
+__date__ = 'November 2024'
|
|
|
+__copyright__ = '(C) 2024, wanger'
|
|
|
+
|
|
|
+import os
|
|
|
+
|
|
|
+from PyQt5.QtSql import QSqlDatabase, QSqlQuery
|
|
|
+from osgeo import ogr, gdal
|
|
|
+from PyQt5.QtGui import QIcon
|
|
|
+from PyQt5.QtWidgets import QApplication
|
|
|
+from future.moves import sys
|
|
|
+
|
|
|
+from qgis.PyQt import QtWidgets
|
|
|
+from qgis._core import QgsProcessingParameterVectorDestination, QgsVectorLayer, QgsVectorFileWriter, \
|
|
|
+ QgsCoordinateTransformContext
|
|
|
+from qgis.core import (QgsProcessing,
|
|
|
+ QgsProcessingParameterFeatureSource,
|
|
|
+ QgsProcessingParameterString,
|
|
|
+ QgsProcessingParameterFile,
|
|
|
+ QgsProcessingParameterDateTime,
|
|
|
+ QgsProcessingParameterEnum,
|
|
|
+ QgsProcessingParameterCrs,
|
|
|
+ QgsProcessingParameterField,
|
|
|
+ QgsProcessingParameterExtent,
|
|
|
+ QgsProcessingParameterBoolean,
|
|
|
+ QgsProcessingParameterProviderConnection,
|
|
|
+ QgsProcessingParameterDatabaseSchema,
|
|
|
+ QgsProcessingParameterDatabaseTable,
|
|
|
+ QgsProviderRegistry,
|
|
|
+ QgsProcessingException,
|
|
|
+ QgsProcessingParameterDefinition,
|
|
|
+ QgsProviderConnectionException,
|
|
|
+ QgsDataSourceUri)
|
|
|
+
|
|
|
+from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
|
|
|
+from processing.algs.gdal.GdalUtils import GdalUtils
|
|
|
+from processing.tools.PrintUtils import printStr
|
|
|
+from processing.tools.StringUtils import (getConnectionStr, getNow)
|
|
|
+from processing.tools.GeoServer.Geoserver import Geoserver
|
|
|
+from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
|
|
|
+from processing.tools.StringUtils import getUUID
|
|
|
+from processing.tools.FileUtils import getParentFolderPath
|
|
|
+from processing.tools.topology.read import (getTopoCheckSQL, getTopoCheckDescription, getTopoCheckNote)
|
|
|
+from processing.tools.SpatiaLite.SpatiaLite import SpatiaLiteUtils
|
|
|
+from processing.tools.SpatiaLite.SpatiaLite import SpatiaLiteUtils
|
|
|
+import sqlite3
|
|
|
+
|
|
|
+pluginPath = os.path.normpath(os.path.join(
|
|
|
+ os.path.split(os.path.dirname(__file__))[0], os.pardir))
|
|
|
+gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
|
|
|
+gdal.SetConfigOption("SHAPE_ENCODING", "GBK")
|
|
|
+
|
|
|
+
|
|
|
+class AA01(GdalAlgorithm):
|
|
|
+ INPUTVECTOR = "INPUTVECTOR"
|
|
|
+ INPUTTARGETVECTOR = "INPUTTARGETVECTOR"
|
|
|
+ OUTPUTVECTOR = 'OUTPUTVECTOR'
|
|
|
+ TOPOLOGYTYPE = "AA01"
|
|
|
+ uid = getUUID()
|
|
|
+ in_table = "topology_in_table"
|
|
|
+ in_table_t = "topology_in_table_t"
|
|
|
+ out_table = "topology_out_table"
|
|
|
+ TOPOLOGYPARAMS = {
|
|
|
+ "intable_s": f"{in_table}_{uid}",
|
|
|
+ "intable_t": f"{in_table_t}_{uid}",
|
|
|
+ "outtable": f"{out_table}_{uid}"
|
|
|
+ }
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__()
|
|
|
+
|
|
|
+ def initAlgorithm(self, config=None):
|
|
|
+ self.addParameter(QgsProcessingParameterFeatureSource(self.INPUTVECTOR,
|
|
|
+ self.tr('待检查数据'),
|
|
|
+ types=[QgsProcessing.TypeVectorPolygon]))
|
|
|
+ self.addParameter(QgsProcessingParameterFeatureSource(self.INPUTTARGETVECTOR,
|
|
|
+ self.tr('目标表'),
|
|
|
+ types=[QgsProcessing.TypeVectorPolygon]))
|
|
|
+ self.addParameter(QgsProcessingParameterVectorDestination(self.OUTPUTVECTOR,
|
|
|
+ self.tr('输出位置(矢量数据和报告)')))
|
|
|
+
|
|
|
+ def name(self):
|
|
|
+ return self.TOPOLOGYTYPE
|
|
|
+
|
|
|
+ def icon(self):
|
|
|
+ return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'topology.png'))
|
|
|
+
|
|
|
+ def displayName(self):
|
|
|
+ return self.tr(getTopoCheckNote(self.TOPOLOGYTYPE))
|
|
|
+
|
|
|
+ def shortDescription(self):
|
|
|
+ return self.tr(getTopoCheckDescription(self.TOPOLOGYTYPE))
|
|
|
+
|
|
|
+ def tags(self):
|
|
|
+ t = self.tr('import,into,postgis,database,vector').split(',')
|
|
|
+ t.extend(super().tags())
|
|
|
+ return t
|
|
|
+
|
|
|
+ def group(self):
|
|
|
+ return self.tr('拓扑检查')
|
|
|
+
|
|
|
+ def groupId(self):
|
|
|
+ return 'topology'
|
|
|
+
|
|
|
+ def topologycheck(self, parameters, context, feedback, executing=True):
|
|
|
+ print("拓扑检查开始啦")
|
|
|
+ # TODO 参数设置
|
|
|
+ spatialite_db_path = self.spatialite_db_path # Spatialite 数据库路径
|
|
|
+ table_name = f"{self.in_table}_{self.uid}" # 导入后的表名
|
|
|
+ table_t_name = f"{self.in_table_t}_{self.uid}" # 导入后的目标表名
|
|
|
+ outtable_name = f"{self.out_table}_{self.uid}" # 分析执行后的表名
|
|
|
+ output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context) # 输出的 SHP 文件路径
|
|
|
+ export_layer_name = outtable_name
|
|
|
+ # TODO 将 vectorlayer导入到Spatialite
|
|
|
+ input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
|
|
|
+ if not input_layer.isValid():
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": f"图层数据不可用。"
|
|
|
+ }
|
|
|
+ result = self.importLayerToSpatiaLite(layer=input_layer, table_name=table_name)
|
|
|
+ if result[0] != QgsVectorFileWriter.NoError:
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": f"图层数据导出到SpatiaLite数据库失败。"
|
|
|
+ }
|
|
|
+ input_target_layer = self.parameterAsVectorLayer(parameters, self.INPUTTARGETVECTOR, context)
|
|
|
+ if not input_target_layer.isValid():
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": f"图层数据不可用。"
|
|
|
+ }
|
|
|
+ result = self.importLayerToSpatiaLite(layer=input_target_layer, table_name=table_t_name)
|
|
|
+ if result[0] != QgsVectorFileWriter.NoError:
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": f"图层数据导出到SpatiaLite数据库失败。"
|
|
|
+ }
|
|
|
+ print(f"file imported to Spatialite as table: {table_name}")
|
|
|
+ dbutils = SpatiaLiteUtils(spatialite_db_path)
|
|
|
+ # TODO SQL语句
|
|
|
+ sql = getTopoCheckSQL(self.TOPOLOGYTYPE, self.TOPOLOGYPARAMS)
|
|
|
+ try:
|
|
|
+ dbutils.executescript(f"{sql}")
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": f"SpatiaLite数据库执行SQL语句失败: {e}"
|
|
|
+ }
|
|
|
+ # TODO 获取结果集条目
|
|
|
+ feature_count = dbutils.queryCount(outtable_name)
|
|
|
+ if feature_count == 0:
|
|
|
+ dbutils.resetDb()
|
|
|
+ dbutils.closeDb()
|
|
|
+ return self.topologySuccessWithNone()
|
|
|
+ # TODO 将数据导出为本地矢量文件
|
|
|
+ processed_layer = QgsVectorLayer(f"{spatialite_db_path}|layername={export_layer_name}", "processed_layer",
|
|
|
+ "ogr")
|
|
|
+ path = getParentFolderPath(output_vector_path)
|
|
|
+ csv_path = f"{path}\\report.csv"
|
|
|
+ if not processed_layer.isValid():
|
|
|
+ dbutils.closeDb()
|
|
|
+ return {
|
|
|
+ "状态": "拓扑检查失败",
|
|
|
+ "错误信息": "拓扑检查结果图层加载失败。"
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ self.vectorLayerExport(layer=processed_layer, type="CSV", path=csv_path)
|
|
|
+ self.vectorLayerExportToVector(layer=processed_layer, path=output_vector_path)
|
|
|
+ os.startfile(path)
|
|
|
+ dbutils.resetDb()
|
|
|
+ dbutils.closeDb()
|
|
|
+ return self.topologySuccess(count=feature_count, reportpath=csv_path)
|
|
|
+
|
|
|
+ # 判断数据是否为字符串
|
|
|
+ def is_string(self, var):
|
|
|
+ return isinstance(var, str)
|
|
|
+
|
|
|
+ def getConsoleCommands(self, parameters, context, feedback, executing=True):
|
|
|
+ return []
|
|
|
+
|
|
|
+ def contains_keys(self, obj, keys):
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ return all(key in obj.keys() for key in keys)
|
|
|
+ elif hasattr(type(obj), '__dict__'):
|
|
|
+ return all(key in obj.__dict__ for key in keys)
|
|
|
+ else:
|
|
|
+ raise ValueError("Invalid object type")
|
|
|
+
|
|
|
+ def commandName(self):
|
|
|
+ return "ogr2ogr"
|