ImportTableToPostGIS.py 8.3 KB


  1. __author__ = 'liying'
  2. __date__ = 'May 2025'
  3. __copyright__ = '(C) 2025, liying'
  4. import uuid
  5. import siwei_config
  6. from qgis.core import (
  7. QgsProcessingAlgorithm,
  8. QgsProcessingParameterFile,
  9. QgsProcessingParameterProviderConnection,
  10. QgsProcessingParameterDatabaseSchema,
  11. QgsProcessingParameterString,
  12. QgsProcessingParameterEnum,
  13. QgsProcessingParameterDateTime,
  14. QgsProcessingParameterBoolean,
  15. )
  16. from PyQt5.QtCore import QCoreApplication
  17. import os
  18. import psycopg2
  19. from psycopg2 import sql
  20. from pandas import read_csv, read_excel
  21. from datetime import datetime
  22. from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
  23. class ImportTableToPostGIS(QgsProcessingAlgorithm):
  24. DATABASE = 'DATABASE'
  25. SCHEMA = 'SCHEMA'
  26. TABLE = 'TABLE'
  27. INPUT_FILE = 'INPUT_FILE'
  28. VECTOR_SJLY = 'VECTOR_SJLY'
  29. VECTOR_YEAR = 'VECTOR_YEAR'
  30. VECTOR_YWLX = 'VECTOR_YWLX'
  31. VECTOR_GLBM = 'VECTOR_GLBM'
  32. VECTOR_ZYML = 'VECTOR_ZYML'
  33. SOURCE_TYPE = 'SOURCE_TYPE'
  34. Metadata_storage = 'Metadata_storage'
  35. def initAlgorithm(self, config=None):
  36. self.addParameter(QgsProcessingParameterProviderConnection(
  37. self.DATABASE, '数据库连接', 'postgres', defaultValue=siwei_config['db']['host']))
  38. self.addParameter(QgsProcessingParameterDatabaseSchema(
  39. self.SCHEMA, '模式', connectionParameterName=self.DATABASE, defaultValue='base'))
  40. self.addParameter(QgsProcessingParameterString(
  41. self.TABLE, '导入目标表名(新建)', defaultValue='t_table'))
  42. self.addParameter(QgsProcessingParameterFile(
  43. self.INPUT_FILE, '表格文件(CSV或Excel)', optional=False))
  44. self.addParameter(QgsProcessingParameterString(
  45. self.VECTOR_SJLY, '数据来源', optional=False))
  46. self.addParameter(QgsProcessingParameterDateTime(
  47. self.VECTOR_YEAR, '数据时效', type=QgsProcessingParameterDateTime.Type.Date, optional=False))
  48. pgconn = PostgreSQL(schema='base')
  49. self.ywlxs = [row[0] for row in pgconn.getVectorYwlx()]
  50. self.addParameter(QgsProcessingParameterEnum(
  51. self.VECTOR_YWLX, '业务类型', options=self.ywlxs))
  52. self.depts = [row[0] for row in pgconn.getDeptList()]
  53. self.addParameter(QgsProcessingParameterEnum(
  54. self.VECTOR_GLBM, '管理部门', options=self.depts))
  55. self.zymls = [row[1] for row in pgconn.getVectorZyml()]
  56. self.addParameter(QgsProcessingParameterEnum(
  57. self.VECTOR_ZYML, '资源目录', options=self.zymls))
  58. self.addParameter(QgsProcessingParameterString(
  59. self.SOURCE_TYPE, '数据源类型', defaultValue='table', optional=False))
  60. self.addParameter(QgsProcessingParameterBoolean(
  61. self.Metadata_storage, '是否写入数据目录', defaultValue=True))
  62. def processAlgorithm(self, parameters, context, feedback):
  63. input_file = self.parameterAsString(parameters, self.INPUT_FILE, context)
  64. ext = os.path.splitext(input_file)[-1].lower()
  65. df = read_csv(input_file) if ext == '.csv' else read_excel(input_file)
  66. connection_name = self.parameterAsString(parameters, self.DATABASE, context)
  67. schema = self.parameterAsString(parameters, self.SCHEMA, context)
  68. target_table = self.parameterAsString(parameters, self.TABLE, context)
  69. data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)
  70. source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)
  71. # 转换 QDateTime 为 Python date
  72. qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
  73. data_year = qdatetime.date().toPyDate().year # 只保留年份(整数)
  74. pgconn = PostgreSQL(schema='base')
  75. business_type = pgconn.getVectorYwlx()[self.parameterAsInt(parameters, self.VECTOR_YWLX, context)][0]
  76. management_dept = pgconn.getDeptList()[self.parameterAsInt(parameters, self.VECTOR_GLBM, context)][0]
  77. resource_catalog_name = pgconn.getVectorZyml()[self.parameterAsInt(parameters, self.VECTOR_ZYML, context)][1]
  78. bsm_query = f"SELECT bsm FROM t_vector_zyml WHERE name = '{resource_catalog_name}'"
  79. bsm_result = pgconn.execute(bsm_query)
  80. resource_catalog = bsm_result[0][0] if bsm_result else resource_catalog_name
  81. conn_params = {
  82. 'host': siwei_config['db']['host'],
  83. 'port': siwei_config['db']['port'],
  84. 'dbname': siwei_config['db']['name'],
  85. 'user': siwei_config['db']['user'],
  86. 'password': siwei_config['db']['password'],
  87. }
  88. conn = psycopg2.connect(**conn_params)
  89. cursor = conn.cursor()
  90. # 检查表是否存在,如果存在则添加递增后缀
  91. target_table_new = target_table
  92. suffix = 0
  93. while True:
  94. # 构建检查表是否存在的SQL
  95. check_sql = sql.SQL(
  96. "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s)")
  97. cursor.execute(check_sql, (schema, target_table_new))
  98. exists = cursor.fetchone()[0]
  99. # 如果表不存在,跳出循环
  100. if not exists:
  101. break
  102. # 如果表存在,增加后缀
  103. suffix += 1
  104. target_table_new = f"{target_table}_{suffix}"
  105. feedback.pushInfo(f"表 {schema}.{target_table} 已存在,尝试使用新名称: {target_table_new}")
  106. feedback.pushInfo(f"将使用表名: {schema}.{target_table_new}")
  107. # 动态生成字段类型(全部为 TEXT)
  108. columns = [f'"{col.strip()}" TEXT' for col in df.columns]
  109. create_sql = sql.SQL("CREATE TABLE {schema}.{table} ({fields})").format(
  110. schema=sql.Identifier(schema),
  111. table=sql.Identifier(target_table_new),
  112. fields=sql.SQL(', ').join(sql.SQL(col) for col in columns)
  113. )
  114. cursor.execute(create_sql)
  115. # 插入数据
  116. insert_cols = [f'"{col.strip()}"' for col in df.columns]
  117. insert_sql = sql.SQL("""
  118. INSERT INTO {schema}.{table} ({fields})
  119. VALUES ({placeholders})
  120. """).format(
  121. schema=sql.Identifier(schema),
  122. table=sql.Identifier(target_table_new),
  123. fields=sql.SQL(', ').join(map(sql.Identifier, df.columns)),
  124. placeholders=sql.SQL(', ').join(sql.Placeholder() * len(df.columns))
  125. )
  126. for _, row in df.iterrows():
  127. cursor.execute(insert_sql, tuple(row.fillna("").astype(str).values))
  128. feedback.pushInfo(f"数据已成功导入表 {schema}.{target_table_new}")
  129. # 元数据插入
  130. if self.parameterAsBool(parameters, self.Metadata_storage, context):
  131. feedback.pushInfo("插入数据目录元数据...")
  132. id = str(uuid.uuid4())
  133. # table_fullname = os.path.basename(input_file)
  134. table_fullname = f"{schema}.{target_table_new}"
  135. metadata_insert_sql = sql.SQL("""
  136. INSERT INTO {schema}.t_vector_storage
  137. (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
  138. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  139. """).format(schema=sql.Identifier(schema))
  140. cursor.execute(metadata_insert_sql, (
  141. table_fullname, # 使用新表名
  142. datetime.now().date(),
  143. data_year,
  144. business_type,
  145. management_dept,
  146. id,
  147. source_type,
  148. resource_catalog,
  149. table_fullname,
  150. data_source
  151. ))
  152. feedback.pushInfo("数据目录元数据已成功插入。")
  153. conn.commit()
  154. cursor.close()
  155. conn.close()
  156. return {}
  157. def name(self):
  158. return "importtabletopostgis"
  159. def displayName(self):
  160. return "表格数据入库"
  161. def group(self):
  162. return "表格数据工具"
  163. def groupId(self):
  164. return "tabletools"
  165. def shortHelpString(self):
  166. return "将 CSV 或 Excel 表格导入为 PostgreSQL 中的新表,并写入数据目录元数据。"
  167. def createInstance(self):
  168. return ImportTableToPostGIS()