123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # -*- coding: utf-8 -*-
- """
- ***************************************************************************
- TopologyCheck.py
- ---------------------
- Date : November 2012
- Copyright : (C) 2012 by Victor Olaya
- Email : volayaf at gmail dot com
- ***************************************************************************
- * *
- * This program is free software; you can redistribute it and/or modify *
- * it under the terms of the GNU General Public License as published by *
- * the Free Software Foundation; either version 2 of the License, or *
- * (at your option) any later version. *
- * *
- ***************************************************************************
- """
- __author__ = 'wanger'
- __date__ = 'November 2024'
- __copyright__ = '(C) 2024, wanger'
- import os
- from PyQt5.QtSql import QSqlDatabase, QSqlQuery
- from osgeo import ogr, gdal
- from PyQt5.QtGui import QIcon
- from PyQt5.QtWidgets import QApplication
- from future.moves import sys
- from qgis.PyQt import QtWidgets
- from qgis._core import QgsProcessingParameterVectorDestination, QgsVectorLayer, QgsVectorFileWriter, \
- QgsCoordinateTransformContext
- from qgis.core import (QgsProcessing,
- QgsProcessingParameterFeatureSource,
- QgsProcessingParameterString,
- QgsProcessingParameterFile,
- QgsProcessingParameterDateTime,
- QgsProcessingParameterEnum,
- QgsProcessingParameterCrs,
- QgsProcessingParameterField,
- QgsProcessingParameterExtent,
- QgsProcessingParameterBoolean,
- QgsProcessingParameterProviderConnection,
- QgsProcessingParameterDatabaseSchema,
- QgsProcessingParameterDatabaseTable,
- QgsProviderRegistry,
- QgsProcessingException,
- QgsProcessingParameterDefinition,
- QgsProviderConnectionException,
- QgsDataSourceUri)
- from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
- from processing.algs.gdal.GdalUtils import GdalUtils
- from processing.tools.PrintUtils import printStr
- from processing.tools.StringUtils import (getConnectionStr, getNow)
- from processing.tools.GeoServer.Geoserver import Geoserver
- from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
- from processing.tools.system import isWindows
# Directory two levels above the folder that contains this module
# (presumably the plugin root, since icon() resolves 'images/...' against it).
pluginPath = os.path.normpath(
    os.path.join(os.path.dirname(os.path.dirname(__file__)), os.pardir)
)

