123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- # -*- coding: utf-8 -*-
- """
- ***************************************************************************
- A03.py
- ---------------------
- Date : November 2012
- Copyright : (C) 2012 by Victor Olaya
- Email : volayaf at gmail dot com
- ***************************************************************************
- * *
- * This program is free software; you can redistribute it and/or modify *
- * it under the terms of the GNU General Public License as published by *
- * the Free Software Foundation; either version 2 of the License, or *
- * (at your option) any later version. *
- * *
- ***************************************************************************
- """
- __author__ = 'wanger'
- __date__ = 'November 2024'
- __copyright__ = '(C) 2024, wanger'
- import os
- from PyQt5.QtSql import QSqlDatabase, QSqlQuery
- from osgeo import ogr, gdal
- from PyQt5.QtGui import QIcon
- from PyQt5.QtWidgets import QApplication
- from future.moves import sys
- from qgis.PyQt import QtWidgets
- from qgis._core import QgsProcessingParameterVectorDestination, QgsVectorLayer, QgsVectorFileWriter, \
- QgsCoordinateTransformContext
- from qgis.core import (QgsProcessing,
- QgsProcessingParameterFeatureSource,
- QgsProcessingParameterString,
- QgsProcessingParameterFile,
- QgsProcessingParameterDateTime,
- QgsProcessingParameterEnum,
- QgsProcessingParameterCrs,
- QgsProcessingParameterField,
- QgsProcessingParameterExtent,
- QgsProcessingParameterBoolean,
- QgsProcessingParameterProviderConnection,
- QgsProcessingParameterDatabaseSchema,
- QgsProcessingParameterDatabaseTable,
- QgsProviderRegistry,
- QgsProcessingException,
- QgsProcessingParameterDefinition,
- QgsProviderConnectionException,
- QgsDataSourceUri)
- from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
- from processing.algs.gdal.GdalUtils import GdalUtils
- from processing.tools.PrintUtils import printStr
- from processing.tools.StringUtils import (getConnectionStr, getNow)
- from processing.tools.GeoServer.Geoserver import Geoserver
- from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
- from processing.tools.system import isWindows
- from processing.tools.FileUtils import getParentFolderPath
- from processing.tools.topology.read import (getTopoCheckSQL, getTopoCheckDescription, getTopoCheckNote)
- import sqlite3
- pluginPath = os.path.normpath(os.path.join(
- os.path.split(os.path.dirname(__file__))[0], os.pardir))
- gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
- gdal.SetConfigOption("SHAPE_ENCODING", "GBK")
- class A03(GdalAlgorithm):
- INPUTVECTOR = "INPUTVECTOR"
- OUTPUTVECTOR = 'OUTPUTVECTOR'
- TOPOLOGYTYPE = "A03"
- in_table = "topology_table"
- out_table = "topology_table_temp"
- TOPOLOGYPARAMS = {
- "intable_s": in_table,
- "outtable": out_table
- }
- def __init__(self):
- super().__init__()
- def initAlgorithm(self, config=None):
- self.addParameter(QgsProcessingParameterFeatureSource(self.INPUTVECTOR,
- self.tr('待检查数据'),
- types=[QgsProcessing.TypeVectorPolygon]))
- self.addParameter(QgsProcessingParameterVectorDestination(self.OUTPUTVECTOR,
- self.tr('输出位置(矢量数据和报告)')))
- def name(self):
- return 'A03'
- def icon(self):
- return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'topology.png'))
- def displayName(self):
- return self.tr(getTopoCheckNote(self.TOPOLOGYTYPE))
- def shortDescription(self):
- return self.tr(getTopoCheckDescription(self.TOPOLOGYTYPE))
- def tags(self):
- t = self.tr('import,into,postgis,database,vector').split(',')
- t.extend(super().tags())
- return t
- def group(self):
- return self.tr('拓扑检查')
- def groupId(self):
- return 'topology'
- def topologycheck(self, parameters, context, feedback, executing=True):
- print("面不能有空隙检查开始啦")
- # TODO 参数设置
- spatialite_db_path = self.spatialite_db_path # Spatialite 数据库路径
- table_name = self.in_table # 导入后的表名
- outtable_name = self.out_table # 分析执行后的表名
- output_vector_path = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context) # 输出的 SHP 文件路径
- export_layer_name = outtable_name
- # TODO 将 vectorlayer导入到Spatialite
- input_layer = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
- if not input_layer.isValid():
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"Failed to load SHP file."
- }
- crs = input_layer.crs().authid()
- # TODO 构造连接到Spatialite数据库的URI
- uri = QgsDataSourceUri()
- uri.setDatabase(spatialite_db_path)
- # TODO 将vectorlayer写入Spatialite 数据库
- options = QgsVectorFileWriter.SaveVectorOptions()
- options.driverName = "SQLite"
- options.layerName = table_name
- options.fileEncoding = "UTF-8"
- options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
- # TODO 将QgsVectorLayer导入到spatialite并指定表名
- result = QgsVectorFileWriter.writeAsVectorFormatV2(input_layer, spatialite_db_path,
- QgsCoordinateTransformContext(), options)
- if result[0] != QgsVectorFileWriter.NoError:
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"Failed to import SHP file to Spatialite."
- }
- print(f"SHP file imported to Spatialite as table: {table_name}")
- # TODO 执行 SQL 语句
- conn = sqlite3.connect(spatialite_db_path)
- conn.enable_load_extension(True)
- cursor = conn.cursor()
- conn.execute("PRAGMA synchronous = OFF")
- conn.execute("PRAGMA cache_size = -20000") # In KB
- conn.execute("PRAGMA temp_store = MEMORY")
- try:
- conn.execute("SELECT load_extension('mod_spatialite');")
- print("mod_spatialite loaded successfully.")
- except sqlite3.OperationalError as e:
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"Failed to load extension: {e}"
- }
- # TODO SQL语句
- sql = getTopoCheckSQL(self.TOPOLOGYTYPE, self.TOPOLOGYPARAMS)
- try:
- cursor.executescript(f"{sql}")
- print(f"Script executed successfully.{sql}")
- except sqlite3.Error as e:
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"An error occurred: {e}"
- }
- conn.commit()
- conn.close()
- print("SQL execution completed.")
- # TODO 将数据导出为本地矢量文件
- processed_layer = QgsVectorLayer(f"{spatialite_db_path}|layername={export_layer_name}", "processed_layer",
- "ogr")
- path = getParentFolderPath(output_vector_path)
- csv_path = f"{path}\\report.csv"
- if not processed_layer.isValid():
- return {
- "状态": "拓扑检查失败",
- "错误信息": "Failed to load processed layer."
- }
- else:
- options = QgsVectorFileWriter.SaveVectorOptions()
- options.driverName = "CSV" # Output format
- options.includeGeometry = True # Include geometry
- # options.setLayerOptions(QgsVectorFileWriter.LayerOptions())
- # context = QgsCoordinateTransformContext()
- # options.setOutputWkt(True)
- context2 = QgsCoordinateTransformContext()
- error = QgsVectorFileWriter.writeAsVectorFormatV3(processed_layer, csv_path, context2, options)
- if error[0] == QgsVectorFileWriter.NoError:
- print(f"Exported {table_name} to {csv_path} successfully!")
- else:
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"Failed to export {table_name} to CSV: {error}"
- }
- # TODO 导出矢量文件参数配置
- export_options = QgsVectorFileWriter.SaveVectorOptions()
- export_options.driverName = "ESRI Shapefile"
- export_options.fileEncoding = "UTF-8"
- result = QgsVectorFileWriter.writeAsVectorFormatV2(processed_layer, output_vector_path,
- QgsCoordinateTransformContext(), export_options)
- if result[0] != QgsVectorFileWriter.NoError:
- return {
- "状态": "拓扑检查失败",
- "错误信息": f"Failed to export processed layer to SHP file."
- }
- feature_count = processed_layer.featureCount()
- return {
- "状态": "拓扑检查成功",
- "结论": "不符合" if feature_count > 0 else "符合",
- "错误项": f"{feature_count}条记录",
- # "矢量数据输出位置": output_vector_path,
- "报告输出位置": csv_path
- }
- # 判断数据是否为字符串
- def is_string(self, var):
- return isinstance(var, str)
- def getConsoleCommands(self, parameters, context, feedback, executing=True):
- inputvector = self.parameterAsVectorLayer(parameters, self.INPUTVECTOR, context)
- print(f"拓扑检查输入矢量图层:")
- print(inputvector)
- outFile = self.parameterAsOutputLayer(parameters, self.OUTPUTVECTOR, context)
- print(f"拓扑检查输出文件:{outFile}")
- return []
- def contains_keys(self, obj, keys):
- if isinstance(obj, dict):
- return all(key in obj.keys() for key in keys)
- elif hasattr(type(obj), '__dict__'):
- return all(key in obj.__dict__ for key in keys)
- else:
- raise ValueError("Invalid object type")
- def commandName(self):
- return "ogr2ogr"
|