| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- from pathlib import Path
- from file_management.providers.errors import FileDoesNotExistError
- import sqlite3
- from PyQt5.QtWidgets import QMessageBox
class GPKGFileManager:
    """File-management operations for a layer stored in a GeoPackage (SQLite) file.

    The ``uri`` objects handed to this class only need to provide two callables:
    ``database()`` (path of the GeoPackage file) and ``layer_name()``.
    """

    # GeoPackage metadata ("header") tables that may reference a layer through a
    # ``table_name`` column. Tables missing from the file, or lacking that column,
    # are skipped at runtime.
    _GPKG_HEADER_TABLES = (
        'gpkg_contents', 'gpkg_data_column_constraints', 'gpkg_data_columns', 'gpkg_extensions',
        'gpkg_geometry_columns', 'gpkg_metadata', 'gpkg_metadata_reference', 'gpkg_ogr_contents',
        'gpkg_spatial_ref_sys', 'gpkg_tile_matrix', 'gpkg_tile_matrix_set',
    )

    def __init__(self, uri):
        """Remember the source database path, the source layer name and the URI itself."""
        self._file = Path(uri.database())
        self._lyr_name = uri.layer_name()
        self._uri = uri

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier with embedded quotes escaped."""
        return '"{}"'.format(str(name).replace('"', '""'))

    @staticmethod
    def _layer_names(cur):
        """Return every ``table_name`` registered in ``gpkg_contents``."""
        cur.execute('SELECT table_name FROM gpkg_contents;')
        return [row[0] for row in cur.fetchall()]

    @staticmethod
    def _match_layer(layers, name):
        """Return the stored spelling of *name* within *layers* (case-insensitive), or None."""
        wanted = name.lower()
        return next((lyr for lyr in layers if lyr.lower() == wanted), None)

    @staticmethod
    def _table_names(cur):
        """Return the set of table names present in the database schema."""
        # sqlite_master is accepted by every SQLite version; the SQLITE_SCHEMA alias is newer.
        cur.execute("SELECT name FROM sqlite_master WHERE type = 'table';")
        return {row[0] for row in cur.fetchall()}

    @staticmethod
    def _geometry_column(cur, tables, lyr_name):
        """Return the geometry column registered for *lyr_name*, or None if there is none."""
        if 'gpkg_geometry_columns' not in tables:
            return None
        cur.execute('SELECT column_name FROM gpkg_geometry_columns WHERE table_name = ?;', (lyr_name,))
        row = cur.fetchone()
        return row[0] if row else None

    def _header_tables_with_table_name(self, cur, tables):
        """Return the header tables that exist in *tables* and have a ``table_name`` column."""
        found = []
        for gpkg_table in self._GPKG_HEADER_TABLES:
            if gpkg_table not in tables:
                continue
            cur.execute('SELECT name FROM pragma_table_info(?);', (gpkg_table,))
            if any(row[0] == 'table_name' for row in cur.fetchall()):
                found.append(gpkg_table)
        return found

    # --------------------------------------------------------------- public API

    def provider_name(self):
        """Return the display name of this provider."""
        return 'GeoPackage'

    def layer_exists(self, target_uri):
        """Return True if the target layer name is registered in this manager's database.

        The comparison is case-insensitive. The lookup is done in the database this
        manager was created with, not in ``target_uri.database()``.
        """
        conn = sqlite3.connect(self._file)
        try:
            layers = self._layer_names(conn.cursor())
        finally:
            # The connection's context manager only commits/rolls back; close explicitly
            # so the file is not left locked.
            conn.close()
        return self._match_layer(layers, target_uri.layer_name()) is not None

    def database_exists(self, target_uri):
        """Return True if the target database file exists on disk."""
        return Path(target_uri.database()).exists()

    def ask_overwrite_layer(self, target_uri, iface):
        """Ask the user whether the existing target layer may be replaced.

        Returns True when the user answers Yes.
        """
        answer = QMessageBox.question(iface.mainWindow(), 'Overwrite Layer',
                                      f'{target_uri.layer_name()} already exists.\nDo you want to replace it?',
                                      QMessageBox.Yes | QMessageBox.No)
        return answer == QMessageBox.Yes

    def ask_overwrite_database(self, target_uri, iface):
        """Always return True: the question was already asked in the file dialog."""
        return True  # already asked in file dialog

    def delete_layer(self, target_uri):
        """Remove the target layer from this manager's database.

        Drops the layer table, deletes its rows from the GeoPackage header tables and
        drops the ``rtree_<layer>_<geom>`` spatial index table when one exists. The
        layer name is matched case-insensitively against ``gpkg_contents``.

        Raises:
            ValueError: if the layer is not registered in ``gpkg_contents``.
        """
        conn = sqlite3.connect(self._file)
        try:
            with conn:  # commit pending changes on success, roll back on error
                cur = conn.cursor()
                requested = target_uri.layer_name()
                lyr_name = self._match_layer(self._layer_names(cur), requested)
                if lyr_name is None:
                    raise ValueError(f'{requested!r} is not a layer in {self._file}')

                tables = self._table_names(cur)
                # Non-spatial (attribute) layers have no geometry column and no rtree table.
                geom_col = self._geometry_column(cur, tables, lyr_name)

                # delete the existing table
                cur.execute(f'DROP TABLE {self._quote_identifier(lyr_name)};')

                # delete the rows in the header tables
                for gpkg_table in self._header_tables_with_table_name(cur, tables):
                    cur.execute(f'DELETE FROM {gpkg_table} WHERE table_name = ?;', (lyr_name,))

                # spatial indexing table
                if geom_col is not None:
                    si_table = f'rtree_{lyr_name}_{geom_col}'
                    if si_table in tables:
                        cur.execute(f'DROP TABLE {self._quote_identifier(si_table)};')
        finally:
            conn.close()

    def delete_database(self, target_uri):
        """Delete the target database file."""
        Path(target_uri.database()).unlink()

    def rename_database(self, target_uri):
        """Rename this manager's database file to the target database path.

        Raises:
            FileDoesNotExistError: if the source database file does not exist.
        """
        if not self._file.exists():
            raise FileDoesNotExistError
        self._file.rename(Path(target_uri.database()))

    def rename_layer(self, target_uri):
        """Rename this manager's layer to the target layer name within the same database.

        Renames the layer table, rewrites ``table_name`` in the GeoPackage header
        tables and renames the ``rtree_<layer>_<geom>`` spatial index table when one
        exists. The source layer is matched case-insensitively against ``gpkg_contents``.

        Raises:
            TableDoesNotExistError: if the source layer is not registered in ``gpkg_contents``.
        """
        target_name = target_uri.layer_name()
        conn = sqlite3.connect(self._file)
        try:
            with conn:  # commit pending changes on success, roll back on error
                cur = conn.cursor()

                # check lyr exists in gpkg
                src_table = self._match_layer(self._layer_names(cur), self._lyr_name)
                if src_table is None:
                    # NOTE(review): this exception is not imported at the top of the file;
                    # it is assumed to live next to FileDoesNotExistError - confirm.
                    from file_management.providers.errors import TableDoesNotExistError
                    raise TableDoesNotExistError

                tables = self._table_names(cur)
                # Non-spatial (attribute) layers have no geometry column and no rtree table.
                geom_col = self._geometry_column(cur, tables, src_table)

                # change the layer name
                cur.execute(
                    f'ALTER TABLE {self._quote_identifier(src_table)} '
                    f'RENAME TO {self._quote_identifier(target_name)};'
                )

                # update the table name attribute in the header tables
                for gpkg_table in self._header_tables_with_table_name(cur, tables):
                    cur.execute(
                        f'UPDATE {gpkg_table} SET table_name = ? WHERE table_name = ?;',
                        (target_name, src_table),
                    )

                # spatial indexing table
                if geom_col is not None:
                    si_table_src = f'rtree_{src_table}_{geom_col}'
                    si_table_des = f'rtree_{target_name}_{geom_col}'
                    if si_table_src in tables:
                        cur.execute(
                            f'ALTER TABLE {self._quote_identifier(si_table_src)} '
                            f'RENAME TO {self._quote_identifier(si_table_des)};'
                        )
        finally:
            conn.close()
|