from contextlib import contextmanager
from pathlib import Path
import sqlite3

from PyQt5.QtWidgets import QMessageBox

from file_management.providers.errors import FileDoesNotExistError

try:
    from file_management.providers.errors import TableDoesNotExistError
except ImportError:
    # NOTE(review): rename_layer() raises TableDoesNotExistError, but this
    # module never imported it, so the raise itself failed with NameError.
    # It is presumably meant to live next to FileDoesNotExistError - confirm,
    # then replace this fallback with a plain import.
    class TableDoesNotExistError(Exception):
        """Raised when a layer is not registered in the GeoPackage."""


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that any table name can be used
    safely in statements where parameter binding is not possible
    (``DROP TABLE``, ``ALTER TABLE``, ``UPDATE <table>`` ...).
    """
    return '"{}"'.format(str(name).replace('"', '""'))


class GPKGFileManager:
    """Delete and rename operations for a layer / database in a GeoPackage.

    The manager is built from a URI object exposing ``database()`` and
    ``layer_name()``. Every SQL operation runs against the database of the
    URI passed to the constructor.
    """

    # GeoPackage system tables that may carry a ``table_name`` column
    # referring to a user layer. Tables without that column are skipped at
    # run time.
    _GPKG_TABLES = (
        'gpkg_contents',
        'gpkg_data_column_constraints',
        'gpkg_data_columns',
        'gpkg_extensions',
        'gpkg_geometry_columns',
        'gpkg_metadata',
        'gpkg_metadata_reference',
        'gpkg_ogr_contents',
        'gpkg_spatial_ref_sys',
        'gpkg_tile_matrix',
        'gpkg_tile_matrix_set',
    )

    def __init__(self, uri):
        """Store the source database path, layer name and URI."""
        self._file = Path(uri.database())
        self._lyr_name = uri.layer_name()
        self._uri = uri

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @contextmanager
    def _connection(self, write=False):
        """Yield a connection to ``self._file`` that is always closed.

        ``with sqlite3.connect(...)`` only commits or rolls back; it does not
        close the connection, which keeps the file handle open (and blocks a
        later rename/delete of the file on Windows). When *write* is true an
        explicit transaction is opened so that DDL and DML statements are
        committed or rolled back together.
        """
        conn = sqlite3.connect(str(self._file))
        try:
            with conn:  # commit on success, rollback on error
                if write:
                    conn.execute('BEGIN')
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _resolve_layer(cur, name):
        """Return the stored spelling of layer *name*, or ``None``.

        The lookup against ``gpkg_contents`` is case insensitive; the first
        match is returned.
        """
        wanted = name.lower()
        cur.execute('SELECT table_name FROM gpkg_contents;')
        for (table_name,) in cur.fetchall():
            if table_name.lower() == wanted:
                return table_name
        return None

    @staticmethod
    def _table_names(cur):
        """Return the names of all tables in the database."""
        # sqlite_master works on every SQLite version; the SQLITE_SCHEMA
        # alias only exists from SQLite 3.33.
        cur.execute("SELECT name FROM sqlite_master WHERE type = 'table';")
        return {row[0] for row in cur.fetchall()}

    @staticmethod
    def _geometry_column(cur, tables, layer):
        """Return the geometry column of *layer*, or ``None`` if it has none."""
        if 'gpkg_geometry_columns' not in tables:
            return None
        cur.execute(
            'SELECT column_name FROM gpkg_geometry_columns WHERE table_name = ?;',
            (layer,),
        )
        rows = cur.fetchall()
        return rows[0][0] if rows else None

    @classmethod
    def _tables_with_layer_reference(cls, cur, tables):
        """Return the existing GeoPackage tables that have a ``table_name`` column."""
        found = []
        for gpkg_table in cls._GPKG_TABLES:
            if gpkg_table not in tables:
                continue
            cur.execute('SELECT name FROM pragma_table_info(?);', (gpkg_table,))
            if any(row[0] == 'table_name' for row in cur.fetchall()):
                found.append(gpkg_table)
        return found

    # ------------------------------------------------------------------
    # public interface
    # ------------------------------------------------------------------

    def provider_name(self):
        """Return the display name of this provider."""
        return 'GeoPackage'

    def layer_exists(self, target_uri):
        """Return True if the target layer name is listed in ``gpkg_contents``.

        The comparison is case insensitive.
        NOTE(review): the query runs against the source database
        (``self._file``), not ``target_uri.database()`` - confirm intended.
        """
        with self._connection() as conn:
            return self._resolve_layer(conn.cursor(), target_uri.layer_name()) is not None

    def database_exists(self, target_uri):
        """Return True if the target database file exists on disk."""
        return Path(target_uri.database()).exists()

    def ask_overwrite_layer(self, target_uri, iface):
        """Ask the user whether the existing target layer may be replaced.

        Returns True when the user answers *Yes*.
        """
        answer = QMessageBox.question(
            iface.mainWindow(),
            'Overwrite Layer',
            f'{target_uri.layer_name()} already exists.\nDo you want to replace it?',
            QMessageBox.Yes | QMessageBox.No,
        )
        return answer == QMessageBox.Yes

    def ask_overwrite_database(self, target_uri, iface):
        """Always return True: the question was already asked in the file dialog."""
        return True  # already asked in file dialog

    def delete_layer(self, target_uri):
        """Remove the target layer and its GeoPackage bookkeeping rows.

        Drops the layer table, deletes every row referring to it from the
        GeoPackage system tables, and drops its R-tree spatial index table if
        one exists. All changes are applied in a single transaction.

        Raises:
            ValueError: the layer is not listed in ``gpkg_contents``.
        """
        with self._connection(write=True) as conn:
            cur = conn.cursor()

            # use the spelling stored in the database (case-insensitive match)
            lyr_name = self._resolve_layer(cur, target_uri.layer_name())
            if lyr_name is None:
                raise ValueError(
                    f'{target_uri.layer_name()!r} is not a layer in {self._file}'
                )

            tables = self._table_names(cur)
            # None for layers without geometry (no row in gpkg_geometry_columns)
            geom_col = self._geometry_column(cur, tables, lyr_name)

            # delete the existing table
            cur.execute(f'DROP TABLE {_quote_identifier(lyr_name)};')

            # delete the rows in the header tables
            for gpkg_table in self._tables_with_layer_reference(cur, tables):
                cur.execute(
                    f'DELETE FROM {_quote_identifier(gpkg_table)} WHERE table_name = ?;',
                    (lyr_name,),
                )

            # spatial indexing table
            if geom_col is not None:
                si_table = f'rtree_{lyr_name}_{geom_col}'
                if si_table in tables:
                    cur.execute(f'DROP TABLE {_quote_identifier(si_table)};')

    def delete_database(self, target_uri):
        """Delete the target database file from disk."""
        Path(target_uri.database()).unlink()

    def rename_database(self, target_uri):
        """Rename (move) the source database file to the target path.

        Raises:
            FileDoesNotExistError: the source database file does not exist.
        """
        target_path = Path(target_uri.database())
        if not self._file.exists():
            raise FileDoesNotExistError
        self._file.rename(target_path)

    def rename_layer(self, target_uri):
        """Rename the source layer to the target layer name.

        Renames the layer table, updates every ``table_name`` reference in
        the GeoPackage system tables, and renames the R-tree spatial index
        table if one exists. All changes are applied in a single transaction.

        Raises:
            TableDoesNotExistError: the source layer is not listed in
                ``gpkg_contents``.
        """
        target_name = target_uri.layer_name()
        with self._connection(write=True) as conn:
            cur = conn.cursor()

            # check the layer exists in the gpkg (case-insensitive match)
            src_table = self._resolve_layer(cur, self._lyr_name)
            if src_table is None:
                raise TableDoesNotExistError

            tables = self._table_names(cur)
            # None for layers without geometry (no row in gpkg_geometry_columns)
            geom_col = self._geometry_column(cur, tables, src_table)

            # change the layer name
            cur.execute(
                f'ALTER TABLE {_quote_identifier(src_table)} '
                f'RENAME TO {_quote_identifier(target_name)};'
            )

            # update the table name attribute in the header tables
            for gpkg_table in self._tables_with_layer_reference(cur, tables):
                cur.execute(
                    f'UPDATE {_quote_identifier(gpkg_table)} '
                    'SET table_name = ? WHERE table_name = ?;',
                    (target_name, src_table),
                )

            # spatial indexing table
            if geom_col is not None:
                si_table_src = f'rtree_{src_table}_{geom_col}'
                si_table_des = f'rtree_{target_name}_{geom_col}'
                if si_table_src in tables:
                    cur.execute(
                        f'ALTER TABLE {_quote_identifier(si_table_src)} '
                        f'RENAME TO {_quote_identifier(si_table_des)};'
                    )