# -*- coding: utf-8 -*-
"""
***************************************************************************
    L03.py
    ---------------------
    Date                 : November 2012
    Copyright            : (C) 2012 by Victor Olaya
    Email                : volayaf at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""
- __author__ = 'wanger'
- __date__ = 'November 2024'
- __copyright__ = '(C) 2024, wanger'
- import os
- from PyQt5.QtSql import QSqlDatabase, QSqlQuery
- from osgeo import ogr, gdal
- from PyQt5.QtGui import QIcon
- from PyQt5.QtWidgets import QApplication
- from future.moves import sys
- from qgis.PyQt import QtWidgets
- from qgis._core import QgsProcessingParameterVectorDestination, QgsVectorLayer, QgsVectorFileWriter, \
- QgsCoordinateTransformContext
- from qgis.core import (QgsProcessing,
- QgsProcessingParameterFeatureSource,
- QgsProcessingParameterString,
- QgsProcessingParameterFile,
- QgsProcessingParameterDateTime,
- QgsProcessingParameterEnum,
- QgsProcessingParameterCrs,
- QgsProcessingParameterField,
- QgsProcessingParameterExtent,
- QgsProcessingParameterBoolean,
- QgsProcessingParameterProviderConnection,
- QgsProcessingParameterDatabaseSchema,
- QgsProcessingParameterDatabaseTable,
- QgsProviderRegistry,
- QgsProcessingException,
- QgsProcessingParameterDefinition,
- QgsProviderConnectionException,
- QgsDataSourceUri)
- from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
- from processing.algs.gdal.GdalUtils import GdalUtils
- from processing.tools.PrintUtils import printStr
- from processing.tools.StringUtils import (getConnectionStr, getNow)
- from processing.tools.GeoServer.Geoserver import Geoserver
- from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
- from processing.tools.StringUtils import getUUID
- from processing.tools.FileUtils import getParentFolderPath
- from processing.tools.topology.read import (getTopoCheckSQL, getTopoCheckDescription, getTopoCheckNote)
- from processing.tools.SpatiaLite.SpatiaLite import SpatiaLiteUtils
- import sqlite3
# Directory that contains this module; icon() looks up 'topology.png' here.
pluginPath = os.path.dirname(os.path.abspath(__file__))

# Process-wide GDAL/OGR settings, applied once when this module is imported.
# NOTE(review): per the GDAL docs these make GDAL treat file names as UTF-8
# and read/write shapefile attributes as GBK -- confirm GBK suits all inputs.
for _gdal_key, _gdal_value in (
    ("GDAL_FILENAME_IS_UTF8", "YES"),
    ("SHAPE_ENCODING", "GBK"),
):
    gdal.SetConfigOption(_gdal_key, _gdal_value)
del _gdal_key, _gdal_value  # keep the module namespace unchanged
class L03(GdalAlgorithm):
    """Processing algorithm that runs the "L03" topology check on a line layer.

    The input layer is imported into a SpatiaLite database, the SQL registered
    for ``TOPOLOGYTYPE`` is executed, and, when the result table is not empty,
    the result is exported as a vector file together with a ``report.csv``.

    NOTE(review): ``spatialite_db_path``, ``importLayerToSpatiaLite``,
    ``vectorLayerExport``, ``vectorLayerExportToVector``, ``topologySuccess``
    and ``topologySuccessWithNone`` are not defined in this class; they are
    presumably provided by the ``GdalAlgorithm`` base class -- confirm.
    """

    # Parameter keys.
    INPUTVECTOR = "INPUTVECTOR"
    OUTPUTVECTOR = "OUTPUTVECTOR"

    # Identifier of this check: returned by name() and used as the lookup key
    # for the check's SQL, display note and description.
    TOPOLOGYTYPE = "L03"

    # NOTE(review): evaluated once when the class body runs, so every run of
    # this algorithm in the same session shares the same table suffix --
    # confirm that this is intended.
    uid = getUUID()
    in_table = "topology_in_table"
    out_table = "topology_out_table"

    # Values handed to getTopoCheckSQL() and SpatiaLiteUtils.executescript().
    TOPOLOGYPARAMS = {
        "intable_s": f"{in_table}_{uid}",
        "outtable": f"{out_table}_{uid}",
    }

    def __init__(self):
        super().__init__()

    def initAlgorithm(self, config=None):
        """Declare the input line layer and the output destination parameters."""
        self.addParameter(
            QgsProcessingParameterFeatureSource(
                self.INPUTVECTOR,
                self.tr('待检查数据'),
                types=[QgsProcessing.TypeVectorLine],
            )
        )
        self.addParameter(
            QgsProcessingParameterVectorDestination(
                self.OUTPUTVECTOR,
                self.tr('输出位置(矢量数据和报告)'),
            )
        )

    def name(self):
        """Return the algorithm identifier (``TOPOLOGYTYPE``)."""
        return self.TOPOLOGYTYPE

    def icon(self):
        """Return the 'topology.png' icon located next to this module."""
        return QIcon(os.path.join(pluginPath, 'topology.png'))

    def displayName(self):
        """Return the translated note registered for this check."""
        return self.tr(getTopoCheckNote(self.TOPOLOGYTYPE))

    def shortDescription(self):
        """Return the translated description registered for this check."""
        return self.tr(getTopoCheckDescription(self.TOPOLOGYTYPE))

    def tags(self):
        """Return this algorithm's own tags followed by the inherited ones."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated group name."""
        return self.tr('拓扑检查')

    def groupId(self):
        """Return the group identifier."""
        return 'topology'

    def topologycheck(self, parameters, context, feedback, executing=True):
        """Run the topology check and export its results.

        Args:
            parameters: Processing parameter values for this run.
            context: Processing context used to resolve the parameters.
            feedback: Not used by this method.
            executing: Not used by this method.

        Returns:
            The result of ``topologySuccess`` (results found) or
            ``topologySuccessWithNone`` (empty result table) on success;
            otherwise a dict with the keys "状态" and "错误信息" describing
            the failure.
        """
        print("拓扑检查开始啦")

        # Parameter setup.
        spatialite_db_path = self.spatialite_db_path  # SpatiaLite database path
        table_name = f"{self.in_table}_{self.uid}"  # table created by the import
        outtable_name = f"{self.out_table}_{self.uid}"  # table produced by the SQL
        # Path of the output vector file.
        output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)

        # Import the vector layer into SpatiaLite.
        input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        # parameterAsVectorLayer may yield None when the source cannot be
        # resolved; treat that the same as an invalid layer instead of crashing.
        if input_layer is None or not input_layer.isValid():
            return {
                "状态": "拓扑检查失败",
                "错误信息": "图层数据不可用。"
            }
        result = self.importLayerToSpatiaLite(layer=input_layer, table_name=table_name)
        if result[0] != QgsVectorFileWriter.NoError:
            return {
                "状态": "拓扑检查失败",
                "错误信息": "图层数据导出到SpatiaLite数据库失败。"
            }
        print(f"file imported to Spatialite as table: {table_name}")

        dbutils = SpatiaLiteUtils(spatialite_db_path)
        try:
            # Run the SQL registered for this check.
            sql = getTopoCheckSQL(self.TOPOLOGYTYPE, self.TOPOLOGYPARAMS)
            try:
                dbutils.executescript(str(sql), self.TOPOLOGYPARAMS)
            except sqlite3.Error as e:
                return {
                    "状态": "拓扑检查失败",
                    "错误信息": f"SpatiaLite数据库执行SQL语句失败: {e}"
                }

            # Fetch the number of rows in the result table.
            feature_count = dbutils.queryCount(outtable_name)
            if feature_count == 0:
                dbutils.resetDb()
                return self.topologySuccessWithNone()

            # Export the result table to local files.
            processed_layer = QgsVectorLayer(
                f"{spatialite_db_path}|layername={outtable_name}",
                "processed_layer",
                "ogr",
            )
            if not processed_layer.isValid():
                return {
                    "状态": "拓扑检查失败",
                    "错误信息": "拓扑检查结果图层加载失败。"
                }

            path = getParentFolderPath(output_vector_path)
            # os.path.join instead of a hard-coded backslash so the report
            # lands inside the folder on every platform.
            csv_path = os.path.join(path, "report.csv")
            self.vectorLayerExport(layer=processed_layer, type="CSV", path=csv_path)
            self.vectorLayerExportToVector(layer=processed_layer, path=output_vector_path)
            # os.startfile exists only on Windows; skip opening the output
            # folder elsewhere rather than failing after a successful export.
            if hasattr(os, "startfile"):
                os.startfile(path)
            dbutils.resetDb()
            return self.topologySuccess(count=feature_count, reportpath=csv_path)
        finally:
            # Always release the database handle, including on SQL failure or
            # an unexpected exception.
            dbutils.closeDb()

    def is_string(self, var):
        """Return True if *var* is a ``str`` instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Return an empty command list."""
        return []

    def contains_keys(self, obj, keys):
        """Return True if every name in *keys* is present on *obj*.

        For a dict the names are looked up as keys; for any other object that
        has an instance ``__dict__`` they are looked up in that ``__dict__``.

        Raises:
            ValueError: If *obj* is neither a dict nor an object with a
                ``__dict__``.
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Check the instance, not its type: every type object has a __dict__,
        # which made the ValueError branch unreachable.
        if hasattr(obj, '__dict__'):
            return all(key in obj.__dict__ for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the underlying GDAL command."""
        return "ogr2ogr"
|