""" *************************************************************************** ogr2ogrtopostgislist.py --------------------- Date : November 2012 Copyright : (C) 2012 by Victor Olaya Email : volayaf at gmail dot com *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ __author__ = 'wanger' __date__ = 'November 2024' __copyright__ = '(C) 2024, wanger' import os from PyQt5.QtGui import QIcon from PyQt5.QtWidgets import QApplication from future.moves import sys from qgis.PyQt import QtWidgets from qgis.core import (QgsProcessing, QgsProcessingParameterFeatureSource, QgsProcessingParameterString, QgsProcessingParameterRasterLayer, QgsProcessingParameterFile, QgsProcessingParameterDateTime, QgsProcessingParameterEnum, QgsProcessingParameterCrs, QgsProcessingParameterField, QgsProcessingParameterExtent, QgsProcessingParameterBoolean, QgsProcessingParameterProviderConnection, QgsProcessingParameterDatabaseSchema, QgsProcessingParameterDatabaseTable, QgsProviderRegistry, QgsProcessingException, QgsProcessingParameterDefinition, QgsProviderConnectionException, QgsDataSourceUri) from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm from processing.algs.gdal.GdalUtils import GdalUtils from processing.tools.PrintUtils import printStr from processing.tools.StringUtils import getConnectionStr from processing.tools.GeoServer.Geoserver import Geoserver from processing.tools.system import isWindows import siwei_config pluginPath = os.path.normpath(os.path.join( os.path.split(os.path.dirname(__file__))[0], os.pardir)) class Postgistogeoserver(GdalAlgorithm): DATABASE = 'DATABASE' INPUTFILE = 'INPUTFILE' INPUTRASTER = 'INPUTRASTER' SCHEMA = 'SCHEMA' TABLE = 'TABLE' HOST = 'HOST' DBNAME = 'DBNAME' PORT = 'PORT' USER = 'USER' PASSWORD = 'PASSWORD' WHERE = 'WHERE' # TODO GeoServer追加属性 Publish_Service = 'Publish_Service' GeoServer_URI = 'GeoServer_URI' Server_NAME = 'Server_NAME' GeoServer_WORKSPACE = 'GeoServer_WORKSPACE' GeoServer_USERNAME = 'GeoServer_USERNAME' GeoServer_PASSWORD = 'GeoServer_PASSWORD' GeoServer_Caching = 'GeoServer_Caching' GeoServer_Caching_Start = 'GeoServer_Caching_Start' GeoServer_Caching_Stop = 'GeoServer_Caching_Stop' LAYER_GROUP_JOIN = 'LAYER_GROUP_JOIN' LAYER_GROUP_ADD = 'LAYER_GROUP_ADD' # 关联平台资源目录标识码 ZYMLBSM = "ZYMLBSM" selectedValue = "selectedValue" # 全局参数修改 geoserver_config = siwei_config.CONFIG['geoserver'] GeoServerConfig = { "url": geoserver_config["url"], "username": geoserver_config["username"], "password": geoserver_config["password"], "defaultworkspace": geoserver_config["default_workspace"], } def __init__(self): super().__init__() def initAlgorithm(self, config=None): # self.addParameter(QgsProcessingParameterFile(self.INPUTRASTER, # self.tr('栅格数据'), # extension='tif', optional=True)) self.addParameter(QgsProcessingParameterRasterLayer(self.INPUTRASTER, self.tr('栅格数据'), [QgsProcessing.TypeRaster], optional=True)) self.addParameter(QgsProcessingParameterFile(self.INPUTFILE, self.tr('数据工程文件'), extension='qgs', optional=True)) # db_param = QgsProcessingParameterProviderConnection( # self.DATABASE, # self.tr('数据库'), 'postgres', optional=True) # self.addParameter(db_param) # schema_param = QgsProcessingParameterDatabaseSchema( # self.SCHEMA, # self.tr('模式'), defaultValue='public', connectionParameterName=self.DATABASE, # optional=True) # self.addParameter(schema_param) # table_param = QgsProcessingParameterDatabaseTable( # self.TABLE, # self.tr('表'), defaultValue='', connectionParameterName=self.DATABASE, # schemaParameterName=self.SCHEMA, # optional=True) # self.addParameter(table_param) # self.addParameter(QgsProcessingParameterString(self.WHERE, # self.tr('where语句'), "", # optional=True)) # widget = QtWidgets.QTableView() # self.createCustomParametersWidget(widget) # GeoServer配置 self.addParameter(QgsProcessingParameterString(self.Server_NAME, self.tr('服务名称'), "", optional=False)) self.addParameter(QgsProcessingParameterString(self.GeoServer_URI, self.tr('服务器地址'), self.geoservercoon["url"], optional=False)) self.addParameter(QgsProcessingParameterString(self.GeoServer_USERNAME, self.tr('服务器用户名'), self.geoservercoon["username"], optional=False)) self.addParameter(QgsProcessingParameterString(self.GeoServer_PASSWORD, self.tr('服务器密码'), self.geoservercoon["password"], optional=False)) self.addParameter(QgsProcessingParameterString(self.GeoServer_WORKSPACE, self.tr('服务器工作空间'), self.geoservercoon["defaultworkspace"], optional=False)) # 图层组配置 geo = Geoserver(service_url=self.geoservercoon["url"], username=self.geoservercoon["username"], password=self.geoservercoon["password"]) self.groups = [] requestgroups = geo.get_layergroups(self.geoservercoon["defaultworkspace"]) if requestgroups["layerGroups"] != "": groups = requestgroups["layerGroups"]["layerGroup"] for group in groups: self.groups.append(str(group["name"])) self.addParameter(QgsProcessingParameterEnum(name=self.LAYER_GROUP_JOIN, description=self.tr('加入图层组'), options=self.groups, optional=True)) self.addParameter(QgsProcessingParameterString(self.LAYER_GROUP_ADD, self.tr('创建图层组'), "", optional=True)) self.addParameter(QgsProcessingParameterBoolean(self.GeoServer_Caching, self.tr('数据切片'), defaultValue=False)) self.addParameter(QgsProcessingParameterString(self.GeoServer_Caching_Start, self.tr('开始级别'), self.geoservercoon["cachestart"], optional=True)) self.addParameter(QgsProcessingParameterString(self.GeoServer_Caching_Stop, self.tr('终止级别'), self.geoservercoon["cacheend"], optional=True)) server_parameter = QgsProcessingParameterBoolean(self.Publish_Service, self.tr('服务发布'), defaultValue=True) self.addParameter(server_parameter) # 平台资源目录 # self.zymlParams = QgsProcessingParameterString(self.ZYMLBSM, # self.tr('关联平台资源目录'), "", # optional=True) # self.addParameter(self.zymlParams) def name(self): return 'postgistogeoserver' def icon(self): return QIcon(os.path.join(pluginPath, 'images', 'dbms', 'publishserver.png')) def displayName(self): return self.tr('服务发布') def shortDescription(self): return self.tr('发布地图矢量数据,栅格数据与数据工程文件二选一。') def tags(self): t = self.tr('import,into,postgis,database,vector').split(',') t.extend(super().tags()) return t def group(self): return self.tr('发布工具') def groupId(self): return 'publishserver' def setSelectedValue(self, v): printStr(v) self.selectedValue = v def getSelectedValue(self): return self.selectedValue def getConsoleCommands(self, parameters, context, feedback, executing=True): # print("selectedValue===" + self.selectedValue) # self.setOutputValue(self.ZYMLBSM, parameters.get(self.ZYMLBSM)) # print(self.zymlParams) # self.zymlParams.setDefaultValue('New Default Value') # global_selected_value = self.selectedValue # print("global_selected_value=====" + global_selected_value) print(parameters) if parameters.get(self.DATABASE) is not None: connection_name = self.parameterAsConnectionName(parameters, self.DATABASE, context) if not connection_name: raise QgsProcessingException( self.tr('No connection specified')) try: md = QgsProviderRegistry.instance().providerMetadata('postgres') conn = md.createConnection(connection_name) except QgsProviderConnectionException: raise QgsProcessingException( self.tr('Could not retrieve connection details for {}').format(connection_name)) uri = conn.uri() # 通过uri获取数据库连接信息 connection_parts = QgsDataSourceUri(uri).connectionInfo(executing).split(' ') # print(connection_parts) # self.DBNAME = self.getConnectionStr(connection_parts, 0) # self.HOST = self.getConnectionStr(connection_parts, 1) # self.PORT = self.getConnectionStr(connection_parts, 2) # self.USER = self.getConnectionStr(connection_parts, 3) # self.PASSWORD = self.getConnectionStr(connection_parts, 4) arguments = [ getConnectionStr(connection_parts, 1), getConnectionStr(connection_parts, 2), getConnectionStr(connection_parts, 0), getConnectionStr(connection_parts, 3), getConnectionStr(connection_parts, 4) ] return arguments else: return [] def commandName(self): return "ogr2ogr"