|
- __author__ = 'liying'
- __date__ = 'May 2025'
- __copyright__ = '(C) 2025, liying'
- import uuid
- import siwei_config
- from qgis.core import (
- QgsProcessingAlgorithm,
- QgsProcessingParameterFile,
- QgsProcessingParameterProviderConnection,
- QgsProcessingParameterDatabaseSchema,
- QgsProcessingParameterString,
- QgsProcessingParameterEnum,
- QgsProcessingParameterDateTime,
- QgsProcessingParameterBoolean,
- )
- from PyQt5.QtCore import QCoreApplication
- import os
- import psycopg2
- from psycopg2 import sql
- from pandas import read_csv, read_excel
- from datetime import datetime
- from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
- class ImportTableToPostGIS(QgsProcessingAlgorithm):
- DATABASE = 'DATABASE'
- SCHEMA = 'SCHEMA'
- TABLE = 'TABLE'
- INPUT_FILE = 'INPUT_FILE'
- VECTOR_SJLY = 'VECTOR_SJLY'
- VECTOR_YEAR = 'VECTOR_YEAR'
- VECTOR_YWLX = 'VECTOR_YWLX'
- VECTOR_GLBM = 'VECTOR_GLBM'
- VECTOR_ZYML = 'VECTOR_ZYML'
- SOURCE_TYPE = 'SOURCE_TYPE'
- Metadata_storage = 'Metadata_storage'
- def initAlgorithm(self, config=None):
- self.addParameter(QgsProcessingParameterProviderConnection(
- self.DATABASE, '数据库连接', 'postgres', defaultValue=siwei_config.CONFIG['db']['host']))
- self.addParameter(QgsProcessingParameterDatabaseSchema(
- self.SCHEMA, '模式', connectionParameterName=self.DATABASE, defaultValue='base'))
- self.addParameter(QgsProcessingParameterString(
- self.TABLE, '导入目标表名(新建)', defaultValue='t_table'))
- self.addParameter(QgsProcessingParameterFile(
- self.INPUT_FILE, '表格文件(CSV或Excel)', optional=False))
- self.addParameter(QgsProcessingParameterString(
- self.VECTOR_SJLY, '数据来源', optional=False))
- self.addParameter(QgsProcessingParameterDateTime(
- self.VECTOR_YEAR, '数据时效', type=QgsProcessingParameterDateTime.Type.Date, optional=False))
- pgconn = PostgreSQL(schema='base')
- self.ywlxs = [row[0] for row in pgconn.getVectorYwlx()]
- self.addParameter(QgsProcessingParameterEnum(
- self.VECTOR_YWLX, '业务类型', options=self.ywlxs))
- self.depts = [row[0] for row in pgconn.getDeptList()]
- self.addParameter(QgsProcessingParameterEnum(
- self.VECTOR_GLBM, '管理部门', options=self.depts))
- self.zymls = [row[1] for row in pgconn.getVectorZyml()]
- self.addParameter(QgsProcessingParameterEnum(
- self.VECTOR_ZYML, '资源目录', options=self.zymls))
- self.addParameter(QgsProcessingParameterString(
- self.SOURCE_TYPE, '数据源类型', defaultValue='table', optional=False))
- self.addParameter(QgsProcessingParameterBoolean(
- self.Metadata_storage, '是否写入数据目录', defaultValue=True))
- def processAlgorithm(self, parameters, context, feedback):
- input_file = self.parameterAsString(parameters, self.INPUT_FILE, context)
- ext = os.path.splitext(input_file)[-1].lower()
- df = read_csv(input_file) if ext == '.csv' else read_excel(input_file)
- connection_name = self.parameterAsString(parameters, self.DATABASE, context)
- schema = self.parameterAsString(parameters, self.SCHEMA, context)
- target_table = self.parameterAsString(parameters, self.TABLE, context)
- data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)
- source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)
- # 转换 QDateTime 为 Python date
- qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
- data_year = qdatetime.date().toPyDate().year # 只保留年份(整数)
- pgconn = PostgreSQL(schema='base')
- business_type = pgconn.getVectorYwlx()[self.parameterAsInt(parameters, self.VECTOR_YWLX, context)][0]
- management_dept = pgconn.getDeptList()[self.parameterAsInt(parameters, self.VECTOR_GLBM, context)][0]
- resource_catalog_name = pgconn.getVectorZyml()[self.parameterAsInt(parameters, self.VECTOR_ZYML, context)][1]
- bsm_query = f"SELECT bsm FROM t_vector_zyml WHERE name = '{resource_catalog_name}'"
- bsm_result = pgconn.execute(bsm_query)
- resource_catalog = bsm_result[0][0] if bsm_result else resource_catalog_name
- conn_params = {
- 'host': siwei_config.CONFIG['db']['host'],
- 'port': siwei_config.CONFIG['db']['port'],
- 'dbname': siwei_config.CONFIG['db']['name'],
- 'user': siwei_config.CONFIG['db']['user'],
- 'password': siwei_config.CONFIG['db']['password'],
- "connect_timeout": 10,
- }
- conn = psycopg2.connect(**conn_params)
- cursor = conn.cursor()
- # 检查表是否存在,如果存在则添加递增后缀
- target_table_new = target_table
- suffix = 0
- while True:
- # 构建检查表是否存在的SQL
- check_sql = sql.SQL(
- "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s)")
- cursor.execute(check_sql, (schema, target_table_new))
- exists = cursor.fetchone()[0]
- # 如果表不存在,跳出循环
- if not exists:
- break
- # 如果表存在,增加后缀
- suffix += 1
- target_table_new = f"{target_table}_{suffix}"
- feedback.pushInfo(f"表 {schema}.{target_table} 已存在,尝试使用新名称: {target_table_new}")
- feedback.pushInfo(f"将使用表名: {schema}.{target_table_new}")
- # 动态生成字段类型(全部为 TEXT)
- columns = [f'"{col.strip()}" TEXT' for col in df.columns]
- create_sql = sql.SQL("CREATE TABLE {schema}.{table} ({fields})").format(
- schema=sql.Identifier(schema),
- table=sql.Identifier(target_table_new),
- fields=sql.SQL(', ').join(sql.SQL(col) for col in columns)
- )
- cursor.execute(create_sql)
- # 插入数据
- insert_cols = [f'"{col.strip()}"' for col in df.columns]
- insert_sql = sql.SQL("""
- INSERT INTO {schema}.{table} ({fields})
- VALUES ({placeholders})
- """).format(
- schema=sql.Identifier(schema),
- table=sql.Identifier(target_table_new),
- fields=sql.SQL(', ').join(map(sql.Identifier, df.columns)),
- placeholders=sql.SQL(', ').join(sql.Placeholder() * len(df.columns))
- )
- for _, row in df.iterrows():
- cursor.execute(insert_sql, tuple(row.fillna("").astype(str).values))
- feedback.pushInfo(f"数据已成功导入表 {schema}.{target_table_new}")
- # 元数据插入
- if self.parameterAsBool(parameters, self.Metadata_storage, context):
- feedback.pushInfo("插入数据目录元数据...")
- id = str(uuid.uuid4())
- # table_fullname = os.path.basename(input_file)
- table_fullname = f"{schema}.{target_table_new}"
- metadata_insert_sql = sql.SQL("""
- INSERT INTO {schema}.t_vector_storage
- (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """).format(schema=sql.Identifier(schema))
- cursor.execute(metadata_insert_sql, (
- table_fullname, # 使用新表名
- datetime.now().date(),
- data_year,
- business_type,
- management_dept,
- id,
- source_type,
- resource_catalog,
- table_fullname,
- data_source
- ))
- feedback.pushInfo("数据目录元数据已成功插入。")
- conn.commit()
- cursor.close()
- conn.close()
- return {}
- def name(self):
- return "importtabletopostgis"
- def displayName(self):
- return "表格数据入库"
- def group(self):
- return "表格数据工具"
- def groupId(self):
- return "tabletools"
- def shortHelpString(self):
- return "将 CSV 或 Excel 表格导入为 PostgreSQL 中的新表,并写入数据目录元数据。"
- def createInstance(self):
- return ImportTableToPostGIS()
|