# -*- coding: utf-8 -*-

__author__ = 'wanger'
__date__ = '2024-08-20'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'

# inbuilt libraries
import os
from pathlib import Path
from typing import List, Optional, Set, Dict, Iterable, Any

# project-local configuration module; its CONFIG mapping is read elsewhere in this file
import siwei_config

# third-party libraries
import requests

# custom functions
from processing.tools.GeoServer.Calculation_gdal import raster_value
from processing.tools.GeoServer.Style import (
    catagorize_xml,
    classified_xml,
    coverage_style_xml,
    outline_only_xml,
)
from processing.tools.GeoServer.supports import (
    prepare_zip_file,
    is_valid_xml,
    is_surrounded_by_quotes,
)
from xmltodict import parse, unparse

# Default tiling scheme (gridset) name.
default_gridset_name = "WebMercatorQuadx2"
# Default tile request type.
# Type can be seed (add tiles), reseed (replace tiles), or truncate (remove tiles).
default_seed_type = "seed"
# Default start and stop levels for tile caching.
default_cache_start = 0
default_cache_stop = 18


def _parse_request_options(request_options: Dict[str, Any]):
    """Normalise an optional request-options mapping.

    Args:
        request_options: Extra keyword arguments for outgoing requests. The
            annotation says ``Dict``, but ``None`` is explicitly accepted.

    Returns:
        The very same ``request_options`` object when it is not ``None``
        (an empty dict passed in is returned as-is, not copied); otherwise
        a freshly created empty dict.
    """
    if request_options is None:
        return {}
    return request_options