# Process-wide GDAL/OGR configuration, applied once at import time:
# file names are passed to GDAL as UTF-8, and the Shapefile attribute
# encoding option is set to GBK.
gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
gdal.SetConfigOption("SHAPE_ENCODING", "GBK")
class TopologyCheck(GdalAlgorithm):
    """Processing algorithm that runs topology checks on a vector layer.

    The input layer is copied into an intermediate SQLite database, an SQL
    statement is executed against it, and the resulting table is exported
    to the requested output vector file.
    """

    INPUTVECTOR = "INPUTVECTOR"
    TOPOLOGYTYPE = "TOPOLOGYTYPE"
    # User-facing labels of the available checks, in enum-index order:
    # must be disjoint, must not overlap, must not intersect, must not have
    # dangles, must not have pseudo nodes, must not self-overlap, must not
    # self-intersect, must not intersect or touch interior, must be single
    # part, must not have gaps.
    TOPOLOGYTYPES = ["必须不相交", "不能重叠", "不能相交", "不能有悬挂点", "不能有伪结点", "不能自重叠", "不能自相交",
                     "不能相交或内部接触", "必须为单一部件", "不能有空隙"]
    OUTPUTVECTOR = 'OUTPUTVECTOR'

    def __init__(self):
        super().__init__()

    def initAlgorithm(self, config=None):
        """Declare the input layer, the multi-select check type and the output."""
        self.addParameter(QgsProcessingParameterFeatureSource(self.INPUTVECTOR,
                                                              self.tr('待检查数据'),
                                                              types=[QgsProcessing.TypeVector]))
        check_types = QgsProcessingParameterEnum(name=self.TOPOLOGYTYPE,
                                                 description=self.tr('拓扑检查类型'),
                                                 options=self.TOPOLOGYTYPES)
        check_types.setAllowMultiple(True)
        self.addParameter(check_types)
        self.addParameter(QgsProcessingParameterVectorDestination(self.OUTPUTVECTOR,
                                                                  self.tr('输出位置')))

    def name(self):
        """Return the algorithm identifier."""
        return 'topologycheck'

    def icon(self):
        """Return the algorithm icon loaded from the plugin's image folder."""
        return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'topology.png'))

    def displayName(self):
        """Return the translated, user-visible algorithm name."""
        return self.tr('拓扑检查')

    def shortDescription(self):
        """Return the translated one-line description."""
        return self.tr('拓扑检查')

    def tags(self):
        """Return this algorithm's search tags followed by the parent's tags."""
        t = self.tr('import,into,postgis,database,vector').split(',')
        t.extend(super().tags())
        return t

    def group(self):
        """Return the translated name of the group this algorithm belongs to."""
        return self.tr('拓扑检查')

    def groupId(self):
        """Return the identifier of the group this algorithm belongs to."""
        return 'topology'

    def topologycheck(self, parameters, context, feedback, executing=True):
        """Run the topology check workflow and export the result.

        Steps: resolve the parameters, copy the input layer into an
        intermediate SQLite database, execute the SQL statement, then export
        the resulting table to the requested output file.

        :param parameters: Processing parameter map for this run.
        :param context: Processing context used to resolve the parameters.
        :param feedback: Processing feedback object (currently unused).
        :param executing: Unused; kept for interface compatibility.
        :returns: dict with the status, the error summary, the output vector
            path and the (currently empty) report path.
        :raises QgsProcessingException: if the input layer is unavailable, or
            if importing, querying, reloading or exporting the data fails.
        """
        # Function-scope import: the module-level import list does not
        # provide QgsProcessingUtils.
        from qgis.core import QgsProcessingUtils

        print("拓扑检查开始啦")
        # Resolve the selected enum indexes to their labels. The previous code
        # stored the return value of print(), which is always None.
        selected = self.parameterAsEnums(parameters, self.TOPOLOGYTYPE, context)
        types = [self.TOPOLOGYTYPES[i] for i in selected if 0 <= i < len(self.TOPOLOGYTYPES)]
        print(f"拓扑检查类型:{types}")

        # Intermediate database lives in the Processing temp folder instead of
        # a fixed, Windows-only location.
        spatialite_db_path = QgsProcessingUtils.generateTempFilename("topology_check.sqlite")
        table_name = "topology_table"  # table created by the import step
        output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        export_layer_name = table_name

        # parameterAsVectorLayer returns None when the source cannot be resolved.
        input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        if input_layer is None or not input_layer.isValid():
            raise QgsProcessingException("图层数据不可用。")

        # Import the layer into the intermediate database under table_name.
        options = QgsVectorFileWriter.SaveVectorOptions()
        options.driverName = "SQLite"
        options.layerName = table_name
        options.fileEncoding = "UTF-8"
        # Layer-level overwrite needs an existing database to open in update
        # mode; a fresh file has to be created instead.
        if os.path.exists(spatialite_db_path):
            options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
        else:
            options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile
        result = QgsVectorFileWriter.writeAsVectorFormatV2(input_layer, spatialite_db_path,
                                                           QgsCoordinateTransformContext(), options)
        self._check_write_result(result, "Failed to import SHP file to Spatialite.")
        print(f"SHP file imported to Spatialite as table: {table_name}")

        # Execute the SQL statement against the intermediate database.
        sql = f"SELECT * FROM {table_name};"
        self._run_sql(spatialite_db_path, sql)
        print("SQL execution completed.")

        # Reload the table as a layer so it can be exported.
        processed_layer = QgsVectorLayer(f"{spatialite_db_path}|layername={export_layer_name}",
                                         "processed_layer", "ogr")
        if not processed_layer.isValid():
            raise QgsProcessingException("拓扑检查结果图层加载失败。")

        # NOTE(review): the export always uses the Shapefile driver, whatever
        # extension the chosen destination has - confirm this is intended.
        export_options = QgsVectorFileWriter.SaveVectorOptions()
        export_options.driverName = "ESRI Shapefile"
        export_options.fileEncoding = "UTF-8"
        result = QgsVectorFileWriter.writeAsVectorFormatV2(processed_layer, output_vector_path,
                                                           QgsCoordinateTransformContext(), export_options)
        self._check_write_result(result, "Failed to export processed layer to SHP file.")
        print(f"Processed layer exported to SHP file: {output_vector_path}")

        return {
            "状态": "拓扑检查成功",
            "错误项": "条记录",
            "矢量数据输出位置": output_vector_path,
            "报告输出位置": ""
        }

    @staticmethod
    def _check_write_result(result, message):
        """Raise QgsProcessingException if a writer result tuple reports an error."""
        if result[0] != QgsVectorFileWriter.NoError:
            detail = result[1] if len(result) > 1 and result[1] else ""
            raise QgsProcessingException(f"{message} {detail}".strip())

    @staticmethod
    def _run_sql(db_path, sql):
        """Execute one SQL statement on a private, short-lived SQLite connection."""
        import uuid

        # A uniquely named connection avoids replacing the application's
        # default Qt SQL connection.
        connection_name = f"topologycheck_{uuid.uuid4().hex}"
        db = QSqlDatabase.addDatabase("QSQLITE", connection_name)
        query = None
        try:
            db.setDatabaseName(db_path)
            if not db.open():
                raise QgsProcessingException("Failed to open Spatialite database.")
            query = QSqlQuery(db)
            if not query.exec(sql):
                raise QgsProcessingException(f"SQL execution failed: {query.lastError().text()}")
        finally:
            # Every query/database handle must be released before the
            # connection can be removed cleanly.
            del query
            db.close()
            del db
            QSqlDatabase.removeDatabase(connection_name)

    def is_string(self, var):
        """Return True when var is a str instance."""
        return isinstance(var, str)

    def getConsoleCommands(self, parameters, context, feedback, executing=True):
        """Print the resolved input layer and output path; return no commands.

        :returns: an empty list.
        """
        inputvector = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
        print("拓扑检查输入矢量图层:")
        print(inputvector)
        outFile = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
        print(f"拓扑检查输出文件:{outFile}")
        return []

    def contains_keys(self, obj, keys):
        """Return True when every entry of keys is present in obj.

        For a dict the keys are looked up in the mapping; for any other
        object they are looked up in its ``__dict__``.

        :raises ValueError: if obj is neither a dict nor an object exposing
            ``__dict__``.
        """
        if isinstance(obj, dict):
            return all(key in obj for key in keys)
        # Check the object itself: hasattr(type(obj), '__dict__') is true for
        # every object, which made the ValueError below unreachable.
        if hasattr(obj, '__dict__'):
            return all(key in obj.__dict__ for key in keys)
        raise ValueError("Invalid object type")

    def commandName(self):
        """Return the name of the command-line tool associated with this algorithm."""
        return "ogr2ogr"
|