"""QGIS Processing algorithm that registers an OSGB data directory in PostgreSQL.

The algorithm inserts a single metadata row (source path, import date, year,
business type, department, id, source type, resource catalog, directory name
and data source) into a user-selected schema/table.
"""

__author__ = 'liying'
__date__ = 'May 2025'
__copyright__ = '(C) 2025, liying'

import uuid
from qgis.core import (
    QgsProcessingAlgorithm,
    QgsProcessingParameterFile,
    QgsProcessingParameterProviderConnection,
    QgsProcessingParameterDatabaseSchema,
    QgsProcessingParameterString,
    QgsProcessingParameterEnum,
    QgsProcessingParameterDateTime,
    QgsProcessingParameterBoolean,
    QgsProcessingParameterCrs,
)
from qgis.PyQt.QtCore import QCoreApplication
import os
import psycopg2
from psycopg2 import sql
from datetime import datetime
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL


class ImportOSGBToPostGIS(QgsProcessingAlgorithm):
    """Processing algorithm that writes one metadata row per OSGB directory."""

    # Parameter identifiers used by initAlgorithm / processAlgorithm.
    DATABASE = 'DATABASE'
    SCHEMA = 'SCHEMA'
    TABLE = 'TABLE'
    INPUT_DIR = 'INPUT_DIR'
    VECTOR_SJLY = 'VECTOR_SJLY'
    VECTOR_YEAR = 'VECTOR_YEAR'
    VECTOR_YWLX = 'VECTOR_YWLX'
    VECTOR_GLBM = 'VECTOR_GLBM'
    VECTOR_ZYML = 'VECTOR_ZYML'
    T_SRS = 'T_SRS'
    RASTER_T = 'RASTER_T'
    SOURCE_TYPE = 'SOURCE_TYPE'
    Metadata_storage = 'Metadata_storage'
    INDEX = 'INDEX'

    # Options offered by the RASTER_T enum parameter.
    RASTER_T_LIST = ['不分块', '128', '256', '512']

    @staticmethod
    def tr(string):
        """Return *string* translated in the 'ImportOSGBToPostGIS' context."""
        return QCoreApplication.translate('ImportOSGBToPostGIS', string)

    def initAlgorithm(self, config=None):
        """Declare the algorithm's input parameters.

        The business-type, department and resource-catalog enum options are
        fetched from the database through the ``PostgreSQL`` helper each time
        this method runs, and cached on ``self.ywlxs`` / ``self.depts`` /
        ``self.zymls``.
        """
        self.addParameter(QgsProcessingParameterProviderConnection(
            self.DATABASE,
            self.tr('数据库连接'),
            'postgres',
            defaultValue='192.168.60.2'
        ))
        self.addParameter(QgsProcessingParameterDatabaseSchema(
            self.SCHEMA,
            self.tr('模式'),
            connectionParameterName=self.DATABASE,
            defaultValue='base'
        ))
        self.addParameter(QgsProcessingParameterString(
            self.TABLE,
            self.tr('表名'),
            defaultValue='t_vector_storage'
        ))
        self.addParameter(QgsProcessingParameterFile(
            self.INPUT_DIR,
            self.tr('OSGB 数据目录'),
            behavior=QgsProcessingParameterFile.Folder,
            optional=False
        ))
        self.addParameter(QgsProcessingParameterString(
            self.VECTOR_SJLY,
            self.tr('数据来源'),
            optional=False
        ))
        self.addParameter(QgsProcessingParameterDateTime(
            self.VECTOR_YEAR,
            self.tr('数据时效'),
            type=QgsProcessingParameterDateTime.Type.Date,
            optional=False
        ))

        pgconn = PostgreSQL(schema='base')

        # Business types: the first column of each row is the option label.
        rows = pgconn.getVectorYwlx()
        self.ywlxs = [row[0] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_YWLX,
            self.tr('业务类型'),
            options=self.ywlxs,
            optional=False
        ))

        # Departments: the first column of each row is the option label.
        rows = pgconn.getDeptList()
        self.depts = [row[0] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_GLBM,
            self.tr('管理部门'),
            options=self.depts
        ))

        # Resource catalogs: the second column of each row is the option label.
        rows = pgconn.getVectorZyml()
        self.zymls = [row[1] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_ZYML,
            self.tr('资源目录'),
            options=self.zymls
        ))

        self.addParameter(QgsProcessingParameterCrs(
            self.T_SRS,
            self.tr('指定入库坐标系'),
            defaultValue='EPSG:4326',
            optional=False
        ))
        self.addParameter(QgsProcessingParameterEnum(
            self.RASTER_T,
            self.tr('分块存储大小'),
            options=self.RASTER_T_LIST,
            defaultValue=0,
            optional=False
        ))
        self.addParameter(QgsProcessingParameterString(
            self.SOURCE_TYPE,
            self.tr('数据源类型'),
            defaultValue='osgb',
            optional=False
        ))
        self.addParameter(QgsProcessingParameterBoolean(
            self.Metadata_storage,
            self.tr('元数据入库'),
            defaultValue=True
        ))
        self.addParameter(QgsProcessingParameterBoolean(
            self.INDEX,
            self.tr('创建空间索引'),
            defaultValue=True
        ))

    def processAlgorithm(self, parameters, context, feedback):
        """Insert one metadata row describing the selected OSGB directory.

        Args:
            parameters: Parameter values supplied by the Processing framework.
            context: The processing context used to evaluate parameters.
            feedback: Feedback object used for progress and error messages.

        Returns:
            An empty dict; the algorithm declares no outputs.

        Raises:
            Exception: If no ``bsm`` value is found for the chosen resource
                catalog name.
        """
        # NOTE(review): the DATABASE, T_SRS, RASTER_T, Metadata_storage and
        # INDEX parameters are declared in initAlgorithm but are not consumed
        # by this method.
        schema = self.parameterAsString(parameters, self.SCHEMA, context)
        table = self.parameterAsString(parameters, self.TABLE, context)
        input_dir = self.parameterAsString(parameters, self.INPUT_DIR, context)
        data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)

        # Convert the QDateTime to a Python date and keep only the year (int).
        qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
        data_year = qdatetime.date().toPyDate().year

        business_type_index = self.parameterAsInt(parameters, self.VECTOR_YWLX, context)
        management_dept_index = self.parameterAsInt(parameters, self.VECTOR_GLBM, context)
        resource_catalog_index = self.parameterAsInt(parameters, self.VECTOR_ZYML, context)
        source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)

        # Map the enum indices back to the values they were built from.
        pgconn = PostgreSQL(schema='base')
        business_type = pgconn.getVectorYwlx()[business_type_index][0]
        management_dept = pgconn.getDeptList()[management_dept_index][0]
        resource_catalog_name = pgconn.getVectorZyml()[resource_catalog_index][1]

        # Look up the resource catalog's bsm by name. Single quotes are doubled
        # so that a name containing an apostrophe cannot break (or alter) the
        # statement.
        escaped_name = str(resource_catalog_name).replace("'", "''")
        query = f"SELECT bsm FROM t_vector_zyml WHERE name = '{escaped_name}'"
        result = pgconn.execute(query)
        if result:
            resource_catalog = result[0][0]
        else:
            feedback.reportError(f"未找到名称为 {resource_catalog_name} 的资源目录 BSM 值。")
            raise Exception(f"资源目录未找到:{resource_catalog_name}")

        # Connect to the database.
        # NOTE(review): host and credentials are hard-coded here and the
        # DATABASE connection chosen by the user is ignored — confirm whether
        # the selected provider connection should be used instead.
        conn_params = {
            'host': "192.168.60.2",
            'port': "5432",
            'dbname': "real3d",
            'user': "postgres",
            'password': "postgis"
        }

        feedback.pushInfo("连接数据库...")
        conn = psycopg2.connect(**conn_params)
        try:
            feedback.pushInfo("插入数据目录元数据...")
            record_id = str(uuid.uuid4())
            # normpath drops a trailing separator, which would otherwise make
            # basename() return an empty string.
            dir_name = os.path.basename(os.path.normpath(input_dir)) if input_dir else ''

            insert_sql = sql.SQL("""
                INSERT INTO {schema}.{table}
                    (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """).format(
                schema=sql.Identifier(schema),
                table=sql.Identifier(table)
            )

            # "with conn" commits on success and rolls back on error; the
            # cursor context manager closes the cursor in both cases.
            with conn, conn.cursor() as cursor:
                cursor.execute(insert_sql, (
                    input_dir,
                    datetime.now().date(),
                    data_year,
                    business_type,
                    management_dept,
                    record_id,
                    source_type,
                    resource_catalog,
                    dir_name,
                    data_source
                ))
        finally:
            # Always release the connection, even when the insert fails.
            conn.close()

        feedback.pushInfo("数据目录元数据已成功插入。")
        return {}

    def name(self):
        """Return the algorithm's unique identifier."""
        return "importosgbtopostgis"

    def displayName(self):
        """Return the user-visible algorithm name."""
        return "三维数据入库"

    def group(self):
        """Return the user-visible group name."""
        return "三维数据工具"

    def groupId(self):
        """Return the unique identifier of the group."""
        return "osgbtools"

    def shortHelpString(self):
        """Return the help text shown in the algorithm dialog."""
        # NOTE(review): the text mentions scanning files and bounding boxes,
        # but processAlgorithm only inserts a single directory-level row.
        return "扫描 OSGB 文件目录,将路径、包围盒等元数据信息写入 PostGIS 数据库。"

    def createInstance(self):
        """Return a fresh instance of this algorithm."""
        return ImportOSGBToPostGIS()