ImportTableToPostGIS.py 8.4 KB


"""QGIS Processing algorithm: import a CSV/Excel table into a new PostgreSQL table."""
__author__ = 'liying'
__date__ = 'May 2025'
__copyright__ = '(C) 2025, liying'
  4. import uuid
  5. import siwei_config
  6. from qgis.core import (
  7. QgsProcessingAlgorithm,
  8. QgsProcessingParameterFile,
  9. QgsProcessingParameterProviderConnection,
  10. QgsProcessingParameterDatabaseSchema,
  11. QgsProcessingParameterString,
  12. QgsProcessingParameterEnum,
  13. QgsProcessingParameterDateTime,
  14. QgsProcessingParameterBoolean,
  15. )
  16. from PyQt5.QtCore import QCoreApplication
  17. import os
  18. import psycopg2
  19. from psycopg2 import sql
  20. from pandas import read_csv, read_excel
  21. from datetime import datetime
  22. from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
  23. class ImportTableToPostGIS(QgsProcessingAlgorithm):
  24. DATABASE = 'DATABASE'
  25. SCHEMA = 'SCHEMA'
  26. TABLE = 'TABLE'
  27. INPUT_FILE = 'INPUT_FILE'
  28. VECTOR_SJLY = 'VECTOR_SJLY'
  29. VECTOR_YEAR = 'VECTOR_YEAR'
  30. VECTOR_YWLX = 'VECTOR_YWLX'
  31. VECTOR_GLBM = 'VECTOR_GLBM'
  32. VECTOR_ZYML = 'VECTOR_ZYML'
  33. SOURCE_TYPE = 'SOURCE_TYPE'
  34. Metadata_storage = 'Metadata_storage'
  35. def initAlgorithm(self, config=None):
  36. self.addParameter(QgsProcessingParameterProviderConnection(
  37. self.DATABASE, '数据库连接', 'postgres', defaultValue=siwei_config.CONFIG['db']['host']))
  38. self.addParameter(QgsProcessingParameterDatabaseSchema(
  39. self.SCHEMA, '模式', connectionParameterName=self.DATABASE, defaultValue='base'))
  40. self.addParameter(QgsProcessingParameterString(
  41. self.TABLE, '导入目标表名(新建)', defaultValue='t_table'))
  42. self.addParameter(QgsProcessingParameterFile(
  43. self.INPUT_FILE, '表格文件(CSV或Excel)', optional=False))
  44. self.addParameter(QgsProcessingParameterString(
  45. self.VECTOR_SJLY, '数据来源', optional=False))
  46. self.addParameter(QgsProcessingParameterDateTime(
  47. self.VECTOR_YEAR, '数据时效', type=QgsProcessingParameterDateTime.Type.Date, optional=False))
  48. pgconn = PostgreSQL(schema='base')
  49. self.ywlxs = [row[0] for row in pgconn.getVectorYwlx()]
  50. self.addParameter(QgsProcessingParameterEnum(
  51. self.VECTOR_YWLX, '业务类型', options=self.ywlxs))
  52. self.depts = [row[0] for row in pgconn.getDeptList()]
  53. self.addParameter(QgsProcessingParameterEnum(
  54. self.VECTOR_GLBM, '管理部门', options=self.depts))
  55. self.zymls = [row[1] for row in pgconn.getVectorZyml()]
  56. self.addParameter(QgsProcessingParameterEnum(
  57. self.VECTOR_ZYML, '资源目录', options=self.zymls))
  58. self.addParameter(QgsProcessingParameterString(
  59. self.SOURCE_TYPE, '数据源类型', defaultValue='table', optional=False))
  60. self.addParameter(QgsProcessingParameterBoolean(
  61. self.Metadata_storage, '是否写入数据目录', defaultValue=True))
  62. def processAlgorithm(self, parameters, context, feedback):
  63. input_file = self.parameterAsString(parameters, self.INPUT_FILE, context)
  64. ext = os.path.splitext(input_file)[-1].lower()
  65. df = read_csv(input_file) if ext == '.csv' else read_excel(input_file)
  66. connection_name = self.parameterAsString(parameters, self.DATABASE, context)
  67. schema = self.parameterAsString(parameters, self.SCHEMA, context)
  68. target_table = self.parameterAsString(parameters, self.TABLE, context)
  69. data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)
  70. source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)
  71. # 转换 QDateTime 为 Python date
  72. qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
  73. data_year = qdatetime.date().toPyDate().year # 只保留年份(整数)
  74. pgconn = PostgreSQL(schema='base')
  75. business_type = pgconn.getVectorYwlx()[self.parameterAsInt(parameters, self.VECTOR_YWLX, context)][0]
  76. management_dept = pgconn.getDeptList()[self.parameterAsInt(parameters, self.VECTOR_GLBM, context)][0]
  77. resource_catalog_name = pgconn.getVectorZyml()[self.parameterAsInt(parameters, self.VECTOR_ZYML, context)][1]
  78. bsm_query = f"SELECT bsm FROM t_vector_zyml WHERE name = '{resource_catalog_name}'"
  79. bsm_result = pgconn.execute(bsm_query)
  80. resource_catalog = bsm_result[0][0] if bsm_result else resource_catalog_name
  81. conn_params = {
  82. 'host': siwei_config.CONFIG['db']['host'],
  83. 'port': siwei_config.CONFIG['db']['port'],
  84. 'dbname': siwei_config.CONFIG['db']['name'],
  85. 'user': siwei_config.CONFIG['db']['user'],
  86. 'password': siwei_config.CONFIG['db']['password'],
  87. "connect_timeout": 10,
  88. }
  89. conn = psycopg2.connect(**conn_params)
  90. cursor = conn.cursor()
  91. # 检查表是否存在,如果存在则添加递增后缀
  92. target_table_new = target_table
  93. suffix = 0
  94. while True:
  95. # 构建检查表是否存在的SQL
  96. check_sql = sql.SQL(
  97. "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s)")
  98. cursor.execute(check_sql, (schema, target_table_new))
  99. exists = cursor.fetchone()[0]
  100. # 如果表不存在,跳出循环
  101. if not exists:
  102. break
  103. # 如果表存在,增加后缀
  104. suffix += 1
  105. target_table_new = f"{target_table}_{suffix}"
  106. feedback.pushInfo(f"表 {schema}.{target_table} 已存在,尝试使用新名称: {target_table_new}")
  107. feedback.pushInfo(f"将使用表名: {schema}.{target_table_new}")
  108. # 动态生成字段类型(全部为 TEXT)
  109. columns = [f'"{col.strip()}" TEXT' for col in df.columns]
  110. create_sql = sql.SQL("CREATE TABLE {schema}.{table} ({fields})").format(
  111. schema=sql.Identifier(schema),
  112. table=sql.Identifier(target_table_new),
  113. fields=sql.SQL(', ').join(sql.SQL(col) for col in columns)
  114. )
  115. cursor.execute(create_sql)
  116. # 插入数据
  117. insert_cols = [f'"{col.strip()}"' for col in df.columns]
  118. insert_sql = sql.SQL("""
  119. INSERT INTO {schema}.{table} ({fields})
  120. VALUES ({placeholders})
  121. """).format(
  122. schema=sql.Identifier(schema),
  123. table=sql.Identifier(target_table_new),
  124. fields=sql.SQL(', ').join(map(sql.Identifier, df.columns)),
  125. placeholders=sql.SQL(', ').join(sql.Placeholder() * len(df.columns))
  126. )
  127. for _, row in df.iterrows():
  128. cursor.execute(insert_sql, tuple(row.fillna("").astype(str).values))
  129. feedback.pushInfo(f"数据已成功导入表 {schema}.{target_table_new}")
  130. # 元数据插入
  131. if self.parameterAsBool(parameters, self.Metadata_storage, context):
  132. feedback.pushInfo("插入数据目录元数据...")
  133. id = str(uuid.uuid4())
  134. # table_fullname = os.path.basename(input_file)
  135. table_fullname = f"{schema}.{target_table_new}"
  136. metadata_insert_sql = sql.SQL("""
  137. INSERT INTO {schema}.t_vector_storage
  138. (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
  139. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  140. """).format(schema=sql.Identifier(schema))
  141. cursor.execute(metadata_insert_sql, (
  142. table_fullname, # 使用新表名
  143. datetime.now().date(),
  144. data_year,
  145. business_type,
  146. management_dept,
  147. id,
  148. source_type,
  149. resource_catalog,
  150. table_fullname,
  151. data_source
  152. ))
  153. feedback.pushInfo("数据目录元数据已成功插入。")
  154. conn.commit()
  155. cursor.close()
  156. conn.close()
  157. return {}
  158. def name(self):
  159. return "importtabletopostgis"
  160. def displayName(self):
  161. return "表格数据入库"
  162. def group(self):
  163. return "表格数据工具"
  164. def groupId(self):
  165. return "tabletools"
  166. def shortHelpString(self):
  167. return "将 CSV 或 Excel 表格导入为 PostgreSQL 中的新表,并写入数据目录元数据。"
  168. def createInstance(self):
  169. return ImportTableToPostGIS()