"""QGIS Processing algorithm that registers OSGB model files in PostgreSQL.

The algorithm walks a folder, and for the folder itself plus every ``.osgb``
file found below it, inserts one metadata row (path, import date, year,
business type, department, id, source type, catalog, name, data source)
into a user-selected table.
"""

__author__ = 'liying'
__date__ = 'May 2025'
__copyright__ = '(C) 2025, liying'

import os
import uuid
from datetime import datetime

import psycopg2
from psycopg2 import sql
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (
    QgsProcessingAlgorithm,
    QgsProcessingParameterFile,
    QgsProcessingParameterProviderConnection,
    QgsProcessingParameterDatabaseSchema,
    QgsProcessingParameterString,
    QgsProcessingParameterEnum,
    QgsProcessingParameterDateTime,
    QgsProcessingParameterBoolean,
    QgsProcessingParameterCrs
)

import siwei_config
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL


class ImportSingleOSGBToPostGIS(QgsProcessingAlgorithm):
    """Write the paths of single OSGB model files into a database table.

    NOTE(review): the parameters ``T_SRS``, ``RASTER_T``, ``Metadata_storage``
    and ``INDEX`` are declared in :meth:`initAlgorithm` but are never read by
    :meth:`processAlgorithm`. Likewise the ``DATABASE`` connection parameter
    is not used to open the connection; the credentials come from
    ``siwei_config.CONFIG['db']``. Confirm whether that is intended.
    """

    # Parameter keys used by the Processing framework.
    DATABASE = 'DATABASE'
    SCHEMA = 'SCHEMA'
    TABLE = 'TABLE'
    INPUT_DIR = 'INPUT_DIR'
    VECTOR_SJLY = 'VECTOR_SJLY'
    VECTOR_YEAR = 'VECTOR_YEAR'
    VECTOR_YWLX = 'VECTOR_YWLX'
    VECTOR_GLBM = 'VECTOR_GLBM'
    VECTOR_ZYML = 'VECTOR_ZYML'
    T_SRS = 'T_SRS'
    RASTER_T = 'RASTER_T'
    SOURCE_TYPE = 'SOURCE_TYPE'
    Metadata_storage = 'Metadata_storage'
    INDEX = 'INDEX'

    # Options offered by the RASTER_T enum parameter.
    RASTER_T_LIST = ['不分块', '128', '256', '512']

    @staticmethod
    def tr(string):
        """Return *string* translated in this algorithm's context."""
        return QCoreApplication.translate('ImportSingleOSGBToPostGIS', string)

    def initAlgorithm(self, config=None):
        """Declare the algorithm's input parameters.

        The enum options for business type, department and resource catalog
        are loaded from the database through the project ``PostgreSQL``
        helper, so this method performs database queries.
        """
        self.addParameter(QgsProcessingParameterProviderConnection(
            self.DATABASE,
            self.tr('数据库连接'),
            'postgres',
            defaultValue=siwei_config.CONFIG['db']['host']
        ))
        self.addParameter(QgsProcessingParameterDatabaseSchema(
            self.SCHEMA,
            self.tr('模式'),
            connectionParameterName=self.DATABASE,
            defaultValue='base'
        ))
        self.addParameter(QgsProcessingParameterString(
            self.TABLE,
            self.tr('表名'),
            defaultValue='t_vector_storage'
        ))
        self.addParameter(QgsProcessingParameterFile(
            self.INPUT_DIR,
            self.tr('OSGB 数据目录'),
            behavior=QgsProcessingParameterFile.Folder,
            optional=False
        ))
        self.addParameter(QgsProcessingParameterString(
            self.VECTOR_SJLY,
            self.tr('数据来源'),
            optional=False
        ))
        self.addParameter(QgsProcessingParameterDateTime(
            self.VECTOR_YEAR,
            self.tr('数据时效'),
            type=QgsProcessingParameterDateTime.Type.Date,
            optional=False
        ))

        # The option lists are kept on the instance; the enum parameters
        # return an index into them.
        pgconn = PostgreSQL(schema='base')

        rows = pgconn.getVectorYwlx()
        self.ywlxs = [row[0] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            name=self.VECTOR_YWLX,
            description=self.tr('业务类型'),
            options=self.ywlxs,
            optional=False
        ))

        rows = pgconn.getDeptList()
        self.depts = [row[0] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            name=self.VECTOR_GLBM,
            description=self.tr('管理部门'),
            options=self.depts
        ))

        rows = pgconn.getVectorZyml()
        self.zymls = [row[1] for row in rows]
        self.addParameter(QgsProcessingParameterEnum(
            name=self.VECTOR_ZYML,
            description=self.tr('资源目录'),
            options=self.zymls
        ))

        self.addParameter(QgsProcessingParameterCrs(
            self.T_SRS,
            self.tr('指定入库坐标系'),
            defaultValue='EPSG:4326',
            optional=False
        ))
        self.addParameter(QgsProcessingParameterEnum(
            self.RASTER_T,
            self.tr('分块存储大小'),
            options=self.RASTER_T_LIST,
            defaultValue=0,
            optional=False
        ))
        self.addParameter(QgsProcessingParameterString(
            self.SOURCE_TYPE,
            self.tr('数据源类型'),
            defaultValue='osgb',
            optional=False
        ))
        self.addParameter(QgsProcessingParameterBoolean(
            self.Metadata_storage,
            self.tr('元数据入库'),
            defaultValue=True
        ))
        self.addParameter(QgsProcessingParameterBoolean(
            self.INDEX,
            self.tr('创建空间索引'),
            defaultValue=True
        ))

    def processAlgorithm(self, parameters, context, feedback):
        """Insert one row for the input folder and one per ``.osgb`` file.

        Args:
            parameters: Parameter values supplied by the Processing framework.
            context: Processing context used to evaluate the parameters.
            feedback: Feedback object that receives progress messages.

        Returns:
            An empty dict; the algorithm declares no outputs.

        Raises:
            IndexError: If a selected enum index no longer matches the rows
                returned by the database.
            psycopg2.Error: If connecting or inserting fails. The connection
                is closed in that case, which discards uncommitted rows.
        """
        schema = self.parameterAsString(parameters, self.SCHEMA, context)
        table = self.parameterAsString(parameters, self.TABLE, context)
        input_dir = self.parameterAsString(parameters, self.INPUT_DIR, context)
        data_source = self.parameterAsString(
            parameters, self.VECTOR_SJLY, context)

        # Convert the QDateTime to a Python date and keep only the year (int).
        qdatetime = self.parameterAsDateTime(
            parameters, self.VECTOR_YEAR, context)
        data_year = qdatetime.date().toPyDate().year

        business_type_index = self.parameterAsInt(
            parameters, self.VECTOR_YWLX, context)
        management_dept_index = self.parameterAsInt(
            parameters, self.VECTOR_GLBM, context)
        resource_catalog_index = self.parameterAsInt(
            parameters, self.VECTOR_ZYML, context)
        source_type = self.parameterAsString(
            parameters, self.SOURCE_TYPE, context)

        # Map the enum indexes back to the values they were built from.
        pgconn = PostgreSQL(schema='base')
        business_type = pgconn.getVectorYwlx()[business_type_index][0]
        management_dept = pgconn.getDeptList()[management_dept_index][0]
        resource_catalog = pgconn.getVectorZyml()[resource_catalog_index][1]

        db_config = siwei_config.CONFIG['db']
        conn = psycopg2.connect(
            host=db_config['host'],
            port=db_config['port'],
            dbname=db_config['name'],
            user=db_config['user'],
            password=db_config['password']
        )
        try:
            # Quote the catalog name as a SQL literal instead of pasting it
            # into the statement, so a name containing a quote cannot break
            # or alter the query.
            query = sql.SQL(
                "SELECT bsm FROM t_vector_zyml WHERE name = {}"
            ).format(sql.Literal(resource_catalog)).as_string(conn)
            result = pgconn.execute(query)
            if result:
                resource_catalog = result[0][0]
            else:
                # Fall back to storing the catalog name itself.
                feedback.pushWarning(f"未找到名称为 {resource_catalog} 的BSM值")

            insert_sql = sql.SQL("""
                INSERT INTO {schema}.{table}
                (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """).format(
                schema=sql.Identifier(schema),
                table=sql.Identifier(table)
            )

            # One import date for every row written by this run.
            import_date = datetime.now().date()

            def build_row(path, row_id, name):
                """Return the parameter tuple for one metadata row."""
                return (
                    path,
                    import_date,
                    data_year,
                    business_type,
                    management_dept,
                    row_id,
                    source_type,
                    resource_catalog,
                    name,
                    data_source
                )

            with conn.cursor() as cursor:
                feedback.pushInfo("插入目录级元数据记录...")
                record_id = str(uuid.uuid4())
                # normpath drops a trailing separator, which would otherwise
                # make basename() return an empty name.
                dir_name = os.path.basename(os.path.normpath(input_dir))
                cursor.execute(
                    insert_sql, build_row(input_dir, record_id, dir_name))
                conn.commit()

                feedback.pushInfo("扫描目录并插入单体模型记录...")
                for root, _dirs, files in os.walk(input_dir):
                    for file in files:
                        if not file.lower().endswith('.osgb'):
                            continue
                        full_path = os.path.join(root, file)
                        model_id = str(uuid.uuid4())
                        cursor.execute(
                            insert_sql, build_row(full_path, model_id, file))
                conn.commit()
        finally:
            # Always release the connection; closing discards anything that
            # was not committed.
            conn.close()

        feedback.pushInfo("所有单体模型路径已成功插入数据库。")
        return {}

    def name(self):
        """Return the unique algorithm id."""
        return "importsingleosgbtopostgis"

    def displayName(self):
        """Return the name shown in the Processing toolbox."""
        return "单体三维模型入库"

    def group(self):
        """Return the display name of the group the algorithm belongs to."""
        return "三维数据工具"

    def groupId(self):
        """Return the unique id of the group the algorithm belongs to."""
        return "osgbtools"

    def shortHelpString(self):
        """Return the short help text shown for the algorithm."""
        return "扫描 OSGB 文件目录,将单体模型文件路径等元数据信息写入 PostGIS 数据库。"

    def createInstance(self):
        """Return a new instance of this algorithm."""
        return ImportSingleOSGBToPostGIS()