gpkg.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from pathlib import Path
  2. from file_management.providers.errors import FileDoesNotExistError
  3. import sqlite3
  4. from PyQt5.QtWidgets import QMessageBox
  5. class GPKGFileManager:
  6. def __init__(self, uri):
  7. self._file = Path(uri.database())
  8. self._lyr_name = uri.layer_name()
  9. self._uri = uri
  10. def provider_name(self):
  11. return 'GeoPackage'
  12. def layer_exists(self, target_uri):
  13. with sqlite3.connect(self._file) as conn:
  14. cur = conn.cursor()
  15. cur.execute('SELECT table_name FROM gpkg_contents;')
  16. layers_lower = [x[0].lower() for x in cur.fetchall()]
  17. return target_uri.layer_name().lower() in layers_lower
  18. def database_exists(self, target_uri):
  19. return Path(target_uri.database()).exists()
  20. def ask_overwrite_layer(self, target_uri, iface):
  21. answer = QMessageBox.question(iface.mainWindow(), 'Overwrite Layer',
  22. f'{target_uri.layer_name()} already exists.\nDo you want to replace it?',
  23. QMessageBox.Yes | QMessageBox.No)
  24. return answer == QMessageBox.Yes
  25. def ask_overwrite_database(self, target_uri, iface):
  26. return True # already asked in file dialog
  27. def delete_layer(self, target_uri):
  28. with sqlite3.connect(self._file) as conn:
  29. cur = conn.cursor()
  30. cur.execute('SELECT table_name FROM gpkg_contents;')
  31. layers = [x[0] for x in cur.fetchall()]
  32. layers_lower = [x.lower() for x in layers]
  33. lyr_name = layers[layers_lower.index(target_uri.layer_name().lower())] # allow to be case insensitive
  34. # all tables
  35. cur.execute('SELECT name from SQLITE_SCHEMA where type = \'table\';')
  36. tables = [x[0] for x in cur.fetchall()]
  37. # get the geometry column name
  38. cur.execute(f'SELECT column_name FROM gpkg_geometry_columns WHERE table_name=\'{lyr_name}\';')
  39. geom_col = [x[0] for x in cur.fetchall()][0]
  40. # delete the existing table
  41. cur.execute(f'DROP TABLE \'{lyr_name}\';')
  42. # delete the row in the header tables
  43. gpkg_tables = ['gpkg_contents', 'gpkg_data_column_constraints', 'gpkg_data_columns', 'gpkg_extensions',
  44. 'gpkg_geometry_columns', 'gpkg_metadata', 'gpkg_metadata_reference', 'gpkg_ogr_contents',
  45. 'gpkg_spatial_ref_sys', 'gpkg_tile_matrix', 'gpkg_tile_matrix_set']
  46. for gpkg_table in gpkg_tables:
  47. if gpkg_table not in tables:
  48. continue
  49. cur.execute(f'SELECT name FROM pragma_table_info(\'{gpkg_table}\');')
  50. column_names = [x[0] for x in cur.fetchall()]
  51. if 'table_name' not in column_names:
  52. continue
  53. cur.execute(f'DELETE FROM {gpkg_table} WHERE table_name = \'{lyr_name}\';')
  54. # spatial indexing table
  55. si_table = f'rtree_{lyr_name}_{geom_col}'
  56. if si_table in tables:
  57. cur.execute(f'DROP TABLE \'{si_table}\';')
  58. def delete_database(self, target_uri):
  59. Path(target_uri.database()).unlink()
  60. def rename_database(self, target_uri):
  61. target_path = Path(target_uri.database())
  62. if not self._file.exists():
  63. raise FileDoesNotExistError
  64. self._file.rename(target_path)
  65. def rename_layer(self, target_uri):
  66. target_name = target_uri.layer_name()
  67. with sqlite3.connect(self._file) as conn:
  68. cur = conn.cursor()
  69. cur.execute('SELECT table_name FROM gpkg_contents;')
  70. layers = [x[0] for x in cur.fetchall()]
  71. layers_lower = [x.lower() for x in layers]
  72. # check lyr exists in gpkg
  73. if self._lyr_name.lower() not in layers_lower:
  74. cur.close()
  75. raise TableDoesNotExistError
  76. else:
  77. src_table = layers[layers_lower.index(self._lyr_name.lower())] # allow to be case insensitive
  78. # all tables
  79. cur.execute('SELECT name from SQLITE_SCHEMA where type = \'table\';')
  80. tables = [x[0] for x in cur.fetchall()]
  81. # get the geometry column name
  82. cur.execute(f'SELECT column_name FROM gpkg_geometry_columns WHERE table_name=\'{src_table}\';')
  83. geom_col = [x[0] for x in cur.fetchall()][0]
  84. # change the layer name
  85. cur.execute(f'ALTER TABLE \'{src_table}\' RENAME TO \'{target_name}\';')
  86. # update the table name attribute in the header tables
  87. gpkg_tables = ['gpkg_contents', 'gpkg_data_column_constraints', 'gpkg_data_columns', 'gpkg_extensions',
  88. 'gpkg_geometry_columns', 'gpkg_metadata', 'gpkg_metadata_reference', 'gpkg_ogr_contents',
  89. 'gpkg_spatial_ref_sys', 'gpkg_tile_matrix', 'gpkg_tile_matrix_set']
  90. for gpkg_table in gpkg_tables:
  91. if gpkg_table not in tables:
  92. continue
  93. cur.execute(f'SELECT name FROM pragma_table_info(\'{gpkg_table}\');')
  94. column_names = [x[0] for x in cur.fetchall()]
  95. if 'table_name' not in column_names:
  96. continue
  97. cur.execute(f'UPDATE {gpkg_table} SET table_name = \'{target_name}\' WHERE table_name = \'{src_table}\';')
  98. # spatial indexing table
  99. si_table_src = f'rtree_{src_table}_{geom_col}'
  100. si_table_des = f'rtree_{target_name}_{geom_col}'
  101. if si_table_src in tables:
  102. cur.execute(f'ALTER TABLE \'{si_table_src}\' RENAME TO \'{si_table_des}\';')