plugin_test.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. """
  2. ***************************************************************************
  3. plugin_test.py
  4. ---------------------
  5. Date : May 2017
  6. Copyright : (C) 2017, Sandro Santilli
  7. Email : strk at kbt dot io
  8. ***************************************************************************
  9. * *
  10. * This program is free software; you can redistribute it and/or modify *
  11. * it under the terms of the GNU General Public License as published by *
  12. * the Free Software Foundation; either version 2 of the License, or *
  13. * (at your option) any later version. *
  14. * *
  15. ***************************************************************************
  16. """
  17. __author__ = 'Sandro Santilli'
  18. __date__ = 'May 2017'
  19. __copyright__ = '(C) 2017, Sandro Santilli'
  20. import os
  21. import re
  22. import qgis
  23. import unittest
  24. from qgis.testing import start_app, QgisTestCase
  25. from qgis.core import QgsDataSourceUri
  26. from qgis.utils import iface
  27. from qgis.PyQt.QtCore import QObject
  28. start_app()
  29. from db_manager.db_plugins.postgis.plugin import PostGisDBPlugin, PGRasterTable
  30. from db_manager.db_plugins.postgis.plugin import PGDatabase
  31. from db_manager.db_plugins.postgis.data_model import PGSqlResultModel
  32. from db_manager.db_plugins.plugin import Table
  33. from db_manager.db_plugins.postgis.connector import PostGisDBConnector
  34. class TestDBManagerPostgisPlugin(QgisTestCase):
  35. @classmethod
  36. def setUpClass(self):
  37. self.old_pgdatabase_env = os.environ.get('PGDATABASE')
  38. # QGIS_PGTEST_DB contains the full connection string and not only the DB name!
  39. QGIS_PGTEST_DB = os.environ.get('QGIS_PGTEST_DB')
  40. if QGIS_PGTEST_DB is not None:
  41. test_uri = QgsDataSourceUri(QGIS_PGTEST_DB)
  42. self.testdb = test_uri.database()
  43. else:
  44. self.testdb = 'qgis_test'
  45. os.environ['PGDATABASE'] = self.testdb
  46. # Create temporary service file
  47. self.old_pgservicefile_env = os.environ.get('PGSERVICEFILE')
  48. self.tmpservicefile = '/tmp/qgis-test-{}-pg_service.conf'.format(os.getpid())
  49. os.environ['PGSERVICEFILE'] = self.tmpservicefile
  50. f = open(self.tmpservicefile, "w")
  51. f.write("[dbmanager]\ndbname={}\n".format(self.testdb))
  52. # TODO: add more things if PGSERVICEFILE was already set ?
  53. f.close()
  54. @classmethod
  55. def tearDownClass(self):
  56. # Restore previous env variables if needed
  57. if self.old_pgdatabase_env:
  58. os.environ['PGDATABASE'] = self.old_pgdatabase_env
  59. if self.old_pgservicefile_env:
  60. os.environ['PGSERVICEFILE'] = self.old_pgservicefile_env
  61. # Remove temporary service file
  62. os.unlink(self.tmpservicefile)
  63. # See https://github.com/qgis/QGIS/issues/24525
  64. def test_rasterTableURI(self):
  65. def check_rasterTableURI(expected_dbname):
  66. tables = database.tables()
  67. raster_tables_count = 0
  68. for tab in tables:
  69. if tab.type == Table.RasterType:
  70. raster_tables_count += 1
  71. uri = tab.uri()
  72. m = re.search(' dbname=\'([^ ]*)\' ', uri)
  73. self.assertTrue(m)
  74. actual_dbname = m.group(1)
  75. self.assertEqual(actual_dbname, expected_dbname)
  76. # print(tab.type)
  77. # print(tab.quotedName())
  78. # print(tab)
  79. # We need to make sure a database is created with at
  80. # least one raster table !
  81. self.assertGreaterEqual(raster_tables_count, 1)
  82. obj = QObject() # needs to be kept alive
  83. obj.connectionName = lambda: 'fake'
  84. obj.providerName = lambda: 'postgres'
  85. # Test for empty URI
  86. # See https://github.com/qgis/QGIS/issues/24525
  87. # and https://github.com/qgis/QGIS/issues/19005
  88. expected_dbname = self.testdb
  89. os.environ['PGDATABASE'] = expected_dbname
  90. database = PGDatabase(obj, QgsDataSourceUri())
  91. self.assertIsInstance(database, PGDatabase)
  92. uri = database.uri()
  93. self.assertEqual(uri.host(), '')
  94. self.assertEqual(uri.username(), '')
  95. self.assertEqual(uri.database(), expected_dbname)
  96. self.assertEqual(uri.service(), '')
  97. check_rasterTableURI(expected_dbname)
  98. # Test for service-only URI
  99. # See https://github.com/qgis/QGIS/issues/24526
  100. os.environ['PGDATABASE'] = 'fake'
  101. database = PGDatabase(obj, QgsDataSourceUri('service=dbmanager'))
  102. self.assertIsInstance(database, PGDatabase)
  103. uri = database.uri()
  104. self.assertEqual(uri.host(), '')
  105. self.assertEqual(uri.username(), '')
  106. self.assertEqual(uri.database(), '')
  107. self.assertEqual(uri.service(), 'dbmanager')
  108. check_rasterTableURI(expected_dbname)
  109. # See https://github.com/qgis/QGIS/issues/24732
  110. def test_unicodeInQuery(self):
  111. os.environ['PGDATABASE'] = self.testdb
  112. obj = QObject() # needs to be kept alive
  113. obj.connectionName = lambda: 'fake'
  114. obj.providerName = lambda: 'postgres'
  115. database = PGDatabase(obj, QgsDataSourceUri())
  116. self.assertIsInstance(database, PGDatabase)
  117. # SQL as string literal
  118. res = database.sqlResultModel("SELECT 'é'::text", obj)
  119. self.assertIsInstance(res, PGSqlResultModel)
  120. dat = res.getData(0, 0)
  121. self.assertEqual(dat, "é")
  122. # SQL as unicode literal
  123. res = database.sqlResultModel("SELECT 'é'::text", obj)
  124. self.assertIsInstance(res, PGSqlResultModel)
  125. dat = res.getData(0, 0)
  126. self.assertEqual(dat, "é")
  127. if __name__ == '__main__':
  128. unittest.main()