__author__ = 'liying'
__date__ = 'May 2025'
__copyright__ = '(C) 2025, liying'

import uuid

import siwei_config
from qgis.core import (
    QgsProcessingAlgorithm,
    QgsProcessingException,
    QgsProcessingParameterFile,
    QgsProcessingParameterProviderConnection,
    QgsProcessingParameterDatabaseSchema,
    QgsProcessingParameterString,
    QgsProcessingParameterEnum,
    QgsProcessingParameterDateTime,
    QgsProcessingParameterBoolean,
)
from PyQt5.QtCore import QCoreApplication
import os
import psycopg2
from psycopg2 import sql
from pandas import read_csv, read_excel
from datetime import datetime
from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL


class ImportTableToPostGIS(QgsProcessingAlgorithm):
    """Import a CSV or Excel sheet into a new PostgreSQL table.

    Every column of the new table is created as TEXT. If the requested table
    name is already taken in the target schema, a numeric suffix (``_1``,
    ``_2``, ...) is appended. Optionally a metadata row describing the import
    is written to ``<schema>.t_vector_storage``.
    """

    # Parameter keys.
    DATABASE = 'DATABASE'
    SCHEMA = 'SCHEMA'
    TABLE = 'TABLE'
    INPUT_FILE = 'INPUT_FILE'
    VECTOR_SJLY = 'VECTOR_SJLY'  # data source
    VECTOR_YEAR = 'VECTOR_YEAR'  # data timeliness (only the year is stored)
    VECTOR_YWLX = 'VECTOR_YWLX'  # business type
    VECTOR_GLBM = 'VECTOR_GLBM'  # managing department
    VECTOR_ZYML = 'VECTOR_ZYML'  # resource catalog
    SOURCE_TYPE = 'SOURCE_TYPE'
    Metadata_storage = 'Metadata_storage'

    # Rows sent to the server per round trip during the bulk insert.
    INSERT_BATCH_SIZE = 1000

    def initAlgorithm(self, config=None):
        """Declare the algorithm parameters.

        The business-type, department and resource-catalog enums are filled
        from the database through the project ``PostgreSQL`` helper (schema
        ``base``); the option lists are also kept on ``self``.
        """
        self.addParameter(QgsProcessingParameterProviderConnection(
            self.DATABASE, '数据库连接', 'postgres',
            defaultValue=siwei_config.CONFIG['db']['host']))
        self.addParameter(QgsProcessingParameterDatabaseSchema(
            self.SCHEMA, '模式',
            connectionParameterName=self.DATABASE, defaultValue='base'))
        self.addParameter(QgsProcessingParameterString(
            self.TABLE, '导入目标表名(新建)', defaultValue='t_table'))
        self.addParameter(QgsProcessingParameterFile(
            self.INPUT_FILE, '表格文件(CSV或Excel)', optional=False))
        self.addParameter(QgsProcessingParameterString(
            self.VECTOR_SJLY, '数据来源', optional=False))
        self.addParameter(QgsProcessingParameterDateTime(
            self.VECTOR_YEAR, '数据时效',
            type=QgsProcessingParameterDateTime.Type.Date, optional=False))

        pgconn = PostgreSQL(schema='base')
        self.ywlxs = [row[0] for row in pgconn.getVectorYwlx()]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_YWLX, '业务类型', options=self.ywlxs))
        self.depts = [row[0] for row in pgconn.getDeptList()]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_GLBM, '管理部门', options=self.depts))
        self.zymls = [row[1] for row in pgconn.getVectorZyml()]
        self.addParameter(QgsProcessingParameterEnum(
            self.VECTOR_ZYML, '资源目录', options=self.zymls))

        self.addParameter(QgsProcessingParameterString(
            self.SOURCE_TYPE, '数据源类型', defaultValue='table', optional=False))
        self.addParameter(QgsProcessingParameterBoolean(
            self.Metadata_storage, '是否写入数据目录', defaultValue=True))

    def processAlgorithm(self, parameters, context, feedback):
        """Run the import.

        The table creation, the data insert and the optional metadata insert
        run in one transaction: it is committed only if every step succeeds
        and rolled back otherwise; the connection is always closed.

        Returns:
            An empty dict (the algorithm produces no output parameters).

        Raises:
            QgsProcessingException: if the user cancels during the insert.
        """
        input_file = self.parameterAsString(parameters, self.INPUT_FILE, context)
        df = self._read_table(input_file)

        # NOTE(review): the DATABASE parameter is not used to connect; the
        # connection below is built from siwei_config.CONFIG['db'].
        schema = self.parameterAsString(parameters, self.SCHEMA, context)
        target_table = self.parameterAsString(parameters, self.TABLE, context)
        data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)
        source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)

        # Convert the QDateTime to a Python date and keep only the year (int).
        qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
        data_year = qdatetime.date().toPyDate().year

        # Enum parameters are indices into the option lists; re-query the
        # lists here so the lookup matches what the database holds now.
        pgconn = PostgreSQL(schema='base')
        business_type = pgconn.getVectorYwlx()[
            self.parameterAsInt(parameters, self.VECTOR_YWLX, context)][0]
        management_dept = pgconn.getDeptList()[
            self.parameterAsInt(parameters, self.VECTOR_GLBM, context)][0]
        resource_catalog_name = pgconn.getVectorZyml()[
            self.parameterAsInt(parameters, self.VECTOR_ZYML, context)][1]
        resource_catalog = self._lookup_catalog_bsm(pgconn, resource_catalog_name)

        conn_params = {
            'host': siwei_config.CONFIG['db']['host'],
            'port': siwei_config.CONFIG['db']['port'],
            'dbname': siwei_config.CONFIG['db']['name'],
            'user': siwei_config.CONFIG['db']['user'],
            'password': siwei_config.CONFIG['db']['password'],
            "connect_timeout": 10,
        }
        conn = psycopg2.connect(**conn_params)
        try:
            # `with conn` commits on success and rolls back on any exception;
            # it does not close the connection, hence the outer finally.
            with conn, conn.cursor() as cursor:
                target_table_new = self._unique_table_name(
                    cursor, schema, target_table, feedback)
                feedback.pushInfo(f"将使用表名: {schema}.{target_table_new}")

                # One normalized header list for BOTH CREATE and INSERT so the
                # column names always agree (headers may be non-str in Excel).
                col_names = [str(col).strip() for col in df.columns]
                self._create_text_table(cursor, schema, target_table_new, col_names)
                self._insert_rows(cursor, schema, target_table_new, col_names, df, feedback)
                feedback.pushInfo(f"数据已成功导入表 {schema}.{target_table_new}")

                if self.parameterAsBool(parameters, self.Metadata_storage, context):
                    feedback.pushInfo("插入数据目录元数据...")
                    table_fullname = f"{schema}.{target_table_new}"
                    metadata_insert_sql = sql.SQL("""
                        INSERT INTO {schema}.t_vector_storage
                        (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """).format(schema=sql.Identifier(schema))
                    cursor.execute(metadata_insert_sql, (
                        table_fullname,          # sjywz: the new table's full name
                        datetime.now().date(),   # rksj
                        data_year,               # year
                        business_type,           # ywlx
                        management_dept,         # glbm
                        str(uuid.uuid4()),       # id
                        source_type,             # sjlx
                        resource_catalog,        # xmlx
                        table_fullname,          # name
                        data_source,             # sjly
                    ))
                    feedback.pushInfo("数据目录元数据已成功插入。")
        finally:
            conn.close()
        return {}

    @staticmethod
    def _read_table(input_file):
        """Read a ``.csv`` file (otherwise an Excel file) with all cells as text.

        ``dtype=str`` keeps the cell text as written (leading zeros in codes
        survive, integers in columns with blanks are not turned into ``1.0``),
        which matches the all-TEXT target table. CSV files that are not valid
        UTF-8 are retried as GB18030.
        """
        ext = os.path.splitext(input_file)[-1].lower()
        if ext != '.csv':
            return read_excel(input_file, dtype=str)
        try:
            return read_csv(input_file, dtype=str)
        except UnicodeDecodeError:
            return read_csv(input_file, dtype=str, encoding='gb18030')

    @staticmethod
    def _lookup_catalog_bsm(pgconn, catalog_name):
        """Return ``bsm`` of the ``t_vector_zyml`` row named *catalog_name*,
        or *catalog_name* itself when no row matches."""
        # pgconn.execute is called with a plain SQL string, so escape embedded
        # single quotes (SQL-standard doubling); interpolating the raw name
        # broke on names containing ' and allowed SQL injection.
        escaped = str(catalog_name).replace("'", "''")
        bsm_result = pgconn.execute(
            f"SELECT bsm FROM t_vector_zyml WHERE name = '{escaped}'")
        return bsm_result[0][0] if bsm_result else catalog_name

    @staticmethod
    def _unique_table_name(cursor, schema, base_name, feedback):
        """Return *base_name*, or ``<base_name>_<n>`` for the first n = 1, 2, ...
        such that no relation with that name exists in *schema*."""
        # pg_class lists every relation (tables, views, sequences, indexes...),
        # all of which block CREATE TABLE with the same name, independent of
        # the current user's privileges; information_schema.tables only shows
        # tables/views the user has privileges on.
        exists_sql = (
            "SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_class c "
            "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
            "WHERE n.nspname = %s AND c.relname = %s)"
        )
        candidate = base_name
        suffix = 0
        while True:
            cursor.execute(exists_sql, (schema, candidate))
            if not cursor.fetchone()[0]:
                return candidate
            suffix += 1
            taken, candidate = candidate, f"{base_name}_{suffix}"
            feedback.pushInfo(f"表 {schema}.{taken} 已存在,尝试使用新名称: {candidate}")

    @staticmethod
    def _create_text_table(cursor, schema, table, col_names):
        """Create ``schema.table`` with one TEXT column per name in *col_names*."""
        create_sql = sql.SQL("CREATE TABLE {schema}.{table} ({fields})").format(
            schema=sql.Identifier(schema),
            table=sql.Identifier(table),
            # sql.Identifier quotes and escapes each header, so names with
            # spaces, double quotes or non-ASCII characters are safe.
            fields=sql.SQL(', ').join(
                [sql.SQL('{} TEXT').format(sql.Identifier(name)) for name in col_names]
            ),
        )
        cursor.execute(create_sql)

    def _insert_rows(self, cursor, schema, table, col_names, df, feedback):
        """Insert every row of *df* (missing values written as "") in batches,
        reporting progress and aborting if the user cancels."""
        from psycopg2.extras import execute_batch

        insert_stmt = sql.SQL(
            "INSERT INTO {schema}.{table} ({fields}) VALUES ({placeholders})"
        ).format(
            schema=sql.Identifier(schema),
            table=sql.Identifier(table),
            fields=sql.SQL(', ').join([sql.Identifier(name) for name in col_names]),
            placeholders=sql.SQL(', ').join(sql.Placeholder() * len(col_names)),
        ).as_string(cursor)

        rows = df.fillna("").astype(str).values.tolist()
        total = len(rows)
        batch = self.INSERT_BATCH_SIZE
        for start in range(0, total, batch):
            if feedback.isCanceled():
                # Raising makes the enclosing `with conn` roll back, so no
                # half-filled table is left behind.
                raise QgsProcessingException("导入已取消")
            execute_batch(cursor, insert_stmt, rows[start:start + batch], page_size=batch)
            feedback.setProgress(100.0 * min(start + batch, total) / total)

    def name(self):
        return "importtabletopostgis"

    def displayName(self):
        return "表格数据入库"

    def group(self):
        return "表格数据工具"

    def groupId(self):
        return "tabletools"

    def shortHelpString(self):
        return "将 CSV 或 Excel 表格导入为 PostgreSQL 中的新表,并写入数据目录元数据。"

    def createInstance(self):
        return ImportTableToPostGIS()