123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- # -*- coding: utf-8 -*-
- __author__ = 'wanger'
- __date__ = '2024-08-27'
- __copyright__ = '(C) 2024 by siwei'
- __revision__ = '1.0'
- import sqlite3
- class SpatiaLiteUtils:
- def __init__(
- self,
- dbpath: str
- ):
- self.dbpath = dbpath
- # 连接到数据库
- self.conn = sqlite3.connect(dbpath)
- self.cursor = self.conn.cursor()
- self.conn.enable_load_extension(True)
- self.conn.execute("PRAGMA synchronous = OFF")
- self.conn.execute("PRAGMA cache_size = -20000") # In KB
- self.conn.execute("PRAGMA temp_store = MEMORY")
- self.conn.execute("SELECT load_extension('mod_spatialite');")
- print("mod_spatialite loaded successfully.")
- def execute(self, sql):
- self.cursor.execute(sql)
- rows = self.cursor.fetchall()
- return rows
- def executescript(self, sql, params):
- self.cursor.executescript(sql)
- print(f"Script executed successfully.{sql}")
- print(f"同步输入输出表字段信息及属性关联")
- tables = params["intable_s"]
- tablet = params["outtable"]
- self.synchronizeFields(tables=tables, tablet=tablet)
- print(f"{tables}字段和属性已关联同步更新到{tablet}")
- def enable_load_extension(self, bool):
- self.conn.enable_load_extension(bool)
- # 同步字段
- def synchronizeFields(self, tables, tablet):
- # 获取 tables 的字段信息
- self.cursor.execute(f"pragma table_info({tables});")
- fields = self.cursor.fetchall()
- # 添加字段到 tablet
- insertfield = []
- for field in fields:
- field_name, field_type = field[1], field[2]
- if field_name != "id" and field_name != "ogc_fid" and field_name != "objectid" and field_name.lower() != "geometry": # 排除 id 字段
- insertfield.append(field_name)
- self.cursor.execute(f"alter table {tablet} add column {field_name} {field_type};")
- # 更新 tablet 中的数据
- update_query = f"update {tablet} set "
- update_query += ", ".join(
- [f"{field} = (select {field} from {tables} where ogc_fid = {tablet}.sid)" for field in insertfield])
- print(update_query)
- self.cursor.execute(update_query)
- self.conn.commit()
- def queryCount(self, tablename):
- self.cursor.execute(f"SELECT count(1) FROM {tablename};")
- rows = self.cursor.fetchall()
- return rows[0][0]
- # 重置数据库
- def resetDb(self):
- # 获取所有表名,排除系统表
- self.cursor.execute("""-- 删除所有表
- select 'drop table if exists ' || name || ';'
- from sqlite_master
- where type='table' and name not like 'sqlite_%' and (name like '%topology%' or name like '%temp%')
-
- union all
- -- 删除所有索引
- select 'drop index if exists ' || name || ';'
- from sqlite_master
- where type='index' and (name like '%topology%' or name like '%temp%')
-
- union all
- -- 删除所有视图
- select 'drop view if exists ' || name || ';'
- from sqlite_master
- where type='view' and (name like '%temp%' or name like '%view%')
-
- union all
- -- 删除所有触发器
- select 'drop trigger if exists ' || name || ';'
- from sqlite_master
- where type='trigger' and name like '%topology%';""")
- tables = self.cursor.fetchall()
- # 循环删除每个表
- # for sql in tables:
- # self.cursor.execute(sql[0])
- # 提交更改并关闭连接
- self.conn.commit()
- def closeDb(self):
- self.cursor.close()
- self.conn.close()
- # db = SpatiaLiteUtils(dbpath="D:\\temp\\output.sqlite")
- # db.synchronizeFields(tables="topology_in_table_5226c4d3c7a84b2e901c5032cbd299fe", tablet="topology_out_table_5226c4d3c7a84b2e901c5032cbd299fe")
- # # db.resetDb()
- # db.closeDb()
|