|
- """
- ***************************************************************************
- GdalAlgorithm.py
- ---------------------
- Date : August 2012
- Copyright : (C) 2012 by Victor Olaya
- Email : volayaf at gmail dot com
- ***************************************************************************
- * *
- * This program is free software; you can redistribute it and/or modify *
- * it under the terms of the GNU General Public License as published by *
- * the Free Software Foundation; either version 2 of the License, or *
- * (at your option) any later version. *
- * *
- ***************************************************************************
- """
- __author__ = 'Victor Olaya'
- __date__ = 'August 2012'
- __copyright__ = '(C) 2012, Victor Olaya'
- import os
- import re
- import numpy as np
- from PyQt5.QtGui import QIcon
- from osgeo import ogr
- from qgis.PyQt.QtCore import QUrl, QCoreApplication
- from qgis._core import QgsCoordinateTransformContext, QgsCoordinateReferenceSystem, QgsCoordinateTransform, \
- QgsVectorLayer, QgsWkbTypes, QgsFeature, QgsGeometry
- from qgis.core import (QgsApplication,
- QgsProject,
- QgsVectorFileWriter,
- QgsProcessingFeatureSourceDefinition,
- QgsProcessingAlgorithm,
- QgsProcessingContext,
- QgsProcessingFeedback,
- QgsProviderRegistry,
- QgsDataSourceUri)
- from processing.algs.gdal.GdalAlgorithmDialog import GdalAlgorithmDialog
- from processing.algs.gdal.GdalUtils import GdalUtils
- from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
- from processing.tools.GeoServer.Geoserver import Geoserver
- from processing.tools.GeoServer.GeoService import GeoService
- from processing.tools.QGS.QgsProjectUtils import QgsProjectUtils
- from processing.tools.PrintUtils import getLastPrint
- from processing.tools.requestUtils import (spotfileUpload, downloadspotfile, deletespotfile)
- from processing.tools.FileListPrintUtils import getFileListPrint
- from processing.tools.SubprocessUtils import RunSubprocess
- import processing.tools.QGS.load
- from qgis.PyQt.QtCore import NULL
- import config
- pluginPath = os.path.normpath(os.path.join(
- os.path.split(os.path.dirname(__file__))[0], os.pardir))
- class GdalAlgorithm(QgsProcessingAlgorithm):
- def __init__(self):
- super().__init__()
- self.db_config = config.CONFIG['db']
- self.geoserver_config = config.CONFIG['geoserver']
- # 全局参数修改
- self.output_values = {}
- self.spatialite_db_path = "D:\\temp\\output.sqlite"
- self.pgcoon = {
- "host": self.db_config['host'],
- "schema": self.db_config['schema'],
- "port": self.db_config['port'],
- "user": self.db_config['user'],
- "password": self.db_config['password'],
- "dbname": self.db_config['name']
- }
- self.geoservercoon = {
- "url": self.geoserver_config['url'],
- "username": self.geoserver_config['username'],
- "password": self.geoserver_config['password'],
- "defaultworkspace": self.geoserver_config['default_workspace'],
- "cachestart": self.geoserver_config['cachestart'],
- "cacheend": self.geoserver_config['cacheend']
- }
- def icon(self):
- return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'tool.png'))
- # return QgsApplication.getThemeIcon("/providerGdal.svg")
- def tags(self):
- return ['ogr', 'gdal', self.commandName()]
- def svgIconPath(self):
- return os.path.join(pluginPath, 'images', 'dbms', 'tool.png')
- # return QgsApplication.iconPath("providerGdal.svg")
- def createInstance(self, config={}):
- return self.__class__()
- def createCustomParametersWidget(self, parent):
- return GdalAlgorithmDialog(self, parent=parent)
- def getConsoleCommands(self, parameters, context, feedback, executing=True):
- return None
- def getOgrCompatibleSource(self, parameter_name, parameters, context, feedback, executing):
- """
- Interprets a parameter as an OGR compatible source and layer name
- :param executing:
- """
- if not executing and parameter_name in parameters and isinstance(parameters[parameter_name],
- QgsProcessingFeatureSourceDefinition):
- # if not executing, then we throw away all 'selected features only' settings
- # since these have no meaning for command line gdal use, and we don't want to force
- # an export of selected features only to a temporary file just to show the command!
- parameters = {parameter_name: parameters[parameter_name].source}
- input_layer = self.parameterAsVectorLayer(parameters, parameter_name, context)
- ogr_data_path = None
- ogr_layer_name = None
- if input_layer is None or input_layer.dataProvider().name() == 'memory':
- if executing:
- # parameter is not a vector layer - try to convert to a source compatible with OGR
- # and extract selection if required
- ogr_data_path = self.parameterAsCompatibleSourceLayerPath(parameters, parameter_name, context,
- QgsVectorFileWriter.supportedFormatExtensions(),
- QgsVectorFileWriter.supportedFormatExtensions()[
- 0],
- feedback=feedback)
- ogr_layer_name = GdalUtils.ogrLayerName(ogr_data_path)
- else:
- # not executing - don't waste time converting incompatible sources, just return dummy strings
- # for the command preview (since the source isn't compatible with OGR, it has no meaning anyway and can't
- # be run directly in the command line)
- ogr_data_path = 'path_to_data_file'
- ogr_layer_name = 'layer_name'
- elif input_layer.dataProvider().name() == 'ogr':
- if executing and (
- isinstance(parameters[parameter_name], QgsProcessingFeatureSourceDefinition) and parameters[
- parameter_name].selectedFeaturesOnly) \
- or input_layer.subsetString():
- # parameter is a vector layer, with OGR data provider
- # so extract selection if required
- ogr_data_path = self.parameterAsCompatibleSourceLayerPath(parameters, parameter_name, context,
- QgsVectorFileWriter.supportedFormatExtensions(),
- feedback=feedback)
- parts = QgsProviderRegistry.instance().decodeUri('ogr', ogr_data_path)
- ogr_data_path = parts['path']
- if 'layerName' in parts and parts['layerName']:
- ogr_layer_name = parts['layerName']
- else:
- ogr_layer_name = GdalUtils.ogrLayerName(ogr_data_path)
- else:
- # either not using the selection, or
- # not executing - don't worry about 'selected features only' handling. It has no meaning
- # for the command line preview since it has no meaning outside of a QGIS session!
- ogr_data_path = GdalUtils.ogrConnectionStringAndFormatFromLayer(input_layer)[0]
- ogr_layer_name = GdalUtils.ogrLayerName(input_layer.dataProvider().dataSourceUri())
- elif input_layer.dataProvider().name().lower() == 'wfs':
- uri = QgsDataSourceUri(input_layer.source())
- baseUrl = uri.param('url').split('?')[0]
- ogr_data_path = f"WFS:{baseUrl}"
- ogr_layer_name = uri.param('typename')
- else:
- # vector layer, but not OGR - get OGR compatible path
- # TODO - handle "selected features only" mode!!
- ogr_data_path = GdalUtils.ogrConnectionStringFromLayer(input_layer)
- ogr_layer_name = GdalUtils.ogrLayerName(input_layer.dataProvider().dataSourceUri())
- return ogr_data_path, ogr_layer_name
- def setOutputValue(self, name, value):
- self.output_values[name] = value
- def processAlgorithm(self, parameters, context, feedback):
- print("############进入GdalAlgorithm组件的processAlgorithm################")
- # TODO wanger GDB入库
- if parameters.get("INPUTGDB") is not None:
- res = self.gdbimport(parameters, context, feedback, executing=True)
- return res
- # TODO wanger 拓扑检查
- if parameters.get("INPUTVECTOR") is not None:
- res = self.topologycheck(parameters, context, feedback, executing=True)
- return res
- # TODO wanger 生成许可文件
- if parameters.get("LICENSEHOST") is not None:
- self.generateLicense(parameters)
- return {
- "许可文件路径": parameters.get("OUTPUT")
- }
- # TODO wanger 上传图斑文件数据包
- if parameters.get("SPOTFILE") is not None:
- filepath = parameters.get("SPOTFILE")
- print(filepath)
- json_obj = spotfileUpload(filepath=filepath)
- status = int(json_obj["statuscode"])
- if status == 200:
- return {
- "状态": "上传成功!"
- }
- else:
- return {
- "状态": json_obj["message"]
- }
- # TODO wanger 下载图斑文件数据包
- if parameters.get("SPOTFILEDOWNLOAD") is not None:
- fileindex = parameters.get("SPOTFILEDOWNLOAD")
- print("spotfilelist")
- print(self.spotfilelist)
- fileparams = self.spotfilelist[fileindex]
- print(fileparams)
- filestr = downloadspotfile(filename=f'{fileparams["name"]}.{fileparams["filetype"]}',
- dir=parameters.get("SPOTFILEOUT"))
- return {
- "状态": filestr
- }
- # TODO wanger 删除图斑文件数据包
- if parameters.get("SPOTFILEDELETE") is not None:
- fileindex = parameters.get("SPOTFILEDELETE")
- print("spotfilelist")
- print(self.spotfilelist)
- fileparams = self.spotfilelist[fileindex]
- print(fileparams)
- filestr = deletespotfile(id=fileparams["id"])
- return {
- "状态": "删除成功"
- }
- # TODO wanger GeoServer服务发布
- if parameters.get("Publish_Service") is not None:
- zymlbsm = getLastPrint()
- if zymlbsm == None or zymlbsm == '':
- return {
- "Error": "资源目录未选择!"
- }
- print("zymlbsm====" + zymlbsm)
- layer_group_join = ""
- if parameters.get("LAYER_GROUP_JOIN") is not None:
- layer_group_join = self.groups[parameters.get("LAYER_GROUP_JOIN")]
- commands = self.getConsoleCommands(parameters, context, feedback, executing=True)
- geoSer = GeoService()
- result = None
- if parameters.get("INPUTRASTER") != NULL and parameters.get("INPUTRASTER") != "":
- # 获取输入栅格图层
- raster_layer = self.parameterAsRasterLayer(parameters, self.INPUTRASTER, context)
- file = raster_layer.source()
- result = geoSer.publishGeoService(parameters, context, feedback, commands, zymlbsm, layer_group_join,
- file)
- else:
- result = geoSer.publishGeoService(parameters, context, feedback, commands, zymlbsm, layer_group_join,
- None)
- return result
- # === 获取gdal命令参数执行并输出log开始 ===
- commands = self.getConsoleCommands(parameters, context, feedback, executing=True)
- # 增量更新
- if parameters.get("ALLOWOVERLAP") is not None and len(commands) == 0:
- res = self.updateVector(parameters, context, feedback, executing=True)
- return res
- # 版本回退
- if parameters.get("RESTORETABLE") is not None:
- res = self.restoreVector(parameters, context, feedback, executing=True)
- return res
- command = commands[0]
- if command.__contains__(" "):
- GdalUtils.runGdal([command], feedback)
- commands.remove(command)
- GdalUtils.runGdal(commands, feedback)
- else:
- GdalUtils.runGdal(commands, feedback)
- print(commands)
- results = {}
- for o in self.outputDefinitions():
- if o.name() in parameters:
- results[o.name()] = parameters[o.name()]
- for k, v in self.output_values.items():
- results[k] = v
- # === 获取gdal命令参数执行并输出log结束 ===
- # TODO wanger 元数据入库
- if parameters.get("Metadata_storage") is not None and parameters.get("Metadata_storage") == True:
- # 所属行政区划
- ssxzqh = getLastPrint()
- print("ssxzqh====" + ssxzqh)
- # 获取附件列表
- fileliststr = getFileListPrint()
- pgconn = PostgreSQL(schema='base')
- ogrLayer = ""
- layername = ""
- if parameters.get("SOURCE_TYPE") == "vector":
- ogrLayer, layername = self.getOgrCompatibleSource(self.INPUT, parameters, context, feedback, None)
- elif parameters.get("SOURCE_TYPE") == "raster":
- raster_layer = self.parameterAsRasterLayer(parameters, self.INPUT, context)
- ogrLayer = raster_layer.source()
- print(ogrLayer)
- pgconn.metadataStorage(parameters, ssxzqh, fileliststr, self.ywlxs[parameters.get("VECTOR_YWLX")],
- self.depts[parameters.get("VECTOR_GLBM")], ogrLayer, layername,
- self.zymls[parameters.get("VECTOR_ZYML")])
- pgconn.close()
- return results
- def commandName(self):
- parameters = {
- param.name(): "1"
- for param in self.parameterDefinitions()
- }
- context = QgsProcessingContext()
- feedback = QgsProcessingFeedback()
- name = self.getConsoleCommands(parameters, context, feedback, executing=False)[0]
- if name.endswith(".py"):
- name = name[:-3]
- return name
- # 将VectorLayer导入到SpatiaLite数据库
- def importLayerToSpatiaLite(self, layer, table_name):
- # TODO wanger 强制进行坐标系转换 统一入库数据坐标系 增强分析结果准确性
- target_crs = QgsCoordinateReferenceSystem("EPSG:4490")
- # 创建坐标转换对象
- transform_context = QgsProject.instance().transformContext()
- transform = QgsCoordinateTransform(layer.crs(), target_crs, transform_context)
- # 创建内存图层
- geometry_type = layer.wkbType() # 保留原始几何类型
- memory_layer = QgsVectorLayer(
- f"{QgsWkbTypes.displayString(geometry_type)}", # 指定几何类型
- "Transformed Layer",
- "memory"
- )
- # 设置内存图层的 CRS 和属性字段
- memory_layer.setCrs(target_crs)
- memory_layer.dataProvider().addAttributes(layer.fields()) # 复制字段
- memory_layer.updateFields()
- # 遍历原图层的所有要素,逐个转换几何并添加到内存图层
- features = []
- for feature in layer.getFeatures():
- new_feature = QgsFeature(feature) # 复制属性
- geom = feature.geometry()
- if geom: # 检查几何是否有效
- geom.transform(transform) # 转换几何坐标
- new_feature.setGeometry(geom)
- features.append(new_feature)
- # 将转换后的要素添加到内存图层
- memory_layer.dataProvider().addFeatures(features)
- # TODO 构造连接到Spatialite数据库的URI
- uri = QgsDataSourceUri()
- uri.setDatabase(self.spatialite_db_path)
- # TODO 将vectorlayer写入Spatialite 数据库
- options = QgsVectorFileWriter.SaveVectorOptions()
- options.driverName = "SQLite"
- options.layerName = table_name
- options.fileEncoding = "UTF-8"
- # options.crs = target_crs
- options.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
- # TODO 将QgsVectorLayer导入到spatialite并指定表名
- result = QgsVectorFileWriter.writeAsVectorFormatV2(memory_layer, self.spatialite_db_path,
- QgsProject.instance().transformContext(), options)
- return result
- # 将VectorLayer导出到文件
- def vectorLayerExport(self, layer, path, type):
- options = QgsVectorFileWriter.SaveVectorOptions()
- options.driverName = type # Output format
- # options.includeGeometry = True # Include geometry
- context2 = QgsCoordinateTransformContext()
- error = QgsVectorFileWriter.writeAsVectorFormatV3(layer, path, context2, options)
- if error[0] == QgsVectorFileWriter.NoError:
- print(f"Exported {path} successfully!")
- else:
- print(f"Failed to export to {path}: {error}")
- # 将VectorLayer导出到矢量文件
- def vectorLayerExportToVector(self, layer, path, type="ESRI Shapefile"):
- # TODO 导出矢量文件参数配置
- export_options = QgsVectorFileWriter.SaveVectorOptions()
- export_options.driverName = type
- export_options.fileEncoding = "UTF-8"
- result = QgsVectorFileWriter.writeAsVectorFormatV2(layer, path,
- QgsCoordinateTransformContext(), export_options)
- if result[0] != QgsVectorFileWriter.NoError:
- print(f"Failed to export processed layer to SHP file.")
- print(result)
- # 拓扑检查成功,没有错误项
- def topologySuccessWithNone(self):
- return {
- "状态": "拓扑检查成功",
- "结论": "符合"
- }
- # 拓扑检查成功
- def topologySuccess(self, count, reportpath):
- return {
- "状态": "拓扑检查成功",
- "结论": "不符合" if count > 0 else "符合",
- "错误项": f"{count}条记录",
- "报告输出位置": reportpath
- }
- def tr(self, string, context=''):
- if context == '':
- context = self.__class__.__name__
- return QCoreApplication.translate(context, string)
- def getLayerGeometryType(self, geom_type):
- geom_type_str = ""
- if geom_type == ogr.wkbPoint:
- geom_type_str = "Point"
- elif geom_type == ogr.wkbLineString:
- geom_type_str = "LineString"
- elif geom_type == ogr.wkbPolygon:
- geom_type_str = "Polygon"
- elif geom_type == ogr.wkbMultiPoint:
- geom_type_str = "MultiPoint"
- elif geom_type == ogr.wkbMultiLineString:
- geom_type_str = "MultiLineString"
- elif geom_type == ogr.wkbMultiPolygon:
- geom_type_str = "MultiPolygon"
- elif geom_type == ogr.wkbGeometryCollection:
- geom_type_str = "GeometryCollection"
- else:
- geom_type_str = None
- return geom_type_str
|