# Custom exceptions.
class GeoserverException(Exception): def __init__(self, status, message): self.status = status self.message = message super().__init__(f"Status : {self.status} - {self.message}") # call back class for reading the data class DataProvider: def __init__(self, data): self.data = data self.finished = False def read_cb(self, size): assert len(self.data) <= size if not self.finished: self.finished = True return self.data else: # Nothing more to read return "" # callback class for reading the files class FileReader: def __init__(self, fp): self.fp = fp def read_callback(self, size): return self.fp.read(size) class Geoserver: def __init__( self, service_url: str = siwei_config.CONFIG['geoserver']['url'], # default deployment url during installation username: str = siwei_config.CONFIG['geoserver']['username'], # default username during geoserver installation password: str = siwei_config.CONFIG['geoserver']['password'], # default password during geoserver installation request_options: Dict[str, Any] = None # additional parameters to be sent with each request ): self.service_url = service_url self.username = username self.password = password self.request_options = request_options if request_options is not None else {} def _requests(self, method: str, url: str, **kwargs) -> requests.Response: if method.lower() == "post": return requests.post(url, auth=(self.username, self.password), **kwargs, **self.request_options) elif method.lower() == "get": return requests.get(url, auth=(self.username, self.password), **kwargs, **self.request_options) elif method.lower() == "put": return requests.put(url, auth=(self.username, self.password), **kwargs, **self.request_options) elif method.lower() == "delete": return requests.delete(url, auth=(self.username, self.password), **kwargs, **self.request_options) def get_manifest(self): url = "{}/rest/about/manifest.json".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise 
GeoserverException(r.status_code, r.content) def get_version(self): url = "{}/rest/about/version.json".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_status(self): url = "{}/rest/about/status.json".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_system_status(self): url = "{}/rest/about/system-status.json".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def reload(self): url = "{}/rest/reload".format(self.service_url) r = self._requests("post", url) if r.status_code == 200: return "Status code: {}".format(r.status_code) else: raise GeoserverException(r.status_code, r.content) def reset(self): url = "{}/rest/reset".format(self.service_url) r = self._requests("post", url) if r.status_code == 200: return "Status code: {}".format(r.status_code) else: raise GeoserverException(r.status_code, r.content) # _______________________________________________________________________________________________ # # WORKSPACES # _______________________________________________________________________________________________ # def get_default_workspace(self): url = "{}/rest/workspaces/default".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_workspace(self, workspace): url = "{}/rest/workspaces/{}.json".format(self.service_url, workspace) r = self._requests("get", url, params={"recurse": "true"}) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_workspaces(self): url = "{}/rest/workspaces".format(self.service_url) r = self._requests("get", url) if r.status_code == 200: return r.json() else: 
raise GeoserverException(r.status_code, r.content) def set_default_workspace(self, workspace: str): url = "{}/rest/workspaces/default".format(self.service_url) data = "{}".format(workspace) r = self._requests( "put", url, data=data, headers={"content-type": "text/xml"} ) if r.status_code == 200: return "Status code: {}, default workspace {} set!".format( r.status_code, workspace ) else: raise GeoserverException(r.status_code, r.content) def create_workspace(self, workspace: str): url = "{}/rest/workspaces".format(self.service_url) data = "{}".format(workspace) headers = {"content-type": "text/xml"} r = self._requests("post", url, data=data, headers=headers) if r.status_code == 201: return "{} Workspace {} created!".format(r.status_code, workspace) else: return "{} Workspace {} already exists!".format(r.status_code, workspace) # raise GeoserverException(r.status_code, r.content) def delete_workspace(self, workspace: str): payload = {"recurse": "true"} url = "{}/rest/workspaces/{}".format(self.service_url, workspace) r = self._requests("delete", url, params=payload) if r.status_code == 200: return "Status code: {}, delete workspace".format(r.status_code) else: raise GeoserverException(r.status_code, r.content) def get_datastore(self, store_name: str, workspace: Optional[str] = None): if workspace is None: workspace = "default" url = "{}/rest/workspaces/{}/datastores/{}".format( self.service_url, workspace, store_name ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_datastores(self, workspace: Optional[str] = None): if workspace is None: workspace = "default" url = "{}/rest/workspaces/{}/datastores.json".format( self.service_url, workspace ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_coveragestore( self, coveragestore_name: str, workspace: Optional[str] = None ): payload = {"recurse": 
"true"} if workspace is None: workspace = "default" url = "{}/rest/workspaces/{}/coveragestores/{}.json".format( self.service_url, workspace, coveragestore_name ) r = self._requests(method="get", url=url, params=payload) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_coveragestores(self, workspace: str = None): if workspace is None: workspace = "default" url = "{}/rest/workspaces/{}/coveragestores".format(self.service_url, workspace) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def create_coveragestore( self, path, workspace: Optional[str] = None, layer_name: Optional[str] = None, file_type: str = "GeoTIFF", content_type: str = "image/tiff", ): if path is None: raise Exception("You must provide the full path to the raster") if workspace is None: workspace = "default" if layer_name is None: layer_name = os.path.basename(path) f = layer_name.split(".") if len(f) > 0: layer_name = f[0] file_type = file_type.lower() url = "{0}/rest/workspaces/{1}/coveragestores/{2}/file.{3}?coverageName={2}".format( self.service_url, workspace, layer_name, file_type ) headers = {"content-type": content_type, "Accept": "application/json"} r = None with open(path, "rb") as f: r = self._requests(method="put", url=url, data=f, headers=headers) if r.status_code == 201: return r.json() else: raise GeoserverException(r.status_code, r.content) def publish_time_dimension_to_coveragestore( self, store_name: Optional[str] = None, workspace: Optional[str] = None, presentation: Optional[str] = "LIST", units: Optional[str] = "ISO8601", default_value: Optional[str] = "MINIMUM", content_type: str = "application/xml; charset=UTF-8", ): url = "{0}/rest/workspaces/{1}/coveragestores/{2}/coverages/{2}".format( self.service_url, workspace, store_name ) headers = {"content-type": content_type} time_dimension_data = ( "" "true" "" "" "" "true" "{}" "{}" "" "{}" "" "" 
"" "" "".format(presentation, units, default_value) ) r = self._requests( method="put", url=url, data=time_dimension_data, headers=headers ) if r.status_code in [200, 201]: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_layer(self, layer_name: str, workspace: Optional[str] = None): url = "{}/rest/layers/{}".format(self.service_url, layer_name) if workspace is not None: url = "{}/rest/workspaces/{}/layers/{}".format( self.service_url, workspace, layer_name ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_layers(self, workspace: Optional[str] = None): url = "{}/rest/layers".format(self.service_url) if workspace is not None: url = "{}/rest/workspaces/{}/layers".format(self.service_url, workspace) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def delete_layer(self, layer_name: str, workspace: Optional[str] = None): payload = {"recurse": "true"} url = "{}/rest/workspaces/{}/layers/{}".format( self.service_url, workspace, layer_name ) if workspace is None: url = "{}/rest/layers/{}".format(self.service_url, layer_name) r = self._requests(method="delete", url=url, params=payload) if r.status_code == 200: return "Status code: {}, delete layer".format(r.status_code) else: raise GeoserverException(r.status_code, r.content) def get_layergroups(self, workspace: Optional[str] = None): url = "{}/rest/layergroups".format(self.service_url) if workspace is not None: url = "{}/rest/workspaces/{}/layergroups".format( self.service_url, workspace ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_layergroup(self, layer_name: str, workspace: Optional[str] = None): url = "{}/rest/layergroups/{}".format(self.service_url, layer_name) if workspace is not None: url = 
"{}/rest/workspaces/{}/layergroups/{}".format( self.service_url, workspace, layer_name ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: return None # raise GeoserverException(r.status_code, r.content) def create_layergroup( self, name: str = "geoserver-rest-layergroup", mode: str = "single", title: str = "geoserver-rest layer group", abstract_text: str = "A new layergroup created with geoserver-rest python package", layers: List[str] = [], workspace: Optional[str] = None, formats: str = "html", metadata: List[dict] = [], keywords: List[str] = [], ) -> str: assert isinstance(name, str), "Name must be of type String:''" assert isinstance(mode, str), "Mode must be of type String:''" assert isinstance(title, str), "Title must be of type String:''" assert isinstance(abstract_text, str), "Abstract text must be of type String:''" assert isinstance(formats, str), "Format must be of type String:''" assert isinstance( metadata, list ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'link to content url'}]" assert isinstance( keywords, list ), "Keywords must be of type List:['keyword1','keyword2'...]" assert isinstance( layers, list ), "Layers must be of type List:['layer1','layer2'...]" if workspace: assert isinstance(workspace, str), "Workspace must be of type String:''" # check if the workspace is valid in GeoServer if self.get_workspace(workspace) is None: raise Exception("Workspace is not valid in GeoServer Instance") supported_modes: Set = { "single", "opaque", "named", "container", "eo", } supported_formats: Set = {"html", "json", "xml"} if mode.lower() != "single" and mode.lower() not in supported_modes: raise Exception( f"Mode not supported. Acceptable modes are : {supported_modes}" ) if formats.lower() != "html" and formats.lower() not in supported_formats: raise Exception( f"Format not supported. 
Acceptable formats are : {supported_formats}" ) # check if it already exist in GeoServer try: existing_layergroup = self.get_layergroup(name, workspace=workspace) except GeoserverException: existing_layergroup = None if existing_layergroup is not None: raise Exception(f"Layergroup: {name} already exist in GeoServer instance") if len(layers) == 0: raise Exception("No layer provided!") else: for layer in layers: # check if it is valid in geoserver try: # Layer check self.get_layer( layer_name=layer, workspace=workspace if workspace is not None else None, ) except GeoserverException: try: # Layer group check self.get_layergroup( layer_name=layer, workspace=workspace if workspace is not None else None, ) except GeoserverException: raise Exception( f"Layer: {layer} is not a valid layer in the GeoServer instance" ) skeleton = "" if workspace: skeleton += f"{workspace}" metadata_xml_list = [] if len(metadata) >= 1: for meta in metadata: metadata_about = meta.get("about") metadata_content_url = meta.get("content_url") metadata_xml_list.append( f""" text/plain {metadata_about} ISO19115:2003 {metadata_content_url} """ ) metadata_xml = f"{''.join(['{}'] * len(metadata_xml_list)).format(*metadata_xml_list)}" skeleton += metadata_xml layers_xml_list: List[str] = [] for layer in layers: published_type = "layer" try: # Layer check self.get_layer( layer_name=layer, workspace=workspace if workspace is not None else None, ) except GeoserverException: # It's a layer group published_type = "layerGroup" layers_xml_list.append( f""" {layer} {self.service_url}/layers/{layer}.xml """ ) layers_xml: str = f"{''.join(['{}'] * len(layers)).format(*layers_xml_list)}" skeleton += layers_xml if len(keywords) >= 1: keyword_xml_list: List[str] = [ f"{keyword}" for keyword in keywords ] keywords_xml: str = f"{''.join(['{}'] * len(keywords)).format(*keyword_xml_list)}" skeleton += keywords_xml data = f""" {name} {mode} {title} {abstract_text} {skeleton} """ url = 
f"{self.service_url}/rest/layergroups/" r = self._requests( method="post", url=url, data=data, headers={"content-type": "text/xml"} ) if r.status_code == 201: layergroup_url = f"{self.service_url}/rest/layergroups/{name}.{formats}" return f"layergroup created successfully! Layergroup link: {layergroup_url}" else: raise GeoserverException(r.status_code, r.content) def update_layergroup( self, layergroup_name, title: Optional[str] = None, abstract_text: Optional[str] = None, formats: str = "html", metadata: List[dict] = [], keywords: List[str] = [], ) -> str: if self.get_layergroup(layer_name=layergroup_name) is None: raise Exception( f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance" ) if title is not None: assert isinstance(title, str), "Title must be of type String:''" if abstract_text is not None: assert isinstance( abstract_text, str ), "Abstract text must be of type String:''" assert isinstance(formats, str), "Format must be of type String:''" assert isinstance( metadata, list ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'lint to content url'}]" assert isinstance( keywords, list ), "Keywords must be of type List:['keyword1','keyword2'...]" supported_formats: Set = {"html", "json", "xml"} if formats.lower() != "html" and formats.lower() not in supported_formats: raise Exception( f"Format not supported. 
Acceptable formats are : {supported_formats}" ) skeleton = "" if title: skeleton += f"{title}" if abstract_text: skeleton += f"{abstract_text}" metadata_xml_list = [] if len(metadata) >= 1: for meta in metadata: metadata_about = meta.get("about") metadata_content_url = meta.get("content_url") metadata_xml_list.append( f""" text/plain {metadata_about} ISO19115:2003 {metadata_content_url} """ ) metadata_xml = f"{''.join(['{}'] * len(metadata_xml_list)).format(*metadata_xml_list)}" skeleton += metadata_xml if len(keywords) >= 1: keyword_xml_list: List[str] = [ f"{keyword}" for keyword in keywords ] keywords_xml: str = f"{''.join(['{}'] * len(keyword_xml_list)).format(*keyword_xml_list)}" skeleton += keywords_xml data = f""" {skeleton} """ url = f"{self.service_url}/rest/layergroups/{layergroup_name}" r = self._requests( method="put", url=url, data=data, headers={"content-type": "text/xml", "accept": "application/xml"}, ) if r.status_code == 200: layergroup_url = ( f"{self.service_url}/rest/layergroups/{layergroup_name}.{formats}" ) return f"layergroup updated successfully! 
Layergroup link: {layergroup_url}" else: raise GeoserverException(r.status_code, r.content) def delete_layergroup( self, layergroup_name: str, workspace: Optional[str] = None ) -> str: # raises an exception in case the layer group doesn't exist self.get_layergroup(layer_name=layergroup_name, workspace=workspace) if workspace is None: url = f"{self.service_url}/rest/layergroups/{layergroup_name}" else: url = f"{self.service_url}/rest/workspaces/{workspace}/layergroups/{layergroup_name}" r = self._requests(url=url, method="delete") if r.status_code == 200: return "Layer group deleted successfully" else: return "Layer group deleted successfully" # raise GeoserverException(r.status_code, r.content) def add_layer_to_layergroup( self, layer_name: str, layer_workspace: str, layergroup_name: str, layergroup_workspace: str = None, ) -> None: layergroup_info = self.get_layergroup( layer_name=layergroup_name, workspace=layergroup_workspace ) layer_info = self.get_layer(layer_name=layer_name, workspace=layer_workspace) # build list of existing publishables & styles publishables = layergroup_info["layerGroup"]["publishables"]["published"] if not isinstance(publishables, list): # only 1 layer up to now publishables = [publishables] styles = layergroup_info["layerGroup"]["styles"]["style"] if not isinstance(styles, list): # only 1 layer up to now styles = [styles] # add publishable & style for the new layer new_pub = { "name": f"{layer_workspace}:{layer_name}", "href": f"{self.service_url}/rest/workspaces/{layer_workspace}/layers/{layer_name}.json", } publishables.append(new_pub) new_style = layer_info["layer"]["defaultStyle"] styles.append(new_style) data = self._layergroup_definition_from_layers_and_styles( publishables=publishables, styles=styles ) if layergroup_workspace is None: url = f"{self.service_url}/rest/layergroups/{layergroup_name}" else: url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}" r = self._requests( method="put", 
url=url, data=data, headers={"content-type": "text/xml", "accept": "application/xml"}, ) if r.status_code == 200: return else: raise GeoserverException(r.status_code, r.content) def remove_layer_from_layergroup( self, layer_name: str, layer_workspace: str, layergroup_name: str, layergroup_workspace: str = None, ) -> None: layergroup_info = self.get_layergroup( layer_name=layergroup_name, workspace=layergroup_workspace ) # build list of existing publishables & styles publishables = layergroup_info["layerGroup"]["publishables"]["published"] if not isinstance(publishables, list): # only 1 layer up to now publishables = [publishables] styles = layergroup_info["layerGroup"]["styles"]["style"] if not isinstance(styles, list): # only 1 layer up to now styles = [styles] layer_to_remove = f"{layer_workspace}:{layer_name}" revised_set_of_publishables_and_styles = [ (pub, style) for (pub, style) in zip( layergroup_info["layerGroup"]["publishables"]["published"], layergroup_info["layerGroup"]["styles"]["style"], ) if pub["name"] != layer_to_remove ] revised_set_of_publishables = list( map(list, zip(*revised_set_of_publishables_and_styles)) )[0] revised_set_of_styles = list( map(list, zip(*revised_set_of_publishables_and_styles)) )[1] xml_payload = self._layergroup_definition_from_layers_and_styles( publishables=revised_set_of_publishables, styles=revised_set_of_styles ) if layergroup_workspace is None: url = f"{self.service_url}/rest/layergroups/{layergroup_name}" else: url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}" r = self._requests( method="put", url=url, data=xml_payload, headers={"content-type": "text/xml", "accept": "application/xml"}, ) if r.status_code == 200: return else: raise GeoserverException(r.status_code, r.content) def _layergroup_definition_from_layers_and_styles( self, publishables: list, styles: list ) -> str: # the get_layergroup method may return an empty string for style; # so we get the default styles for 
each layer with no style information in the layergroup if len(styles) == 1: index = [0] else: index = range(len(styles)) for ix, this_style, this_layer in zip(index, styles, publishables): if this_style == "": this_layer_info = self.get_layer( layer_name=this_layer["name"].split(":")[1], workspace=this_layer["name"].split(":")[0], ) styles[ix] = { "name": this_layer_info["layer"]["defaultStyle"]["name"], "href": this_layer_info["layer"]["defaultStyle"]["href"], } # build xml structure layer_skeleton = "" style_skeleton = "" for publishable in publishables: layer_str = f""" {publishable['name']} {publishable['href']} """ layer_skeleton += layer_str for style in styles: style_str = f""" """ style_skeleton += style_str data = f""" {layer_skeleton} {style_skeleton} """ return data def get_style(self, style_name, workspace: Optional[str] = None): url = "{}/rest/styles/{}.json".format(self.service_url, style_name) if workspace is not None: url = "{}/rest/workspaces/{}/styles/{}.json".format( self.service_url, workspace, style_name ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def get_styles(self, workspace: Optional[str] = None): url = "{}/rest/styles.json".format(self.service_url) if workspace is not None: url = "{}/rest/workspaces/{}/styles.json".format( self.service_url, workspace ) r = self._requests("get", url) if r.status_code == 200: return r.json() else: raise GeoserverException(r.status_code, r.content) def upload_style( self, path: str, name: Optional[str] = None, workspace: Optional[str] = None, sld_version: str = "1.1.0", ): if name is None: name = os.path.basename(path) f = name.split(".") if len(f) > 0: name = f[0] if is_valid_xml(path): xml = path elif Path(path).exists(): with open(path, "r", encoding='utf-8') as f: xml = f.read() xml = xml.encode("gbk") else: raise ValueError("`path` must be either a path to a style file, or a valid XML string.") headers = 
{"content-type": "text/xml"} url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace) sld_content_type = "application/vnd.ogc.sld+xml" if sld_version == "1.1.0" or sld_version == "1.1": sld_content_type = "application/vnd.ogc.se+xml" header_sld = {"content-type": sld_content_type} if workspace is None: url = "{}/rest/styles".format(self.service_url) style_xml = "".format( name, name + ".sld" ) r = self._requests(method="post", url=url, data=style_xml, headers=headers) if r.status_code == 201: r_sld = self._requests(method="put", url=url + "/" + name, data=xml, headers=header_sld) if r_sld.status_code == 200: return r_sld.status_code else: return r_sld.status_code # raise GeoserverException(r_sld.status_code, r_sld.content) else: return r.status_code # raise GeoserverException(r.status_code, r.content) def create_coveragestyle( self, raster_path: str, style_name: Optional[str] = None, workspace: str = None, color_ramp: str = "RdYlGn_r", cmap_type: str = "ramp", number_of_classes: int = 5, opacity: float = 1, ): raster = raster_value(raster_path) min_value = raster["min"] max_value = raster["max"] if style_name is None: style_name = raster["file_name"] coverage_style_xml( color_ramp, style_name, cmap_type, min_value, max_value, number_of_classes, opacity, ) style_xml = "".format( style_name, style_name + ".sld" ) if style_name is None: style_name = os.path.basename(raster_path) f = style_name.split(".") if len(f) > 0: style_name = f[0] headers = {"content-type": "text/xml"} url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace) sld_content_type = "application/vnd.ogc.sld+xml" header_sld = {"content-type": sld_content_type} if workspace is None: url = "{}/rest/styles".format(self.service_url) r = self._requests( "post", url, data=style_xml, headers=headers, ) if r.status_code == 201: with open("style.sld", "rb") as f: r_sld = self._requests(method="put", url=url + "/" + style_name, data=f.read(), headers=header_sld) 
os.remove("style.sld") if r_sld.status_code == 200: return r_sld.status_code else: raise GeoserverException(r_sld.status_code, r_sld.content) else: raise GeoserverException(r.status_code, r.content) def create_catagorized_featurestyle( self, style_name: str, column_name: str, column_distinct_values, workspace: str = None, color_ramp: str = "tab20", geom_type: str = "polygon", ): catagorize_xml(column_name, column_distinct_values, color_ramp, geom_type) style_xml = "".format( style_name, style_name + ".sld" ) headers = {"content-type": "text/xml"} url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace) sld_content_type = "application/vnd.ogc.sld+xml" header_sld = {"content-type": sld_content_type} if workspace is None: url = "{}/rest/styles".format(self.service_url) r = self._requests( "post", url, data=style_xml, headers=headers, ) if r.status_code == 201: with open("style.sld", "rb") as f: r_sld = self._requests( "put", url + "/" + style_name, data=f.read(), headers=header_sld, ) os.remove("style.sld") if r_sld.status_code == 200: return r_sld.status_code else: raise GeoserverException(r_sld.status_code, r_sld.content) else: raise GeoserverException(r.status_code, r.content) def create_outline_featurestyle( self, style_name: str, color: str = "#3579b1", width: str = "2", geom_type: str = "polygon", workspace: Optional[str] = None, ): outline_only_xml(color, width, geom_type) style_xml = "".format( style_name, style_name + ".sld" ) headers = {"content-type": "text/xml"} url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace) sld_content_type = "application/vnd.ogc.sld+xml" header_sld = {"content-type": sld_content_type} if workspace is None: url = "{}/rest/styles".format(self.service_url) r = self._requests( "post", url, data=style_xml, headers=headers, ) if r.status_code == 201: with open("style.sld", "rb") as f: r_sld = self._requests( "put", url + "/" + style_name, data=f.read(), headers=header_sld, ) os.remove("style.sld") if 
r_sld.status_code == 200: return r_sld.status_code else: raise GeoserverException(r_sld.status_code, r_sld.content) else: raise GeoserverException(r.status_code, r.content) def create_classified_featurestyle( self, style_name: str, column_name: str, column_distinct_values, workspace: Optional[str] = None, color_ramp: str = "tab20", geom_type: str = "polygon", # outline_color: str = "#3579b1", ): classified_xml( style_name, column_name, column_distinct_values, color_ramp, geom_type, ) style_xml = "".format( column_name, column_name + ".sld" ) headers = {"content-type": "text/xml"} url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace) sld_content_type = "application/vnd.ogc.sld+xml" header_sld = {"content-type": sld_content_type} if workspace is None: url = "{}/rest/styles".format(self.service_url) r = self._requests( "post", url, data=style_xml, headers=headers, ) if r.status_code == 201: with open("style.sld", "rb") as f: r_sld = self._requests( "put", url + "/" + style_name, data=f.read(), headers=header_sld, ) os.remove("style.sld") if r_sld.status_code == 200: return r_sld.status_code else: raise GeoserverException(r_sld.status_code, r_sld.content) else: raise GeoserverException(r.status_code, r.content) def publish_style( self, layer_name: str, style_name: str, workspace: str, ): headers = {"content-type": "text/xml"} url = "{}/rest/layers/{}:{}".format(self.service_url, workspace, layer_name) style_xml = ( "{}".format( style_name ) ) r = self._requests( "put", url, data=style_xml, headers=headers, ) if r.status_code == 200: return r.status_code else: raise GeoserverException(r.status_code, r.content) def delete_style(self, style_name: str, workspace: Optional[str] = None): payload = {"recurse": "true"} url = "{}/rest/workspaces/{}/styles/{}".format( self.service_url, workspace, style_name ) if workspace is None: url = "{}/rest/styles/{}".format(self.service_url, style_name) r = self._requests("delete", url, params=payload) if r.status_code 
== 200: return "Status code: {}, delete style".format(r.status_code) else: return "Status code: {}, delete style".format(r.status_code) # raise GeoserverException(r.status_code, r.content) def create_featurestore( self, store_name: str, workspace: Optional[str] = None, db: str = "postgres", host: str = "localhost", port: int = 5432, schema: str = "public", pg_user: str = "postgres", pg_password: str = "admin", overwrite: bool = False, expose_primary_keys: str = "false", description: Optional[str] = None, evictor_run_periodicity: Optional[int] = 300, max_open_prepared_statements: Optional[int] = 50, encode_functions: Optional[str] = "false", primary_key_metadata_table: Optional[str] = None, batch_insert_size: Optional[int] = 1, preparedstatements: Optional[str] = "false", loose_bbox: Optional[str] = "true", estimated_extends: Optional[str] = "true", fetch_size: Optional[int] = 1000, validate_connections: Optional[str] = "true", support_on_the_fly_geometry_simplification: Optional[str] = "true", connection_timeout: Optional[int] = 20, create_database: Optional[str] = "false", min_connections: Optional[int] = 1, max_connections: Optional[int] = 10, evictor_tests_per_run: Optional[int] = 3, test_while_idle: Optional[str] = "true", max_connection_idle_time: Optional[int] = 300, ): url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace) headers = {"content-type": "text/xml"} database_connection = """ {} {} {} {} {} {} {} postgis {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} """.format( store_name, description, expose_primary_keys, host, port, pg_user, pg_password, schema, db, evictor_run_periodicity, max_open_prepared_statements, encode_functions, primary_key_metadata_table, batch_insert_size, preparedstatements, estimated_extends, fetch_size, validate_connections, support_on_the_fly_geometry_simplification, connection_timeout, create_database, min_connections, max_connections, evictor_tests_per_run, test_while_idle, 
max_connection_idle_time, loose_bbox, ) if overwrite: url = "{}/rest/workspaces/{}/datastores/{}".format( self.service_url, workspace, store_name ) r = self._requests( "put", url, data=database_connection, headers=headers, ) else: r = self._requests( "post", url, data=database_connection, headers=headers, ) if r.status_code in [200, 201]: return "Featurestore created/updated successfully" else: raise GeoserverException(r.status_code, r.content) def create_datastore( self, name: str, path: str, workspace: Optional[str] = None, overwrite: bool = False, ): if workspace is None: workspace = "default" if path is None: raise Exception("You must provide a full path to the data") data_url = "file:{}".format(path) if "http://" in path: data_url = "{}".format(path) data = "{}{}".format( name, data_url ) headers = {"content-type": "text/xml"} if overwrite: url = "{}/rest/workspaces/{}/datastores/{}".format( self.service_url, workspace, name ) r = self._requests("put", url, data=data, headers=headers) else: url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace) r = self._requests(method="post", url=url, data=data, headers=headers) if r.status_code in [200, 201]: return "Data store created/updated successfully" else: raise GeoserverException(r.status_code, r.content) def create_shp_datastore( self, path: str, store_name: Optional[str] = None, workspace: Optional[str] = None, file_extension: str = "shp", ): if path is None: raise Exception("You must provide a full path to shapefile") if workspace is None: workspace = "default" if store_name is None: store_name = os.path.basename(path) f = store_name.split(".") if len(f) > 0: store_name = f[0] headers = { "Content-type": "application/zip", "Accept": "application/xml", } if isinstance(path, dict): path = prepare_zip_file(store_name, path) url = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}&update=overwrite".format( self.service_url, workspace, store_name, file_extension ) with open(path, "rb") 
as f: r = self._requests("put", url, data=f.read(), headers=headers) if r.status_code in [200, 201, 202]: return "The shapefile datastore created successfully!" else: raise GeoserverException(r.status_code, r.content) def create_gpkg_datastore( self, path: str, store_name: Optional[str] = None, workspace: Optional[str] = None, file_extension: str = "gpkg", ): if path is None: raise Exception("You must provide a full path to shapefile") if workspace is None: workspace = "default" if store_name is None: store_name = os.path.basename(path) f = store_name.split(".") if len(f) > 0: store_name = f[0] headers = { "Content-type": "application/x-sqlite3", "Accept": "application/json", } url = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}".format( self.service_url, workspace, store_name, file_extension ) with open(path, "rb") as f: r = self._requests("put", url, data=f.read(), headers=headers) if r.status_code in [200, 201, 202]: return "The geopackage datastore created successfully!" 
else: raise GeoserverException(r.status_code, r.content) def publish_featurestore( self, store_name: str, pg_table: str, workspace: Optional[str] = None, title: Optional[str] = None, advertised: Optional[bool] = True, abstract: Optional[str] = None, keywords: Optional[List[str]] = None, cqlfilter: Optional[str] = None ) -> int: if workspace is None: workspace = "default" if title is None: title = pg_table url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/".format( self.service_url, workspace, store_name ) abstract_xml = f"{abstract}" if abstract else "" keywords_xml = "" if keywords: keywords_xml = "" for keyword in keywords: keywords_xml += f"{keyword}" keywords_xml += "" cqlfilter_xml = f"{cqlfilter}" if cqlfilter else "" layer_xml = f""" {pg_table} {title} {advertised} {abstract_xml} {keywords_xml} {cqlfilter_xml} """ headers = {"content-type": "text/xml;charset=utf-8"} print(url) print(layer_xml) r = self._requests("post", url, data=layer_xml.encode("utf-8"), headers=headers) if r.status_code == 201 or r.status_code == 200: return r.status_code else: raise GeoserverException(r.status_code, r.content) def edit_featuretype( self, store_name: str, workspace: Optional[str], pg_table: str, name: str, title: str, abstract: Optional[str] = None, keywords: Optional[List[str]] = None, recalculate: Optional[str] = None ) -> int: if workspace is None: workspace = "default" recalculate_param = f"?recalculate={recalculate}" if recalculate else "" url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.xml{}".format( self.service_url, workspace, store_name, pg_table, recalculate_param ) # Create XML for abstract and keywords abstract_xml = f"{abstract}" if abstract else "" keywords_xml = "" if keywords: keywords_xml = "" for keyword in keywords: keywords_xml += f"{keyword}" keywords_xml += "" layer_xml = f""" {name} {title} {abstract_xml}{keywords_xml} """ headers = {"content-type": "text/xml"} r = self._requests("put", url, data=layer_xml, headers=headers) if 
r.status_code == 200: return r.status_code else: raise GeoserverException(r.status_code, r.content) def publish_featurestore_sqlview( self, name: str, store_name: str, sql: str, parameters: Optional[Iterable[Dict]] = None, key_column: Optional[str] = None, geom_name: str = "geom", geom_type: str = "Geometry", srid: Optional[int] = 4326, workspace: Optional[str] = None, ) -> int: if workspace is None: workspace = "default" # issue #87 if key_column is not None: key_column_xml = """{}""".format(key_column) else: key_column_xml = """""" parameters_xml = "" if parameters is not None: for parameter in parameters: # non-string parameters MUST have a default value supplied if not is_surrounded_by_quotes(sql, parameter["name"]) and not "defaultValue" in parameter: raise ValueError(f"Parameter `{parameter['name']}` appears to be a non-string in the supplied query" ", but does not have a default value specified. You must supply a default value " "for non-string parameters using the `defaultValue` key.") param_name = parameter.get("name", "") default_value = parameter.get("defaultValue", "") regexp_validator = parameter.get("regexpValidator", r"^[\w\d\s]+$") parameters_xml += (f""" {param_name} {default_value} {regexp_validator} \n """.strip()) layer_xml = """ {0} true {4} {0} EPSG:{5} {0} {1} true {2} {3} {5} {6} {7} """.format( name, sql, geom_name, geom_type, workspace, srid, key_column_xml, parameters_xml ) # rest API url url = "{}/rest/workspaces/{}/datastores/{}/featuretypes".format( self.service_url, workspace, store_name ) # headers headers = {"content-type": "text/xml"} # request r = self._requests("post", url, data=layer_xml, headers=headers) if r.status_code == 201: return r.status_code else: raise GeoserverException(r.status_code, r.content) def get_featuretypes(self, workspace: str = None, store_name: str = None) -> List[str]: url = "{}/rest/workspaces/{}/datastores/{}/featuretypes.json".format( self.service_url, workspace, store_name ) r = self._requests("get", 
url) if r.status_code == 200: r_dict = r.json() features = [i["name"] for i in r_dict["featureTypes"]["featureType"]] return features else: raise GeoserverException(r.status_code, r.content) def get_feature_attribute( self, feature_type_name: str, workspace: str, store_name: str ) -> List[str]: url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.json".format( self.service_url, workspace, store_name, feature_type_name ) r = self._requests("get", url) if r.status_code == 200: r_dict = r.json() attribute = [ i["name"] for i in r_dict["featureType"]["attributes"]["attribute"] ] return attribute else: raise GeoserverException(r.status_code, r.content) def get_featurestore(self, store_name: str, workspace: str) -> dict: url = "{}/rest/workspaces/{}/datastores/{}".format( self.service_url, workspace, store_name ) r = self._requests("get", url) if r.status_code == 200: r_dict = r.json() return r_dict["dataStore"] else: raise GeoserverException(r.status_code, r.content) def delete_featurestore( self, featurestore_name: str, workspace: Optional[str] = None ) -> str: payload = {"recurse": "true"} url = "{}/rest/workspaces/{}/datastores/{}".format( self.service_url, workspace, featurestore_name ) if workspace is None: url = "{}/datastores/{}".format(self.service_url, featurestore_name) r = self._requests("delete", url, params=payload) if r.status_code == 200: return "Status code: {}, delete featurestore".format(r.status_code) else: return "Status code: {}, delete featurestore".format(r.status_code) # raise GeoserverException(r.status_code, r.content) def delete_coveragestore( self, coveragestore_name: str, workspace: Optional[str] = None ) -> str: payload = {"recurse": "true"} url = "{}/rest/workspaces/{}/coveragestores/{}".format( self.service_url, workspace, coveragestore_name ) if workspace is None: url = "{}/rest/coveragestores/{}".format( self.service_url, coveragestore_name ) r = self._requests("delete", url, params=payload) if r.status_code == 200: return 
"Coverage store deleted successfully" else: raise GeoserverException(r.status_code, r.content) def get_all_users(self, service=None) -> dict: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "users/" else: url += "service/{}/users/".format(service) headers = {"accept": "application/xml"} r = self._requests("get", url, headers=headers) if r.status_code == 200: return parse(r.content) else: raise GeoserverException(r.status_code, r.content) def create_user( self, username: str, password: str, enabled: bool = True, service=None ) -> str: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "users/" else: url += "service/{}/users/".format(service) data = "{}{}{}".format( username, password, str(enabled).lower() ) headers = {"content-type": "text/xml", "accept": "application/json"} r = self._requests("post", url, data=data, headers=headers) if r.status_code == 201: return "User created successfully" else: raise GeoserverException(r.status_code, r.content) def modify_user( self, username: str, new_name=None, new_password=None, enable=None, service=None ) -> str: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "user/{}".format(username) else: url += "service/{}/user/{}".format(service, username) modifications = dict() if new_name is not None: modifications["userName"] = new_name if new_password is not None: modifications["password"] = new_password if enable is not None: modifications["enabled"] = enable data = unparse({"user": modifications}) print(url, data) headers = {"content-type": "text/xml", "accept": "application/json"} r = self._requests("post", url, data=data, headers=headers) if r.status_code == 200: return "User modified successfully" else: raise GeoserverException(r.status_code, r.content) def delete_user(self, username: str, service=None) -> str: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += 
"user/{}".format(username) else: url += "service/{}/user/{}".format(service, username) headers = {"accept": "application/json"} r = self._requests("delete", url, headers=headers) if r.status_code == 200: return "User deleted successfully" else: raise GeoserverException(r.status_code, r.content) def get_all_usergroups(self, service=None) -> dict: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "groups/" else: url += "service/{}/groups/".format(service) r = self._requests("get", url) if r.status_code == 200: return parse(r.content) else: raise GeoserverException(r.status_code, r.content) def create_usergroup(self, group: str, service=None) -> str: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "group/{}".format(group) else: url += "service/{}/group/{}".format(service, group) r = self._requests("post", url) if r.status_code == 201: return "Group created successfully" else: raise GeoserverException(r.status_code, r.content) def delete_usergroup(self, group: str, service=None) -> str: url = "{}/rest/security/usergroup/".format(self.service_url) if service is None: url += "group/{}".format(group) else: url += "service/{}/group/{}".format(service, group) r = self._requests("delete", url) if r.status_code == 200: return "Group deleted successfully" else: raise GeoserverException(r.status_code, r.content) def caching_layer( self, layer_name: str, auto_seed: Optional[bool] = True, zoom_start: Optional[int] = default_cache_start, zoom_stop: Optional[int] = default_cache_stop, gridset_name: Optional[str] = default_gridset_name ) -> int: url = "{}/gwc/rest/layers/{}".format( self.service_url, layer_name ) cache_xml = f""" true true {layer_name} image/png image/jpeg {gridset_name} {zoom_start} {zoom_stop} {zoom_start} {zoom_stop} 4 4 0 0 0 true """ headers = {"content-type": "text/xml"} gridinfo = self._requests("post", url, data=cache_xml, headers=headers) if gridinfo.status_code == 200: if 
auto_seed: print(f"""发送{layer_name}切片请求。。。""") self.seed_caching_layer(layer_name=layer_name, zoom_start=zoom_start, zoom_stop=zoom_stop) return gridinfo.status_code else: raise GeoserverException(gridinfo.status_code, gridinfo.content) def seed_caching_layer( self, layer_name: str, zoom_start: Optional[int] = default_cache_start, zoom_stop: Optional[int] = default_cache_stop, gridset_name: Optional[str] = default_gridset_name, seedtype: Optional[str] = default_seed_type, ) -> int: url = "{}/gwc/rest/seed/{}?" \ "threadCount=4" \ "&type={}" \ "&gridSetId={}" \ "&tileFormat=image%2Fpng" \ "&zoomStart={}" \ "&zoomStop={}" \ "¶meter_STYLES=" \ "&minX=&minY=&maxX=&maxY=&tileFailureRetryCount=-1&tileFailureRetryWaitTime=100&totalFailuresBeforeAborting=1000".format( self.service_url, layer_name, seedtype, gridset_name, zoom_start, zoom_stop ) headers = {"content-type": "application/json"} gridinfo = self._requests("post", url, data=None, headers=headers) if gridinfo.status_code == 201 or gridinfo.status_code == 200: return gridinfo.status_code else: raise GeoserverException(gridinfo.status_code, gridinfo.content)