# ImportTableToPostGIS.py


  1. __author__ = 'liying'
  2. __date__ = 'May 2025'
  3. __copyright__ = '(C) 2025, liying'
import os
import uuid
from datetime import datetime

import psycopg2
from pandas import read_csv, read_excel
from psycopg2 import sql
from psycopg2.extras import execute_values
from PyQt5.QtCore import QCoreApplication
from qgis.core import (
    QgsProcessingAlgorithm,
    QgsProcessingParameterFile,
    QgsProcessingParameterProviderConnection,
    QgsProcessingParameterDatabaseSchema,
    QgsProcessingParameterString,
    QgsProcessingParameterEnum,
    QgsProcessingParameterDateTime,
    QgsProcessingParameterBoolean,
)

from processing.tools.PostgreSQL.PostgreSQL import PostgreSQL
  22. class ImportTableToPostGIS(QgsProcessingAlgorithm):
  23. DATABASE = 'DATABASE'
  24. SCHEMA = 'SCHEMA'
  25. TABLE = 'TABLE'
  26. INPUT_FILE = 'INPUT_FILE'
  27. VECTOR_SJLY = 'VECTOR_SJLY'
  28. VECTOR_YEAR = 'VECTOR_YEAR'
  29. VECTOR_YWLX = 'VECTOR_YWLX'
  30. VECTOR_GLBM = 'VECTOR_GLBM'
  31. VECTOR_ZYML = 'VECTOR_ZYML'
  32. SOURCE_TYPE = 'SOURCE_TYPE'
  33. Metadata_storage = 'Metadata_storage'
  34. def initAlgorithm(self, config=None):
  35. self.addParameter(QgsProcessingParameterProviderConnection(
  36. self.DATABASE, '数据库连接', 'postgres', defaultValue='192.168.60.2'))
  37. self.addParameter(QgsProcessingParameterDatabaseSchema(
  38. self.SCHEMA, '模式', connectionParameterName=self.DATABASE, defaultValue='base'))
  39. self.addParameter(QgsProcessingParameterString(
  40. self.TABLE, '导入目标表名(新建)', defaultValue='t_table'))
  41. self.addParameter(QgsProcessingParameterFile(
  42. self.INPUT_FILE, '表格文件(CSV或Excel)', optional=False))
  43. self.addParameter(QgsProcessingParameterString(
  44. self.VECTOR_SJLY, '数据来源', optional=False))
  45. self.addParameter(QgsProcessingParameterDateTime(
  46. self.VECTOR_YEAR, '数据时效', type=QgsProcessingParameterDateTime.Type.Date, optional=False))
  47. pgconn = PostgreSQL(schema='base')
  48. self.ywlxs = [row[0] for row in pgconn.getVectorYwlx()]
  49. self.addParameter(QgsProcessingParameterEnum(
  50. self.VECTOR_YWLX, '业务类型', options=self.ywlxs))
  51. self.depts = [row[0] for row in pgconn.getDeptList()]
  52. self.addParameter(QgsProcessingParameterEnum(
  53. self.VECTOR_GLBM, '管理部门', options=self.depts))
  54. self.zymls = [row[1] for row in pgconn.getVectorZyml()]
  55. self.addParameter(QgsProcessingParameterEnum(
  56. self.VECTOR_ZYML, '资源目录', options=self.zymls))
  57. self.addParameter(QgsProcessingParameterString(
  58. self.SOURCE_TYPE, '数据源类型', defaultValue='table', optional=False))
  59. self.addParameter(QgsProcessingParameterBoolean(
  60. self.Metadata_storage, '是否写入数据目录', defaultValue=True))
  61. def processAlgorithm(self, parameters, context, feedback):
  62. input_file = self.parameterAsString(parameters, self.INPUT_FILE, context)
  63. ext = os.path.splitext(input_file)[-1].lower()
  64. df = read_csv(input_file) if ext == '.csv' else read_excel(input_file)
  65. connection_name = self.parameterAsString(parameters, self.DATABASE, context)
  66. schema = self.parameterAsString(parameters, self.SCHEMA, context)
  67. target_table = self.parameterAsString(parameters, self.TABLE, context)
  68. data_source = self.parameterAsString(parameters, self.VECTOR_SJLY, context)
  69. source_type = self.parameterAsString(parameters, self.SOURCE_TYPE, context)
  70. # 转换 QDateTime 为 Python date
  71. qdatetime = self.parameterAsDateTime(parameters, self.VECTOR_YEAR, context)
  72. data_year = qdatetime.date().toPyDate().year # 只保留年份(整数)
  73. pgconn = PostgreSQL(schema='base')
  74. business_type = pgconn.getVectorYwlx()[self.parameterAsInt(parameters, self.VECTOR_YWLX, context)][0]
  75. management_dept = pgconn.getDeptList()[self.parameterAsInt(parameters, self.VECTOR_GLBM, context)][0]
  76. resource_catalog_name = pgconn.getVectorZyml()[self.parameterAsInt(parameters, self.VECTOR_ZYML, context)][1]
  77. bsm_query = f"SELECT bsm FROM t_vector_zyml WHERE name = '{resource_catalog_name}'"
  78. bsm_result = pgconn.execute(bsm_query)
  79. resource_catalog = bsm_result[0][0] if bsm_result else resource_catalog_name
  80. conn_params = {
  81. 'host': "192.168.60.2",
  82. 'port': "5432",
  83. 'dbname': "real3d",
  84. 'user': "postgres",
  85. 'password': "postgis"
  86. }
  87. conn = psycopg2.connect(**conn_params)
  88. cursor = conn.cursor()
  89. # 检查表是否存在,如果存在则添加递增后缀
  90. target_table_new = target_table
  91. suffix = 0
  92. while True:
  93. # 构建检查表是否存在的SQL
  94. check_sql = sql.SQL(
  95. "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s)")
  96. cursor.execute(check_sql, (schema, target_table_new))
  97. exists = cursor.fetchone()[0]
  98. # 如果表不存在,跳出循环
  99. if not exists:
  100. break
  101. # 如果表存在,增加后缀
  102. suffix += 1
  103. target_table_new = f"{target_table}_{suffix}"
  104. feedback.pushInfo(f"表 {schema}.{target_table} 已存在,尝试使用新名称: {target_table_new}")
  105. feedback.pushInfo(f"将使用表名: {schema}.{target_table_new}")
  106. # 动态生成字段类型(全部为 TEXT)
  107. columns = [f'"{col.strip()}" TEXT' for col in df.columns]
  108. create_sql = sql.SQL("CREATE TABLE {schema}.{table} ({fields})").format(
  109. schema=sql.Identifier(schema),
  110. table=sql.Identifier(target_table_new),
  111. fields=sql.SQL(', ').join(sql.SQL(col) for col in columns)
  112. )
  113. cursor.execute(create_sql)
  114. # 插入数据
  115. insert_cols = [f'"{col.strip()}"' for col in df.columns]
  116. insert_sql = sql.SQL("""
  117. INSERT INTO {schema}.{table} ({fields})
  118. VALUES ({placeholders})
  119. """).format(
  120. schema=sql.Identifier(schema),
  121. table=sql.Identifier(target_table_new),
  122. fields=sql.SQL(', ').join(map(sql.Identifier, df.columns)),
  123. placeholders=sql.SQL(', ').join(sql.Placeholder() * len(df.columns))
  124. )
  125. for _, row in df.iterrows():
  126. cursor.execute(insert_sql, tuple(row.fillna("").astype(str).values))
  127. feedback.pushInfo(f"数据已成功导入表 {schema}.{target_table_new}")
  128. # 元数据插入
  129. if self.parameterAsBool(parameters, self.Metadata_storage, context):
  130. feedback.pushInfo("插入数据目录元数据...")
  131. id = str(uuid.uuid4())
  132. # table_fullname = os.path.basename(input_file)
  133. table_fullname = f"{schema}.{target_table_new}"
  134. metadata_insert_sql = sql.SQL("""
  135. INSERT INTO {schema}.t_vector_storage
  136. (sjywz, rksj, year, ywlx, glbm, id, sjlx, xmlx, name, sjly)
  137. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  138. """).format(schema=sql.Identifier(schema))
  139. cursor.execute(metadata_insert_sql, (
  140. table_fullname, # 使用新表名
  141. datetime.now().date(),
  142. data_year,
  143. business_type,
  144. management_dept,
  145. id,
  146. source_type,
  147. resource_catalog,
  148. table_fullname,
  149. data_source
  150. ))
  151. feedback.pushInfo("数据目录元数据已成功插入。")
  152. conn.commit()
  153. cursor.close()
  154. conn.close()
  155. return {}
  156. def name(self):
  157. return "importtabletopostgis"
  158. def displayName(self):
  159. return "表格数据入库"
  160. def group(self):
  161. return "表格数据工具"
  162. def groupId(self):
  163. return "tabletools"
  164. def shortHelpString(self):
  165. return "将 CSV 或 Excel 表格导入为 PostgreSQL 中的新表,并写入数据目录元数据。"
  166. def createInstance(self):
  167. return ImportTableToPostGIS()