# -*- coding: utf-8 -*-
"""Small helper around a sqlite3 connection with mod_spatialite loaded."""

__author__ = 'wanger'
__date__ = '2024-08-27'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'

import sqlite3


class SpatiaLiteUtils:
    """Wrap one sqlite3 connection/cursor pair with SpatiaLite enabled.

    The constructor opens the database, applies a few performance PRAGMAs
    and loads the ``mod_spatialite`` extension.  The remaining methods are
    thin conveniences over the shared cursor.
    """

    # Columns that are never copied by synchronizeFields (compared
    # case-insensitively, because SQLite column names are case-insensitive).
    _SKIPPED_FIELDS = frozenset({"id", "ogc_fid", "objectid", "geometry"})

    def __init__(
            self,
            dbpath: str
    ):
        """Open *dbpath* and load the SpatiaLite extension.

        Args:
            dbpath: Path of the SQLite database file to open.

        Raises:
            Exception: Whatever sqlite3 raises while configuring the
                connection or loading ``mod_spatialite``; the connection is
                closed before the error propagates.
        """
        self.dbpath = dbpath
        # Connect to the database.
        self.conn = sqlite3.connect(dbpath)
        try:
            self.cursor = self.conn.cursor()
            self.conn.enable_load_extension(True)
            self.conn.execute("PRAGMA synchronous = OFF")
            self.conn.execute("PRAGMA cache_size = -20000")  # In KB
            self.conn.execute("PRAGMA temp_store = MEMORY")
            self.conn.execute("SELECT load_extension('mod_spatialite');")
        except Exception:
            # Do not leak the open connection when setup fails.
            self.conn.close()
            raise
        print("mod_spatialite loaded successfully.")

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    def execute(self, sql: str, params=()) -> list:
        """Run a single statement and return all result rows.

        Args:
            sql: The SQL statement to execute.
            params: Optional values bound to placeholders in *sql*.

        Returns:
            The list of rows produced by ``fetchall()``.
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def executescript(self, sql: str, params: dict) -> None:
        """Run a multi-statement script, then sync fields between two tables.

        Args:
            sql: The SQL script passed to ``cursor.executescript``.
            params: Mapping that must contain ``"intable_s"`` (source table
                name) and ``"outtable"`` (target table name); both are
                forwarded to :meth:`synchronizeFields`.

        Raises:
            KeyError: If one of the two required keys is missing.
        """
        self.cursor.executescript(sql)
        print(f"Script executed successfully.{sql}")
        print(f"同步输入输出表字段信息及属性关联")
        tables = params["intable_s"]
        tablet = params["outtable"]
        self.synchronizeFields(tables=tables, tablet=tablet)
        print(f"{tables}字段和属性已关联同步更新到{tablet}")

    def enable_load_extension(self, bool):
        """Enable or disable extension loading on the connection.

        Args:
            bool: Flag forwarded to ``Connection.enable_load_extension``.
                The name shadows the builtin but is kept so that existing
                keyword callers continue to work.
        """
        self.conn.enable_load_extension(bool)

    # Synchronize fields.
    def synchronizeFields(self, tables: str, tablet: str) -> None:
        """Copy the attribute columns of *tables* onto *tablet*.

        Every column of *tables* except id / ogc_fid / objectid / geometry
        (any letter case) is added to *tablet* with the same declared type,
        then filled from the row of *tables* whose ``ogc_fid`` equals the
        target row's ``sid``.  The change is committed at the end.

        Args:
            tables: Name of the source table.
            tablet: Name of the target table; it must have a ``sid`` column.

        Raises:
            sqlite3.OperationalError: For example when *tablet* already has
                a column with one of the copied names.
        """
        # Read the column description of the source table.
        self.cursor.execute(f"pragma table_info({tables});")
        columns = self.cursor.fetchall()

        # Add each attribute column to the target table.
        copied = []
        for column in columns:
            column_name, column_type = column[1], column[2]
            if column_name.lower() in self._SKIPPED_FIELDS:
                continue  # identifier / geometry columns are not copied
            # Quote the name so keywords and names with spaces stay valid SQL.
            quoted = self._quote_identifier(column_name)
            self.cursor.execute(
                f"alter table {tablet} add column {quoted} {column_type};")
            copied.append(quoted)

        # Fill the new columns; with nothing to copy an UPDATE with an empty
        # SET list would be a syntax error, so it is skipped.
        if copied:
            assignments = ", ".join(
                f"{quoted} = (select {quoted} from {tables} "
                f"where ogc_fid = {tablet}.sid)"
                for quoted in copied)
            update_query = f"update {tablet} set {assignments}"
            print(update_query)
            self.cursor.execute(update_query)
        self.conn.commit()

    def queryCount(self, tablename: str) -> int:
        """Return the number of rows in *tablename*."""
        self.cursor.execute(f"SELECT count(1) FROM {tablename};")
        return self.cursor.fetchone()[0]

    # Reset the database.
    def resetDb(self) -> None:
        """Generate DROP statements for topology/temp objects, then commit.

        The query builds one ``drop ... if exists`` statement per matching
        table, index, view and trigger found in ``sqlite_master``.

        NOTE(review): the generated statements are fetched but NOT executed;
        the loop that ran them was commented out in the original code, so
        this method currently leaves the schema untouched.  Confirm whether
        that is intended before re-enabling it.
        """
        # Collect the names of matching objects, excluding system tables.
        self.cursor.execute("""-- 删除所有表
select 'drop table if exists ' || name || ';'
from sqlite_master
where type='table' and name not like 'sqlite_%' and (name like '%topology%' or name like '%temp%')
union all
-- 删除所有索引
select 'drop index if exists ' || name || ';'
from sqlite_master
where type='index' and (name like '%topology%' or name like '%temp%')
union all
-- 删除所有视图
select 'drop view if exists ' || name || ';'
from sqlite_master
where type='view' and (name like '%temp%' or name like '%view%')
union all
-- 删除所有触发器
select 'drop trigger if exists ' || name || ';'
from sqlite_master
where type='trigger' and name like '%topology%';""")
        # Drain the result set; the statements are deliberately not run.
        self.cursor.fetchall()
        # Commit any pending changes.
        self.conn.commit()

    def closeDb(self) -> None:
        """Close the cursor and then the connection."""
        self.cursor.close()
        self.conn.close()

# db = SpatiaLiteUtils(dbpath="D:\\temp\\output.sqlite")
# db.synchronizeFields(tables="topology_in_table_5226c4d3c7a84b2e901c5032cbd299fe", tablet="topology_out_table_5226c4d3c7a84b2e901c5032cbd299fe")
# # db.resetDb()
# db.closeDb()