# -*- coding: utf-8 -*-
"""Small convenience wrapper around a SQLite connection with SpatiaLite loaded."""

__author__ = 'wanger'
__date__ = '2024-08-27'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'

import sqlite3


class SpatiaLiteUtils:
    """Hold one SQLite connection/cursor pair with ``mod_spatialite`` loaded.

    The instance can also be used as a context manager; leaving the ``with``
    block calls :meth:`closeDb`.
    """

    # (sqlite_master type, LIKE pattern) pairs that resetDb() drops, in the
    # order they are dropped. Tables go first so that indexes and triggers
    # owned by a dropped table are already gone when their turn comes.
    _RESET_FILTERS = (
        ("table", "%topology%"),
        ("index", "%topology%"),
        ("view", "%temp%"),
        ("trigger", "%topology%"),
    )

    def __init__(
            self,
            dbpath: str
    ):
        """Open ``dbpath``, apply performance PRAGMAs and load SpatiaLite.

        Args:
            dbpath: Path handed to ``sqlite3.connect``.

        Raises:
            Exception: Whatever ``sqlite3`` raises while configuring the
                connection or loading the extension. The connection is
                closed before the exception propagates.
        """
        self.dbpath = dbpath
        # Connect to the database.
        self.conn = sqlite3.connect(dbpath)
        try:
            self.cursor = self.conn.cursor()
            self.conn.enable_load_extension(True)
            self.conn.execute("PRAGMA synchronous = OFF")
            self.conn.execute("PRAGMA cache_size = -20000")  # In KB
            self.conn.execute("PRAGMA temp_store = MEMORY")
            self.conn.execute("SELECT load_extension('mod_spatialite');")
        except Exception:
            # Do not leak the open connection when setup fails.
            self.conn.close()
            raise
        print("mod_spatialite loaded successfully.")

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection; exceptions are not suppressed."""
        self.closeDb()
        return False

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    def execute(self, sql, params=()):
        """Run one SQL statement and return every resulting row.

        Args:
            sql: The statement to execute.
            params: Optional values bound to the placeholders in ``sql``.
                Prefer this over formatting values into the SQL text.

        Returns:
            The list produced by ``cursor.fetchall()``.
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def executescript(self, sql):
        """Run a multi-statement SQL script and echo it to stdout."""
        self.cursor.executescript(sql)
        print(f"Script executed successfully.{sql}")

    def enable_load_extension(self, bool):
        """Enable or disable extension loading on the connection.

        The parameter keeps its original name (which shadows the builtin)
        so that existing keyword callers continue to work.
        """
        self.conn.enable_load_extension(bool)

    def queryCount(self, tablename):
        """Return the number of rows in ``tablename``.

        NOTE: ``tablename`` is interpolated into the SQL text verbatim
        (identifiers cannot be bound as parameters), so it must come from a
        trusted source.
        """
        self.cursor.execute(f"SELECT count(1) FROM {tablename};")
        return self.cursor.fetchone()[0]

    # Reset the database.
    def resetDb(self):
        """Drop the topology-related objects and commit.

        Drops tables, indexes and triggers whose name matches
        ``%topology%`` and views whose name matches ``%temp%``. Internal
        ``sqlite_%`` objects are skipped. The connection stays open.
        """
        statements = []
        for kind, pattern in self._RESET_FILTERS:
            self.cursor.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type = ? AND name LIKE ? AND name NOT LIKE 'sqlite_%'",
                (kind, pattern),
            )
            # Quote each name so that spaces, dashes or quotes in it cannot
            # break the generated DROP statement.
            statements.extend(
                f"drop {kind} if exists {self._quote_identifier(name)};"
                for (name,) in self.cursor.fetchall()
            )

        # Execute the collected DROP statements one by one.
        for statement in statements:
            self.cursor.execute(statement)

        # Commit the changes (the connection is left open).
        self.conn.commit()

    def closeDb(self):
        """Close the cursor and then the connection."""
        self.cursor.close()
        self.conn.close()