|
- # -*- coding: utf-8 -*-
- __author__ = 'wanger'
- __date__ = '2024-08-20'
- __copyright__ = '(C) 2024 by siwei'
- __revision__ = '1.0'
- # inbuilt libraries
- import os
- from pathlib import Path
- from typing import List, Optional, Set, Dict, Iterable, Any
- import siwei_config
- # third-party libraries
- import requests
- # custom functions
- from processing.tools.GeoServer.Calculation_gdal import raster_value
- from processing.tools.GeoServer.Style import catagorize_xml, classified_xml, coverage_style_xml, outline_only_xml
- from processing.tools.GeoServer.supports import prepare_zip_file, is_valid_xml, is_surrounded_by_quotes
- from xmltodict import parse, unparse
- default_gridset_name = "WebMercatorQuadx2" # Default tiling scheme (gridset)
- default_seed_type = "seed" # Default tile request type: seed (add tiles), reseed (replace tiles), or truncate (remove tiles)
- default_cache_start = 0 # Default starting level for tile caching
- default_cache_stop = 18
def _parse_request_options(request_options: Dict[str, Any]):
    """Return ``request_options`` as given, or a new empty dict when it is ``None``."""
    if request_options is None:
        return {}
    return request_options
- # Custom exceptions.
class GeoserverException(Exception):
    """Error carrying an HTTP status and a message.

    Both values are kept as the ``status`` and ``message`` attributes and
    are combined into the exception text.
    """

    def __init__(self, status, message):
        text = "Status : {} - {}".format(status, message)
        super().__init__(text)
        self.status = status
        self.message = message
- # call back class for reading the data
class DataProvider:
    """One-shot read callback over an in-memory payload."""

    def __init__(self, data):
        self.data = data
        self.finished = False

    def read_cb(self, size):
        """Return the whole payload on the first call and ``""`` afterwards.

        The payload is asserted to fit within ``size`` on every call.
        """
        assert len(self.data) <= size
        if self.finished:
            # Nothing more to read
            return ""
        self.finished = True
        return self.data
- # callback class for reading the files
class FileReader:
    """Read callback wrapper around an already opened file-like object."""

    def __init__(self, fp):
        self.fp = fp

    def read_callback(self, size):
        """Read and return up to ``size`` units from the wrapped file object."""
        chunk = self.fp.read(size)
        return chunk
- class Geoserver:
def __init__(
    self,
    service_url: str = siwei_config.CONFIG['geoserver']['url'],  # default deployment url during installation
    username: str = siwei_config.CONFIG['geoserver']['username'],  # default username during geoserver installation
    password: str = siwei_config.CONFIG['geoserver']['password'],  # default password during geoserver installation
    request_options: Dict[str, Any] = None  # additional parameters to be sent with each request
):
    """Store the connection settings used by every request.

    The url, username and password default to the ``geoserver`` section of
    ``siwei_config.CONFIG``. ``request_options`` holds extra keyword
    arguments forwarded with every request; ``None`` becomes an empty dict.
    """
    if request_options is None:
        request_options = {}
    self.service_url = service_url
    self.username = username
    self.password = password
    self.request_options = request_options
def _requests(self,
              method: str,
              url: str,
              **kwargs) -> requests.Response:
    """Send an authenticated HTTP request and return the response.

    Args:
        method: HTTP verb, case-insensitive; one of get, post, put, delete.
        url: Full URL to call.
        **kwargs: Extra keyword arguments forwarded to ``requests``.

    Returns:
        The ``requests.Response`` of the call.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs
            (previously this silently returned ``None``).
    """
    verb = method.lower()
    if verb not in ("get", "post", "put", "delete"):
        raise ValueError(f"Unsupported HTTP method: {method!r}")
    # Per-call arguments take precedence over the instance-wide options, so a
    # key present in both no longer fails with "multiple values for keyword".
    options = {**self.request_options, **kwargs}
    return requests.request(
        verb, url, auth=(self.username, self.password), **options
    )
def get_manifest(self):
    """GET ``/rest/about/manifest.json`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/manifest.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_version(self):
    """GET ``/rest/about/version.json`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/version.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_status(self):
    """GET ``/rest/about/status.json`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/status.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_system_status(self):
    """GET ``/rest/about/system-status.json`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests(
        "get", f"{self.service_url}/rest/about/system-status.json"
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def reload(self):
    """POST to ``/rest/reload`` and return a status string on HTTP 200.

    Raises GeoserverException for any other status.
    """
    response = self._requests("post", f"{self.service_url}/rest/reload")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}"
def reset(self):
    """POST to ``/rest/reset`` and return a status string on HTTP 200.

    Raises GeoserverException for any other status.
    """
    response = self._requests("post", f"{self.service_url}/rest/reset")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}"
- # _______________________________________________________________________________________________
- #
- # WORKSPACES
- # _______________________________________________________________________________________________
- #
def get_default_workspace(self):
    """GET ``/rest/workspaces/default`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests("get", f"{self.service_url}/rest/workspaces/default")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_workspace(self, workspace):
    """GET ``/rest/workspaces/<workspace>.json`` with ``recurse=true``.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    endpoint = f"{self.service_url}/rest/workspaces/{workspace}.json"
    response = self._requests("get", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_workspaces(self):
    """GET ``/rest/workspaces`` and return the decoded JSON.

    Raises GeoserverException when the response status is not 200.
    """
    response = self._requests("get", f"{self.service_url}/rest/workspaces")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def set_default_workspace(self, workspace: str):
    """PUT ``workspace`` as the default workspace.

    Returns a confirmation string on HTTP 200; raises GeoserverException
    for any other status.
    """
    payload = f"<workspace><name>{workspace}</name></workspace>"
    response = self._requests(
        "put",
        f"{self.service_url}/rest/workspaces/default",
        data=payload,
        headers={"content-type": "text/xml"},
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}, default workspace {workspace} set!"
def create_workspace(self, workspace: str):
    """POST a new workspace named ``workspace``.

    Never raises on an HTTP error: a 201 yields a "created" message and
    every other status is reported as "already exists", with the status
    code prefixed to the message.
    """
    payload = f"<workspace><name>{workspace}</name></workspace>"
    response = self._requests(
        "post",
        f"{self.service_url}/rest/workspaces",
        data=payload,
        headers={"content-type": "text/xml"},
    )
    code = response.status_code
    if code == 201:
        return f"{code} Workspace {workspace} created!"
    return f"{code} Workspace {workspace} already exists!"
def delete_workspace(self, workspace: str):
    """DELETE ``/rest/workspaces/<workspace>`` with ``recurse=true``.

    Returns a status string on HTTP 200; raises GeoserverException otherwise.
    """
    endpoint = f"{self.service_url}/rest/workspaces/{workspace}"
    response = self._requests("delete", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}, delete workspace"
def get_datastore(self, store_name: str, workspace: Optional[str] = None):
    """GET one datastore of a workspace (``"default"`` when none is given).

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = (
        f"{self.service_url}/rest/workspaces/{target_workspace}"
        f"/datastores/{store_name}"
    )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_datastores(self, workspace: Optional[str] = None):
    """GET the datastores of a workspace (``"default"`` when none is given).

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = f"{self.service_url}/rest/workspaces/{target_workspace}/datastores.json"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_coveragestore(
    self, coveragestore_name: str, workspace: Optional[str] = None
):
    """GET one coverage store with ``recurse=true``.

    The workspace falls back to ``"default"``. Returns the decoded JSON on
    HTTP 200; raises GeoserverException otherwise.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = (
        f"{self.service_url}/rest/workspaces/{target_workspace}"
        f"/coveragestores/{coveragestore_name}.json"
    )
    response = self._requests(method="get", url=endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_coveragestores(self, workspace: str = None):
    """GET the coverage stores of a workspace (``"default"`` when none is given).

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = f"{self.service_url}/rest/workspaces/{target_workspace}/coveragestores"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def create_coveragestore(
    self,
    path,
    workspace: Optional[str] = None,
    layer_name: Optional[str] = None,
    file_type: str = "GeoTIFF",
    content_type: str = "image/tiff",
):
    """Upload the raster file at ``path`` as a coverage store via PUT.

    The workspace falls back to ``"default"``. When ``layer_name`` is not
    given it is the file's base name up to the first dot. ``file_type`` is
    lower-cased and used as the upload extension.

    Returns the decoded JSON on HTTP 201; raises GeoserverException for any
    other status and a plain Exception when ``path`` is None.
    """
    if path is None:
        raise Exception("You must provide the full path to the raster")

    target_workspace = "default" if workspace is None else workspace
    if layer_name is None:
        # str.split always yields at least one element, so [0] is safe.
        layer_name = os.path.basename(path).split(".")[0]

    extension = file_type.lower()
    endpoint = (
        f"{self.service_url}/rest/workspaces/{target_workspace}"
        f"/coveragestores/{layer_name}/file.{extension}?coverageName={layer_name}"
    )
    request_headers = {"content-type": content_type, "Accept": "application/json"}

    with open(path, "rb") as raster_file:
        response = self._requests(
            method="put", url=endpoint, data=raster_file, headers=request_headers
        )

    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def publish_time_dimension_to_coveragestore(
    self,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    presentation: Optional[str] = "LIST",
    units: Optional[str] = "ISO8601",
    default_value: Optional[str] = "MINIMUM",
    content_type: str = "application/xml; charset=UTF-8",
):
    """PUT an enabled ``time`` dimension on the coverage named like its store.

    ``presentation``, ``units`` and ``default_value`` are inserted into the
    XML payload. Returns the decoded JSON on HTTP 200 or 201; raises
    GeoserverException otherwise.
    """
    endpoint = (
        f"{self.service_url}/rest/workspaces/{workspace}"
        f"/coveragestores/{store_name}/coverages/{store_name}"
    )
    payload = (
        "<coverage>"
        "<enabled>true</enabled>"
        "<metadata>"
        "<entry key='time'>"
        "<dimensionInfo>"
        "<enabled>true</enabled>"
        f"<presentation>{presentation}</presentation>"
        f"<units>{units}</units>"
        "<defaultValue>"
        f"<strategy>{default_value}</strategy>"
        "</defaultValue>"
        "</dimensionInfo>"
        "</entry>"
        "</metadata>"
        "</coverage>"
    )
    response = self._requests(
        method="put",
        url=endpoint,
        data=payload,
        headers={"content-type": content_type},
    )
    if response.status_code not in (200, 201):
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_layer(self, layer_name: str, workspace: Optional[str] = None):
    """GET a layer, globally or inside ``workspace`` when one is given.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers/{layer_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers/{layer_name}"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_layers(self, workspace: Optional[str] = None):
    """GET all layers, or only those of ``workspace`` when one is given.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def delete_layer(self, layer_name: str, workspace: Optional[str] = None):
    """DELETE a layer with ``recurse=true``, globally or inside ``workspace``.

    Returns a status string on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers/{layer_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers/{layer_name}"
    response = self._requests(method="delete", url=endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}, delete layer"
def get_layergroups(self, workspace: Optional[str] = None):
    """GET all layer groups, or only those of ``workspace`` when one is given.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layergroups"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layergroups"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_layergroup(self, layer_name: str, workspace: Optional[str] = None):
    """GET a layer group, globally or inside ``workspace`` when one is given.

    Unlike most getters here this does not raise on an HTTP error: it
    returns the decoded JSON on HTTP 200 and ``None`` for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layergroups/{layer_name}"
    else:
        endpoint = (
            f"{self.service_url}/rest/workspaces/{workspace}/layergroups/{layer_name}"
        )
    response = self._requests("get", endpoint)
    return response.json() if response.status_code == 200 else None
def create_layergroup(
    self,
    name: str = "geoserver-rest-layergroup",
    mode: str = "single",
    title: str = "geoserver-rest layer group",
    abstract_text: str = "A new layergroup created with geoserver-rest python package",
    layers: List[str] = [],
    workspace: Optional[str] = None,
    formats: str = "html",
    metadata: List[dict] = [],
    keywords: List[str] = [],
) -> str:
    """Create a layer group from existing layers and/or layer groups.

    Args:
        name: Name of the new layer group.
        mode: One of single, opaque, named, container, eo (case-insensitive).
        title: Title written into the group definition.
        abstract_text: Abstract written into the group definition.
        layers: Names of the members; each must resolve to a layer or a
            layer group. Must not be empty.
        workspace: Optional workspace; checked with ``get_workspace`` and
            written into the payload.
        formats: One of html, json, xml; only used for the returned link.
        metadata: Dicts with ``about`` and ``content_url`` keys.
        keywords: Keyword strings.

    Returns:
        A success message containing a link to the new layer group.

    Raises:
        Exception: On unsupported mode/format, an already existing group,
            an empty ``layers`` list or a member that is neither a layer
            nor a layer group.
        GeoserverException: When the POST does not answer 201.
    """
    # NOTE: the list defaults are shared between calls but are only read here.
    assert isinstance(name, str), "Name must be of type String:''"
    assert isinstance(mode, str), "Mode must be of type String:''"
    assert isinstance(title, str), "Title must be of type String:''"
    assert isinstance(abstract_text, str), "Abstract text must be of type String:''"
    assert isinstance(formats, str), "Format must be of type String:''"
    assert isinstance(
        metadata, list
    ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'link to content url'}]"
    assert isinstance(
        keywords, list
    ), "Keywords must be of type List:['keyword1','keyword2'...]"
    assert isinstance(
        layers, list
    ), "Layers must be of type List:['layer1','layer2'...]"

    if workspace:
        assert isinstance(workspace, str), "Workspace must be of type String:''"
        # check if the workspace is valid in GeoServer
        if self.get_workspace(workspace) is None:
            raise Exception("Workspace is not valid in GeoServer Instance")

    supported_modes: Set = {"single", "opaque", "named", "container", "eo"}
    supported_formats: Set = {"html", "json", "xml"}
    if mode.lower() not in supported_modes:
        raise Exception(
            f"Mode not supported. Acceptable modes are : {supported_modes}"
        )
    if formats.lower() not in supported_formats:
        raise Exception(
            f"Format not supported. Acceptable formats are : {supported_formats}"
        )

    # check if it already exist in GeoServer
    try:
        existing_layergroup = self.get_layergroup(name, workspace=workspace)
    except GeoserverException:
        existing_layergroup = None
    if existing_layergroup is not None:
        raise Exception(f"Layergroup: {name} already exist in GeoServer instance")

    if not layers:
        raise Exception("No layer provided!")

    # Resolve every member exactly once: validate it and remember whether it
    # is a layer or a nested layer group. get_layergroup signals "unknown" by
    # returning None, so that result must be checked explicitly; relying only
    # on an exception lets invalid names through.
    published_types: List[str] = []
    for layer in layers:
        try:
            self.get_layer(layer_name=layer, workspace=workspace)
        except GeoserverException:
            try:
                nested_group = self.get_layergroup(
                    layer_name=layer, workspace=workspace
                )
            except GeoserverException:
                nested_group = None
            if nested_group is None:
                raise Exception(
                    f"Layer: {layer} is not a valid layer in the GeoServer instance"
                )
            published_types.append("layerGroup")
        else:
            published_types.append("layer")

    skeleton = ""
    if workspace:
        skeleton += f"<workspace><name>{workspace}</name></workspace>"

    if metadata:
        metadata_links = "".join(
            f"""
            <metadataLink>
                <type>text/plain</type>
                <about>{meta.get("about")}</about>
                <metadataType>ISO19115:2003</metadataType>
                <content>{meta.get("content_url")}</content>
            </metadataLink>
            """
            for meta in metadata
        )
        skeleton += f"<metadataLinks>{metadata_links}</metadataLinks>"

    publishables = "".join(
        f"""<published type="{published_type}">
                <name>{layer}</name>
                <link>{self.service_url}/layers/{layer}.xml</link>
            </published>
        """
        for layer, published_type in zip(layers, published_types)
    )
    skeleton += f"<publishables>{publishables}</publishables>"

    if keywords:
        keyword_tags = "".join(f"<keyword>{keyword}</keyword>" for keyword in keywords)
        skeleton += f"<keywords>{keyword_tags}</keywords>"

    data = f"""
        <layerGroup>
            <name>{name}</name>
            <mode>{mode}</mode>
            <title>{title}</title>
            <abstractTxt>{abstract_text}</abstractTxt>
            {skeleton}
        </layerGroup>
    """

    url = f"{self.service_url}/rest/layergroups/"
    r = self._requests(
        method="post", url=url, data=data, headers={"content-type": "text/xml"}
    )
    if r.status_code != 201:
        raise GeoserverException(r.status_code, r.content)
    layergroup_url = f"{self.service_url}/rest/layergroups/{name}.{formats}"
    return f"layergroup created successfully! Layergroup link: {layergroup_url}"
def update_layergroup(
    self,
    layergroup_name,
    title: Optional[str] = None,
    abstract_text: Optional[str] = None,
    formats: str = "html",
    metadata: List[dict] = [],
    keywords: List[str] = [],
) -> str:
    """Update title, abstract, metadata links and keywords of a layer group.

    Only the truthy/non-empty arguments are written into the PUT payload.
    ``formats`` (html, json or xml) only shapes the link in the returned
    message.

    Raises a plain Exception when the group is unknown or the format is not
    supported, and GeoserverException when the PUT does not answer 200.
    """
    if self.get_layergroup(layer_name=layergroup_name) is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )

    if title is not None:
        assert isinstance(title, str), "Title must be of type String:''"
    if abstract_text is not None:
        assert isinstance(
            abstract_text, str
        ), "Abstract text must be of type String:''"
    assert isinstance(formats, str), "Format must be of type String:''"
    assert isinstance(
        metadata, list
    ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'lint to content url'}]"
    assert isinstance(
        keywords, list
    ), "Keywords must be of type List:['keyword1','keyword2'...]"

    supported_formats: Set = {"html", "json", "xml"}
    if formats.lower() not in supported_formats:
        raise Exception(
            f"Format not supported. Acceptable formats are : {supported_formats}"
        )

    # Collect the optional XML fragments, then join them once.
    fragments: List[str] = []
    if title:
        fragments.append(f"<title>{title}</title>")
    if abstract_text:
        fragments.append(f"<abstractTxt>{abstract_text}</abstractTxt>")
    if metadata:
        links = "".join(
            f"""
            <metadataLink>
                <type>text/plain</type>
                <about>{meta.get("about")}</about>
                <metadataType>ISO19115:2003</metadataType>
                <content>{meta.get("content_url")}</content>
            </metadataLink>
            """
            for meta in metadata
        )
        fragments.append(f"<metadataLinks>{links}</metadataLinks>")
    if keywords:
        keyword_tags = "".join(f"<keyword>{keyword}</keyword>" for keyword in keywords)
        fragments.append(f"<keywords>{keyword_tags}</keywords>")
    skeleton = "".join(fragments)

    payload = f"""
        <layerGroup>
            {skeleton}
        </layerGroup>
    """

    response = self._requests(
        method="put",
        url=f"{self.service_url}/rest/layergroups/{layergroup_name}",
        data=payload,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    layergroup_url = f"{self.service_url}/rest/layergroups/{layergroup_name}.{formats}"
    return f"layergroup updated successfully! Layergroup link: {layergroup_url}"
def delete_layergroup(
    self, layergroup_name: str, workspace: Optional[str] = None
) -> str:
    """DELETE a layer group, globally or inside ``workspace``.

    The group is fetched first, but that lookup returns ``None`` for an
    unknown group rather than raising, and its result is not used. The
    success message is returned whatever status the DELETE answers with.
    """
    self.get_layergroup(layer_name=layergroup_name, workspace=workspace)

    endpoint = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    if workspace is not None:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layergroups/{layergroup_name}"

    self._requests(url=endpoint, method="delete")
    return "Layer group deleted successfully"
def add_layer_to_layergroup(
    self,
    layer_name: str,
    layer_workspace: str,
    layergroup_name: str,
    layergroup_workspace: str = None,
) -> None:
    """Append a layer, with its default style, to an existing layer group.

    Args:
        layer_name: Name of the layer to add.
        layer_workspace: Workspace of that layer.
        layergroup_name: Name of the layer group to extend.
        layergroup_workspace: Workspace of the group, or None for a global one.

    Raises:
        Exception: If the layer group cannot be found.
        GeoserverException: If the layer lookup or the PUT fails.
    """
    layergroup_info = self.get_layergroup(
        layer_name=layergroup_name, workspace=layergroup_workspace
    )
    # get_layergroup returns None for an unknown group; fail with a clear
    # message instead of a TypeError on the subscript below.
    if layergroup_info is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )
    layer_info = self.get_layer(layer_name=layer_name, workspace=layer_workspace)

    # build list of existing publishables & styles
    publishables = layergroup_info["layerGroup"]["publishables"]["published"]
    if not isinstance(publishables, list):  # only 1 layer up to now
        publishables = [publishables]
    styles = layergroup_info["layerGroup"]["styles"]["style"]
    if not isinstance(styles, list):  # only 1 layer up to now
        styles = [styles]

    # add publishable & style for the new layer
    publishables.append(
        {
            "name": f"{layer_workspace}:{layer_name}",
            "href": f"{self.service_url}/rest/workspaces/{layer_workspace}/layers/{layer_name}.json",
        }
    )
    styles.append(layer_info["layer"]["defaultStyle"])

    data = self._layergroup_definition_from_layers_and_styles(
        publishables=publishables, styles=styles
    )

    if layergroup_workspace is None:
        url = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    else:
        url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}"

    r = self._requests(
        method="put",
        url=url,
        data=data,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)
def remove_layer_from_layergroup(
    self,
    layer_name: str,
    layer_workspace: str,
    layergroup_name: str,
    layergroup_workspace: str = None,
) -> None:
    """Remove a layer, and the style paired with it, from a layer group.

    Args:
        layer_name: Name of the layer to remove.
        layer_workspace: Workspace of that layer; members are matched on
            ``"<layer_workspace>:<layer_name>"``.
        layergroup_name: Name of the layer group to change.
        layergroup_workspace: Workspace of the group, or None for a global one.

    Raises:
        Exception: If the layer group cannot be found.
        GeoserverException: If the PUT does not answer 200.
    """
    layergroup_info = self.get_layergroup(
        layer_name=layergroup_name, workspace=layergroup_workspace
    )
    if layergroup_info is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )

    # build list of existing publishables & styles
    publishables = layergroup_info["layerGroup"]["publishables"]["published"]
    if not isinstance(publishables, list):  # only 1 layer up to now
        publishables = [publishables]
    styles = layergroup_info["layerGroup"]["styles"]["style"]
    if not isinstance(styles, list):  # only 1 layer up to now
        styles = [styles]

    layer_to_remove = f"{layer_workspace}:{layer_name}"
    # Filter the normalised lists (not the raw JSON values, which may be a
    # single dict) so that groups with exactly one member work too.
    kept = [
        (pub, style)
        for pub, style in zip(publishables, styles)
        if pub["name"] != layer_to_remove
    ]
    # Plain comprehensions stay valid when nothing is left, unlike
    # unzipping with zip(*kept)[0].
    revised_set_of_publishables = [pub for pub, _ in kept]
    revised_set_of_styles = [style for _, style in kept]

    xml_payload = self._layergroup_definition_from_layers_and_styles(
        publishables=revised_set_of_publishables, styles=revised_set_of_styles
    )

    if layergroup_workspace is None:
        url = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    else:
        url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}"

    r = self._requests(
        method="put",
        url=url,
        data=xml_payload,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)
def _layergroup_definition_from_layers_and_styles(
    self, publishables: list, styles: list
) -> str:
    """Build the ``<layerGroup>`` XML for the given members and styles.

    ``publishables`` and ``styles`` are lists of dicts with ``name`` and
    ``href`` keys. A style given as an empty string is replaced in place
    (``styles`` is mutated) by the default style of the layer at the same
    position, looked up through ``get_layer``.
    """
    # the get_layergroup method may return an empty string for style;
    # so we get the default styles for each layer with no style information in the layergroup
    for position, (current_style, current_layer) in enumerate(zip(styles, publishables)):
        if current_style != "":
            continue
        workspace_part, layer_part = current_layer["name"].split(":")[:2]
        default_style = self.get_layer(
            layer_name=layer_part, workspace=workspace_part
        )["layer"]["defaultStyle"]
        styles[position] = {
            "name": default_style["name"],
            "href": default_style["href"],
        }

    # build xml structure
    layer_skeleton = "".join(
        f"""
            <published type="layer">
                <name>{entry['name']}</name>
                <link>{entry['href']}</link>
            </published>
        """
        for entry in publishables
    )
    style_skeleton = "".join(
        f"""
            <style>
                <name>{entry['name']}</name>
                <link>{entry['href']}</link>
            </style>
        """
        for entry in styles
    )

    return f"""
        <layerGroup>
            <publishables>
                {layer_skeleton}
            </publishables>
            <styles>
                {style_skeleton}
            </styles>
        </layerGroup>
    """
def get_style(self, style_name, workspace: Optional[str] = None):
    """GET a style as JSON, globally or inside ``workspace`` when one is given.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/styles/{style_name}.json"
    else:
        endpoint = (
            f"{self.service_url}/rest/workspaces/{workspace}/styles/{style_name}.json"
        )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_styles(self, workspace: Optional[str] = None):
    """GET all styles, or only those of ``workspace`` when one is given.

    Returns the decoded JSON on HTTP 200; raises GeoserverException otherwise.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/styles.json"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/styles.json"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def upload_style(
    self,
    path: str,
    name: Optional[str] = None,
    workspace: Optional[str] = None,
    sld_version: str = "1.1.0",
):
    """Register a style and upload its SLD body.

    ``path`` is either an XML string (as judged by ``is_valid_xml``) or the
    path of an existing file; a file is read as UTF-8 and re-encoded as GBK
    before upload. When ``name`` is omitted it is the base name of ``path``
    up to the first dot.

    Returns the status code of the SLD upload when the style entry was
    created (HTTP 201), otherwise the status code of the failed creation.
    No exception is raised for HTTP errors. Raises ValueError when ``path``
    is neither valid XML nor an existing file.
    """
    if name is None:
        # str.split always yields at least one element, so [0] is safe.
        name = os.path.basename(path).split(".")[0]

    if is_valid_xml(path):
        xml = path
    elif Path(path).exists():
        with open(path, "r", encoding='utf-8') as sld_file:
            xml = sld_file.read().encode("gbk")
    else:
        raise ValueError("`path` must be either a path to a style file, or a valid XML string.")

    if workspace is None:
        url = f"{self.service_url}/rest/styles"
    else:
        url = f"{self.service_url}/rest/workspaces/{workspace}/styles"

    if sld_version in ("1.1.0", "1.1"):
        sld_content_type = "application/vnd.ogc.se+xml"
    else:
        sld_content_type = "application/vnd.ogc.sld+xml"

    style_xml = f"<style><name>{name}</name><filename>{name}.sld</filename></style>"
    created = self._requests(
        method="post", url=url, data=style_xml, headers={"content-type": "text/xml"}
    )
    if created.status_code != 201:
        return created.status_code

    uploaded = self._requests(
        method="put",
        url=url + "/" + name,
        data=xml,
        headers={"content-type": sld_content_type},
    )
    return uploaded.status_code
def create_coveragestyle(
    self,
    raster_path: str,
    style_name: Optional[str] = None,
    workspace: str = None,
    color_ramp: str = "RdYlGn_r",
    cmap_type: str = "ramp",
    number_of_classes: int = 5,
    opacity: float = 1,
):
    """Generate a raster style from the raster's value range and upload it.

    Args:
        raster_path: Path of the raster passed to ``raster_value``.
        style_name: Name of the style; defaults to the raster's
            ``file_name`` and, failing that, to the base name of
            ``raster_path`` up to the first dot.
        workspace: Target workspace, or None for a global style.
        color_ramp, cmap_type, number_of_classes, opacity: Forwarded to
            ``coverage_style_xml``.

    Returns:
        The status code (200) of the SLD upload.

    Raises:
        GeoserverException: If creating the style entry does not answer 201
            or the SLD upload does not answer 200.
    """
    raster = raster_value(raster_path)
    min_value = raster["min"]
    max_value = raster["max"]

    # Resolve the name completely before it is used anywhere; the file-name
    # fallback used to run only after the XML had already been built.
    if style_name is None:
        style_name = raster["file_name"]
    if style_name is None:
        style_name = os.path.basename(raster_path).split(".")[0]

    # NOTE(review): presumably writes "style.sld" into the working directory,
    # since that file is read back below - confirm against the helper.
    coverage_style_xml(
        color_ramp,
        style_name,
        cmap_type,
        min_value,
        max_value,
        number_of_classes,
        opacity,
    )

    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests("post", url, data=style_xml, headers=headers)
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as sld_file:
            r_sld = self._requests(
                method="put",
                url=url + "/" + style_name,
                data=sld_file.read(),
                headers=header_sld,
            )
    finally:
        # Remove the temporary SLD on every path, including a failed POST.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code != 200:
        raise GeoserverException(r_sld.status_code, r_sld.content)
    return r_sld.status_code
def create_catagorized_featurestyle(
    self,
    style_name: str,
    column_name: str,
    column_distinct_values,
    workspace: str = None,
    color_ramp: str = "tab20",
    geom_type: str = "polygon",
):
    """Generate a categorized feature style and upload it.

    Args:
        style_name: Name under which the style is registered and uploaded.
        column_name, column_distinct_values, color_ramp, geom_type:
            Forwarded to ``catagorize_xml``.
        workspace: Target workspace, or None for a global style.

    Returns:
        The status code (200) of the SLD upload.

    Raises:
        GeoserverException: If creating the style entry does not answer 201
            or the SLD upload does not answer 200.
    """
    # NOTE(review): presumably writes "style.sld" into the working directory,
    # since that file is read back below - confirm against the helper.
    catagorize_xml(column_name, column_distinct_values, color_ramp, geom_type)

    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests("post", url, data=style_xml, headers=headers)
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as sld_file:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=sld_file.read(),
                headers=header_sld,
            )
    finally:
        # Remove the temporary SLD on every path, including a failed POST.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code != 200:
        raise GeoserverException(r_sld.status_code, r_sld.content)
    return r_sld.status_code
def create_outline_featurestyle(
    self,
    style_name: str,
    color: str = "#3579b1",
    width: str = "2",
    geom_type: str = "polygon",
    workspace: Optional[str] = None,
):
    """Generate an outline-only feature style and upload it.

    Args:
        style_name: Name under which the style is registered and uploaded.
        color, width, geom_type: Forwarded to ``outline_only_xml``.
        workspace: Target workspace, or None for a global style.

    Returns:
        The status code (200) of the SLD upload.

    Raises:
        GeoserverException: If creating the style entry does not answer 201
            or the SLD upload does not answer 200.
    """
    # NOTE(review): presumably writes "style.sld" into the working directory,
    # since that file is read back below - confirm against the helper.
    outline_only_xml(color, width, geom_type)

    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests("post", url, data=style_xml, headers=headers)
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as sld_file:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=sld_file.read(),
                headers=header_sld,
            )
    finally:
        # Remove the temporary SLD on every path, including a failed POST.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code != 200:
        raise GeoserverException(r_sld.status_code, r_sld.content)
    return r_sld.status_code
def create_classified_featurestyle(
    self,
    style_name: str,
    column_name: str,
    column_distinct_values,
    workspace: Optional[str] = None,
    color_ramp: str = "tab20",
    geom_type: str = "polygon",
    # outline_color: str = "#3579b1",
):
    """Generate a classified feature style and upload it.

    Args:
        style_name: Name under which the style is registered and uploaded.
        column_name, column_distinct_values, color_ramp, geom_type:
            Forwarded, together with ``style_name``, to ``classified_xml``.
        workspace: Target workspace, or None for a global style.

    Returns:
        The status code (200) of the SLD upload.

    Raises:
        GeoserverException: If creating the style entry does not answer 201
            or the SLD upload does not answer 200.
    """
    # NOTE(review): presumably writes "style.sld" into the working directory,
    # since that file is read back below - confirm against the helper.
    classified_xml(
        style_name,
        column_name,
        column_distinct_values,
        color_ramp,
        geom_type,
    )

    # Register the style under style_name, the same name the SLD is uploaded
    # to below. It used to be registered under column_name, so the upload
    # targeted a different style whenever the two names differed.
    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests("post", url, data=style_xml, headers=headers)
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as sld_file:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=sld_file.read(),
                headers=header_sld,
            )
    finally:
        # Remove the temporary SLD on every path, including a failed POST.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code != 200:
        raise GeoserverException(r_sld.status_code, r_sld.content)
    return r_sld.status_code
def publish_style(
    self,
    layer_name: str,
    style_name: str,
    workspace: str,
):
    """PUT ``style_name`` as the default style of ``workspace:layer_name``.

    Returns the status code on HTTP 200; raises GeoserverException otherwise.
    """
    endpoint = f"{self.service_url}/rest/layers/{workspace}:{layer_name}"
    payload = f"<layer><defaultStyle><name>{style_name}</name></defaultStyle></layer>"
    response = self._requests(
        "put",
        endpoint,
        data=payload,
        headers={"content-type": "text/xml"},
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.status_code
def delete_style(self, style_name: str, workspace: Optional[str] = None):
    """Delete a style, recursively (``recurse=true``).

    With ``workspace=None`` the global ``/rest/styles/<name>`` endpoint is
    used, otherwise the workspace-scoped one.

    Returns:
        A message containing the response status code. No exception is
        raised for non-200 answers; the status is only reported.
    """
    if workspace is None:
        endpoint = "{}/rest/styles/{}".format(self.service_url, style_name)
    else:
        endpoint = "{}/rest/workspaces/{}/styles/{}".format(
            self.service_url, workspace, style_name
        )
    response = self._requests("delete", endpoint, params={"recurse": "true"})
    # Success and failure are reported with the same message format.
    return "Status code: {}, delete style".format(response.status_code)
def create_featurestore(
    self,
    store_name: str,
    workspace: Optional[str] = None,
    db: str = "postgres",
    host: str = "localhost",
    port: int = 5432,
    schema: str = "public",
    pg_user: str = "postgres",
    pg_password: str = "admin",
    overwrite: bool = False,
    expose_primary_keys: str = "false",
    description: Optional[str] = None,
    evictor_run_periodicity: Optional[int] = 300,
    max_open_prepared_statements: Optional[int] = 50,
    encode_functions: Optional[str] = "false",
    primary_key_metadata_table: Optional[str] = None,
    batch_insert_size: Optional[int] = 1,
    preparedstatements: Optional[str] = "false",
    loose_bbox: Optional[str] = "true",
    estimated_extends: Optional[str] = "true",
    fetch_size: Optional[int] = 1000,
    validate_connections: Optional[str] = "true",
    support_on_the_fly_geometry_simplification: Optional[str] = "true",
    connection_timeout: Optional[int] = 20,
    create_database: Optional[str] = "false",
    min_connections: Optional[int] = 1,
    max_connections: Optional[int] = 10,
    evictor_tests_per_run: Optional[int] = 3,
    test_while_idle: Optional[str] = "true",
    max_connection_idle_time: Optional[int] = 300,
):
    """Create or update a PostGIS datastore (``dbtype`` is ``postgis``).

    Every keyword is written as one ``<entry>`` of the store's
    ``<connectionParameters>``. Parameters whose value is ``None`` are
    omitted instead of being sent as the literal text ``None``, and all
    values are XML-escaped so characters such as ``&`` or ``<`` (e.g. in a
    password) do not produce a malformed document.

    Args:
        store_name: Name of the datastore.
        workspace: Target workspace; ``None`` falls back to ``"default"``,
            as the other datastore creators in this class do.
        overwrite: When true, PUT to the existing store instead of
            POSTing a new one.
        description: Optional description; omitted when ``None``.

    Returns:
        A confirmation message on status 200 or 201.

    Raises:
        GeoserverException: For any other status code.
    """
    # Local import keeps this method independent of module-level imports.
    from xml.sax.saxutils import escape

    if workspace is None:
        workspace = "default"

    connection_parameters = [
        ("Expose primary keys", expose_primary_keys),
        ("host", host),
        ("port", port),
        ("user", pg_user),
        ("passwd", pg_password),
        ("dbtype", "postgis"),
        ("schema", schema),
        ("database", db),
        ("Evictor run periodicity", evictor_run_periodicity),
        ("Max open prepared statements", max_open_prepared_statements),
        ("encode functions", encode_functions),
        ("Primary key metadata table", primary_key_metadata_table),
        ("Batch insert size", batch_insert_size),
        ("preparedStatements", preparedstatements),
        ("Estimated extends", estimated_extends),
        ("fetch size", fetch_size),
        ("validate connections", validate_connections),
        (
            "Support on the fly geometry simplification",
            support_on_the_fly_geometry_simplification,
        ),
        ("Connection timeout", connection_timeout),
        ("create database", create_database),
        ("min connections", min_connections),
        ("max connections", max_connections),
        ("Evictor tests per run", evictor_tests_per_run),
        ("Test while idle", test_while_idle),
        ("Max connection idle time", max_connection_idle_time),
        ("Loose bbox", loose_bbox),
    ]
    entries_xml = "\n".join(
        '<entry key="{}">{}</entry>'.format(key, escape(str(value)))
        for key, value in connection_parameters
        if value is not None
    )
    description_xml = (
        ""
        if description is None
        else "<description>{}</description>".format(escape(str(description)))
    )
    database_connection = (
        "<dataStore>\n"
        "<name>{}</name>\n"
        "{}\n"
        "<connectionParameters>\n"
        "{}\n"
        "</connectionParameters>\n"
        "</dataStore>"
    ).format(escape(str(store_name)), description_xml, entries_xml)

    headers = {"content-type": "text/xml"}
    if overwrite:
        url = "{}/rest/workspaces/{}/datastores/{}".format(
            self.service_url, workspace, store_name
        )
        r = self._requests("put", url, data=database_connection, headers=headers)
    else:
        url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace)
        r = self._requests("post", url, data=database_connection, headers=headers)

    if r.status_code in [200, 201]:
        return "Featurestore created/updated successfully"
    raise GeoserverException(r.status_code, r.content)
def create_datastore(
    self,
    name: str,
    path: str,
    workspace: Optional[str] = None,
    overwrite: bool = False,
):
    """Create or update a datastore from a file path or a capabilities URL.

    A ``path`` containing ``http://`` or ``https://`` is sent as a
    ``<GET_CAPABILITIES_URL>`` connection parameter; anything else is sent
    as ``<url>file:<path></url>``. The value is XML-escaped so that query
    strings containing ``&`` stay well-formed.

    Args:
        name: Name of the datastore.
        path: File path or capabilities URL of the data.
        workspace: Target workspace; ``None`` falls back to ``"default"``.
        overwrite: When true, PUT to the existing store instead of
            POSTing a new one.

    Returns:
        A confirmation message on status 200 or 201.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other status code.
    """
    # Local import keeps this method independent of module-level imports.
    from xml.sax.saxutils import escape

    if workspace is None:
        workspace = "default"
    if path is None:
        raise Exception("You must provide a full path to the data")

    if "http://" in path or "https://" in path:
        data_url = "<GET_CAPABILITIES_URL>{}</GET_CAPABILITIES_URL>".format(
            escape(path)
        )
    else:
        data_url = "<url>file:{}</url>".format(escape(path))

    data = "<dataStore><name>{}</name><connectionParameters>{}</connectionParameters></dataStore>".format(
        escape(name), data_url
    )
    headers = {"content-type": "text/xml"}

    if overwrite:
        url = "{}/rest/workspaces/{}/datastores/{}".format(
            self.service_url, workspace, name
        )
        r = self._requests("put", url, data=data, headers=headers)
    else:
        url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace)
        r = self._requests("post", url, data=data, headers=headers)

    if r.status_code in [200, 201]:
        return "Data store created/updated successfully"
    raise GeoserverException(r.status_code, r.content)
def create_shp_datastore(
    self,
    path: str,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    file_extension: str = "shp",
):
    """Upload a zipped shapefile and create a datastore from it.

    When ``store_name`` is omitted it is taken from the basename of
    ``path``, cut at the first dot. If ``path`` is a ``dict`` it is first
    turned into an archive path by ``prepare_zip_file``. The file content
    is PUT to ``.../datastores/<store>/file.<ext>`` with
    ``update=overwrite``.

    Returns:
        A confirmation message on status 200, 201 or 202.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other status code.
    """
    if path is None:
        raise Exception("You must provide a full path to shapefile")

    workspace = "default" if workspace is None else workspace

    if store_name is None:
        # Everything before the first dot of the file name.
        store_name = os.path.basename(path).split(".")[0]

    request_headers = {
        "Content-type": "application/zip",
        "Accept": "application/xml",
    }

    if isinstance(path, dict):
        path = prepare_zip_file(store_name, path)

    endpoint = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}&update=overwrite".format(
        self.service_url, workspace, store_name, file_extension
    )

    with open(path, "rb") as archive:
        response = self._requests(
            "put", endpoint, data=archive.read(), headers=request_headers
        )

    if response.status_code not in [200, 201, 202]:
        raise GeoserverException(response.status_code, response.content)
    return "The shapefile datastore created successfully!"
def create_gpkg_datastore(
    self,
    path: str,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    file_extension: str = "gpkg",
):
    """Upload a GeoPackage file and create a datastore from it.

    When ``store_name`` is omitted it is taken from the basename of
    ``path``, cut at the first dot. The file content is PUT to
    ``.../datastores/<store>/file.<ext>`` as ``application/x-sqlite3``.

    Args:
        path: Path of the GeoPackage file to upload.
        store_name: Name of the datastore; derived from ``path`` if omitted.
        workspace: Target workspace; ``None`` falls back to ``"default"``.
        file_extension: Extension used in the upload endpoint.

    Returns:
        A confirmation message on status 200, 201 or 202.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other status code.
    """
    if path is None:
        # The message names the geopackage; it previously said "shapefile".
        raise Exception("You must provide a full path to geopackage")

    if workspace is None:
        workspace = "default"

    if store_name is None:
        # Everything before the first dot of the file name.
        store_name = os.path.basename(path).split(".")[0]

    headers = {
        "Content-type": "application/x-sqlite3",
        "Accept": "application/json",
    }
    url = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}".format(
        self.service_url, workspace, store_name, file_extension
    )

    with open(path, "rb") as f:
        r = self._requests("put", url, data=f.read(), headers=headers)

    if r.status_code in [200, 201, 202]:
        return "The geopackage datastore created successfully!"
    raise GeoserverException(r.status_code, r.content)
def publish_featurestore(
    self,
    store_name: str,
    pg_table: str,
    workspace: Optional[str] = None,
    title: Optional[str] = None,
    advertised: Optional[bool] = True,
    abstract: Optional[str] = None,
    keywords: Optional[List[str]] = None,
    cqlfilter: Optional[str] = None
) -> int:
    """Publish a table of a datastore as a feature type.

    Builds a ``<featureType>`` document and POSTs it, UTF-8 encoded, to
    ``.../datastores/<store>/featuretypes/``. All text values are
    XML-escaped, so a CQL filter such as ``pop < 5`` or a title containing
    ``&`` produces a well-formed document.

    Args:
        store_name: Datastore that holds the table.
        pg_table: Table name; used as the feature type name.
        workspace: Target workspace; ``None`` falls back to ``"default"``.
        title: Layer title; defaults to ``pg_table``.
        advertised: Written lower-cased (``true``/``false``); the element
            is omitted when ``None``.
        abstract: Optional abstract; omitted when empty.
        keywords: Optional keywords, one ``<string>`` each.
        cqlfilter: Optional CQL filter; omitted when empty.

    Returns:
        The response status code (200 or 201).

    Raises:
        GeoserverException: For any other status code.
    """
    # Local import keeps this method independent of module-level imports.
    from xml.sax.saxutils import escape

    def text(value):
        return escape(str(value))

    if workspace is None:
        workspace = "default"
    if title is None:
        title = pg_table

    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/".format(
        self.service_url, workspace, store_name
    )

    elements = [
        "<name>{}</name>".format(text(pg_table)),
        "<title>{}</title>".format(text(title)),
    ]
    if advertised is not None:
        # Lower-case so a Python bool is sent as "true"/"false".
        elements.append(
            "<advertised>{}</advertised>".format(text(advertised).lower())
        )
    if abstract:
        elements.append("<abstract>{}</abstract>".format(text(abstract)))
    if keywords:
        keyword_items = "".join(
            "<string>{}</string>".format(text(keyword)) for keyword in keywords
        )
        elements.append("<keywords>{}</keywords>".format(keyword_items))
    if cqlfilter:
        elements.append("<cqlFilter>{}</cqlFilter>".format(text(cqlfilter)))

    layer_xml = "<featureType>\n{}\n</featureType>".format("\n".join(elements))
    headers = {"content-type": "text/xml;charset=utf-8"}

    r = self._requests("post", url, data=layer_xml.encode("utf-8"), headers=headers)
    if r.status_code == 201 or r.status_code == 200:
        return r.status_code
    raise GeoserverException(r.status_code, r.content)
def edit_featuretype(
    self,
    store_name: str,
    workspace: Optional[str],
    pg_table: str,
    name: str,
    title: str,
    abstract: Optional[str] = None,
    keywords: Optional[List[str]] = None,
    recalculate: Optional[str] = None
) -> int:
    """Update name, title, abstract and keywords of a feature type.

    PUTs a ``<featureType>`` document to
    ``.../featuretypes/<pg_table>.xml``. As in ``publish_featurestore``,
    the body is sent UTF-8 encoded with a matching charset header so
    non-ASCII titles survive the transfer, and all text values are
    XML-escaped.

    Args:
        store_name: Datastore that holds the feature type.
        workspace: Target workspace; ``None`` falls back to ``"default"``.
        pg_table: Current feature type name, used in the URL.
        name: New feature type name.
        title: New title.
        abstract: Optional abstract; omitted when empty.
        keywords: Optional keywords, one ``<string>`` each.
        recalculate: Optional value of the ``recalculate`` query
            parameter; appended to the URL when non-empty.

    Returns:
        The response status code (200).

    Raises:
        GeoserverException: For any other status code.
    """
    # Local import keeps this method independent of module-level imports.
    from xml.sax.saxutils import escape

    def text(value):
        return escape(str(value))

    if workspace is None:
        workspace = "default"

    recalculate_param = f"?recalculate={recalculate}" if recalculate else ""
    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.xml{}".format(
        self.service_url, workspace, store_name, pg_table, recalculate_param
    )

    elements = [
        "<name>{}</name>".format(text(name)),
        "<title>{}</title>".format(text(title)),
    ]
    if abstract:
        elements.append("<abstract>{}</abstract>".format(text(abstract)))
    if keywords:
        keyword_items = "".join(
            "<string>{}</string>".format(text(keyword)) for keyword in keywords
        )
        elements.append("<keywords>{}</keywords>".format(keyword_items))

    layer_xml = "<featureType>\n{}\n</featureType>".format("\n".join(elements))
    headers = {"content-type": "text/xml;charset=utf-8"}

    r = self._requests("put", url, data=layer_xml.encode("utf-8"), headers=headers)
    if r.status_code == 200:
        return r.status_code
    raise GeoserverException(r.status_code, r.content)
def publish_featurestore_sqlview(
    self,
    name: str,
    store_name: str,
    sql: str,
    parameters: Optional[Iterable[Dict]] = None,
    key_column: Optional[str] = None,
    geom_name: str = "geom",
    geom_type: str = "Geometry",
    srid: Optional[int] = 4326,
    workspace: Optional[str] = None,
) -> int:
    """Publish an SQL view of a datastore as a feature type.

    Builds a ``<featureType>`` document carrying a ``JDBC_VIRTUAL_TABLE``
    metadata entry and POSTs it to ``.../datastores/<store>/featuretypes``.

    Args:
        name: Name and title of the new layer and of the virtual table.
        store_name: Datastore to publish into.
        sql: Query of the virtual table.
        parameters: Optional parameter dicts read via the keys ``name``,
            ``defaultValue`` and ``regexpValidator``.
        key_column: Optional ``<keyColumn>`` value.
        geom_name: Geometry column name.
        geom_type: Geometry type.
        srid: EPSG code used for ``<srs>`` and ``<srid>``.
        workspace: Target workspace; ``None`` falls back to ``"default"``.

    Returns:
        The response status code (201).

    Raises:
        ValueError: If a parameter is not reported as quoted in ``sql`` by
            ``is_surrounded_by_quotes`` and has no ``defaultValue`` key.
        GeoserverException: If the server does not answer 201.
    """
    workspace = "default" if workspace is None else workspace

    # issue #87
    key_column_xml = (
        "" if key_column is None else "<keyColumn>{}</keyColumn>".format(key_column)
    )

    parameters_xml = ""
    for parameter in parameters if parameters is not None else ():
        # non-string parameters MUST have a default value supplied
        if not (
            is_surrounded_by_quotes(sql, parameter["name"])
            or "defaultValue" in parameter
        ):
            raise ValueError(f"Parameter `{parameter['name']}` appears to be a non-string in the supplied query"
                             ", but does not have a default value specified. You must supply a default value "
                             "for non-string parameters using the `defaultValue` key.")
        param_name = parameter.get("name", "")
        default_value = parameter.get("defaultValue", "")
        regexp_validator = parameter.get("regexpValidator", r"^[\w\d\s]+$")
        parameter_block = f"""
            <parameter>
                <name>{param_name}</name>
                <defaultValue>{default_value}</defaultValue>
                <regexpValidator>{regexp_validator}</regexpValidator>
            </parameter>\n
        """
        parameters_xml += parameter_block.strip()

    layer_xml = """<featureType>
            <name>{name}</name>
            <enabled>true</enabled>
            <namespace>
                <name>{workspace}</name>
            </namespace>
            <title>{name}</title>
            <srs>EPSG:{srid}</srs>
            <metadata>
                <entry key="JDBC_VIRTUAL_TABLE">
                    <virtualTable>
                        <name>{name}</name>
                        <sql>{sql}</sql>
                        <escapeSql>true</escapeSql>
                        <geometry>
                            <name>{geom_name}</name>
                            <type>{geom_type}</type>
                            <srid>{srid}</srid>
                        </geometry>{key_column_xml}
                        {parameters_xml}
                    </virtualTable>
                </entry>
            </metadata>
        </featureType>""".format(
        name=name,
        sql=sql,
        geom_name=geom_name,
        geom_type=geom_type,
        workspace=workspace,
        srid=srid,
        key_column_xml=key_column_xml,
        parameters_xml=parameters_xml,
    )

    endpoint = "{}/rest/workspaces/{}/datastores/{}/featuretypes".format(
        self.service_url, workspace, store_name
    )
    response = self._requests(
        "post", endpoint, data=layer_xml, headers={"content-type": "text/xml"}
    )
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return response.status_code
def get_featuretypes(
    self, workspace: Optional[str] = None, store_name: Optional[str] = None
) -> List[str]:
    """Return the names of the feature types published from a datastore.

    Args:
        workspace: Workspace of the datastore.
        store_name: Name of the datastore.

    Returns:
        The feature type names; an empty list when the store has none.

    Raises:
        GeoserverException: If the server does not answer 200.
    """
    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes.json".format(
        self.service_url, workspace, store_name
    )
    r = self._requests("get", url)
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)

    feature_types = r.json().get("featureTypes")
    # NOTE(review): GeoServer's JSON output appears to report an empty
    # collection as "" rather than an object; indexing it would raise
    # TypeError, so anything that is not a mapping is treated as "none".
    if not isinstance(feature_types, dict):
        return []
    return [i["name"] for i in feature_types.get("featureType", [])]
def get_feature_attribute(
    self, feature_type_name: str, workspace: str, store_name: str
) -> List[str]:
    """Return the attribute names of a feature type.

    Reads ``featureType.attributes.attribute`` from the JSON description
    of ``.../featuretypes/<feature_type_name>.json``.

    Raises:
        GeoserverException: If the server does not answer 200.
    """
    endpoint = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.json".format(
        self.service_url, workspace, store_name, feature_type_name
    )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)

    attributes = response.json()["featureType"]["attributes"]["attribute"]
    names = []
    for entry in attributes:
        names.append(entry["name"])
    return names
def get_featurestore(self, store_name: str, workspace: str) -> dict:
    """Return the ``dataStore`` object of a datastore's JSON description.

    Raises:
        GeoserverException: If the server does not answer 200.
    """
    endpoint = "{}/rest/workspaces/{}/datastores/{}".format(
        self.service_url, workspace, store_name
    )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()["dataStore"]
def delete_featurestore(
    self, featurestore_name: str, workspace: Optional[str] = None
) -> str:
    """Delete a datastore, recursively (``recurse=true``).

    Args:
        featurestore_name: Name of the datastore to delete.
        workspace: Workspace of the datastore. ``None`` falls back to
            ``"default"``, the same workspace the datastore creators in
            this class use when none is given. The previous fallback URL
            lacked the ``/rest`` prefix and therefore never addressed the
            REST API.

    Returns:
        A message containing the response status code. No exception is
        raised for non-200 answers; the status is only reported.
    """
    if workspace is None:
        workspace = "default"

    url = "{}/rest/workspaces/{}/datastores/{}".format(
        self.service_url, workspace, featurestore_name
    )
    r = self._requests("delete", url, params={"recurse": "true"})
    # Success and failure are reported with the same message format.
    return "Status code: {}, delete featurestore".format(r.status_code)
def delete_coveragestore(
    self, coveragestore_name: str, workspace: Optional[str] = None
) -> str:
    """Delete a coverage store, recursively (``recurse=true``).

    With ``workspace=None`` the request goes to
    ``/rest/coveragestores/<name>``, otherwise to the workspace-scoped
    endpoint.

    Returns:
        A confirmation message on status 200.

    Raises:
        GeoserverException: For any other status code.
    """
    if workspace is None:
        endpoint = "{}/rest/coveragestores/{}".format(
            self.service_url, coveragestore_name
        )
    else:
        endpoint = "{}/rest/workspaces/{}/coveragestores/{}".format(
            self.service_url, workspace, coveragestore_name
        )

    response = self._requests("delete", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "Coverage store deleted successfully"
def get_all_users(self, service=None) -> dict:
    """Fetch the users of the default or of a named user/group service.

    The XML response is converted with ``parse`` and returned.

    Args:
        service: Optional user/group service name; when ``None`` the
            ``users/`` endpoint is queried directly.

    Raises:
        GeoserverException: If the server does not answer 200.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    suffix = "users/" if service is None else "service/{}/users/".format(service)

    response = self._requests(
        "get", base + suffix, headers={"accept": "application/xml"}
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return parse(response.content)
def create_user(
    self, username: str, password: str, enabled: bool = True, service=None
) -> str:
    """Create a user in the default or in a named user/group service.

    The user name and password are XML-escaped before being placed in the
    request body, so credentials containing ``&`` or ``<`` are accepted
    (``modify_user`` already escapes its values through ``unparse``).

    Args:
        username: Name of the new user.
        password: Password of the new user.
        enabled: Whether the account is enabled; sent as ``true``/``false``.
        service: Optional user/group service name.

    Returns:
        A confirmation message on status 201.

    Raises:
        GeoserverException: For any other status code.
    """
    # Local import keeps this method independent of module-level imports.
    from xml.sax.saxutils import escape

    url = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        url += "users/"
    else:
        url += "service/{}/users/".format(service)

    data = "<user><userName>{}</userName><password>{}</password><enabled>{}</enabled></user>".format(
        escape(str(username)), escape(str(password)), str(enabled).lower()
    )
    headers = {"content-type": "text/xml", "accept": "application/json"}

    r = self._requests("post", url, data=data, headers=headers)
    if r.status_code == 201:
        return "User created successfully"
    raise GeoserverException(r.status_code, r.content)
def modify_user(
    self, username: str, new_name=None, new_password=None, enable=None, service=None
) -> str:
    """Modify an existing user of the default or a named user/group service.

    Only the fields that are not ``None`` are included in the ``<user>``
    document, which is serialised with ``unparse`` and POSTed to the
    ``user/<username>`` endpoint.

    Args:
        username: Name of the user to modify.
        new_name: New user name, if it should change.
        new_password: New password, if it should change.
        enable: New enabled state, if it should change.
        service: Optional user/group service name.

    Returns:
        A confirmation message on status 200.

    Raises:
        GeoserverException: For any other status code.
    """
    url = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        url += "user/{}".format(username)
    else:
        url += "service/{}/user/{}".format(service, username)

    modifications = dict()
    if new_name is not None:
        modifications["userName"] = new_name
    if new_password is not None:
        modifications["password"] = new_password
    if enable is not None:
        modifications["enabled"] = enable

    # The payload may contain the new password, so it is deliberately not
    # printed or logged.
    data = unparse({"user": modifications})
    headers = {"content-type": "text/xml", "accept": "application/json"}

    r = self._requests("post", url, data=data, headers=headers)
    if r.status_code == 200:
        return "User modified successfully"
    raise GeoserverException(r.status_code, r.content)
def delete_user(self, username: str, service=None) -> str:
    """Delete a user from the default or a named user/group service.

    Returns:
        A confirmation message on status 200.

    Raises:
        GeoserverException: For any other status code.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        endpoint = base + "user/{}".format(username)
    else:
        endpoint = base + "service/{}/user/{}".format(service, username)

    response = self._requests(
        "delete", endpoint, headers={"accept": "application/json"}
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "User deleted successfully"
def get_all_usergroups(self, service=None) -> dict:
    """Fetch the groups of the default or of a named user/group service.

    The response body is converted with ``parse`` and returned.

    Raises:
        GeoserverException: If the server does not answer 200.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    suffix = "groups/" if service is None else "service/{}/groups/".format(service)

    response = self._requests("get", base + suffix)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return parse(response.content)
def create_usergroup(self, group: str, service=None) -> str:
    """Create a group in the default or in a named user/group service.

    Returns:
        A confirmation message on status 201.

    Raises:
        GeoserverException: For any other status code.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        endpoint = base + "group/{}".format(group)
    else:
        endpoint = base + "service/{}/group/{}".format(service, group)

    response = self._requests("post", endpoint)
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return "Group created successfully"
def delete_usergroup(self, group: str, service=None) -> str:
    """Delete a group from the default or a named user/group service.

    Returns:
        A confirmation message on status 200.

    Raises:
        GeoserverException: For any other status code.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        endpoint = base + "group/{}".format(group)
    else:
        endpoint = base + "service/{}/group/{}".format(service, group)

    response = self._requests("delete", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "Group deleted successfully"
def caching_layer(
    self,
    layer_name: str,
    auto_seed: Optional[bool] = True,
    zoom_start: Optional[int] = default_cache_start,
    zoom_stop: Optional[int] = default_cache_stop,
    gridset_name: Optional[str] = default_gridset_name
) -> int:
    """Configure tile caching for a layer and optionally start seeding.

    POSTs a ``<GeoServerLayer>`` document (PNG and JPEG formats, one grid
    subset, 4x4 meta-tiles) to ``/gwc/rest/layers/<layer_name>``. When the
    server answers 200 and ``auto_seed`` is true, ``seed_caching_layer``
    is called with the same zoom range and the same gridset.

    Args:
        layer_name: Name of the layer to cache.
        auto_seed: Whether to request seeding after configuring the layer.
        zoom_start: First zoom level of the grid subset and of the seed.
        zoom_stop: Last zoom level of the grid subset and of the seed.
        gridset_name: Gridset used for the grid subset and for seeding.

    Returns:
        The status code of the configuration request (200).

    Raises:
        GeoserverException: If the configuration request does not answer
            200, or if the seed request fails.
    """
    url = "{}/gwc/rest/layers/{}".format(
        self.service_url, layer_name
    )
    cache_xml = f"""<GeoServerLayer>
                      <enabled>true</enabled>
                      <inMemoryCached>true</inMemoryCached>
                      <name>{layer_name}</name>
                      <mimeFormats>
                        <string>image/png</string>
                        <string>image/jpeg</string>
                      </mimeFormats>
                      <gridSubsets>
                        <gridSubset>
                          <gridSetName>{gridset_name}</gridSetName>
                          <zoomStart>{zoom_start}</zoomStart>
                          <zoomStop>{zoom_stop}</zoomStop>
                          <minCachedLevel>{zoom_start}</minCachedLevel>
                          <maxCachedLevel>{zoom_stop}</maxCachedLevel>
                        </gridSubset>
                      </gridSubsets>
                      <metaWidthHeight>
                        <int>4</int>
                        <int>4</int>
                      </metaWidthHeight>
                      <expireCache>0</expireCache>
                      <expireClients>0</expireClients>
                      <parameterFilters>
                      </parameterFilters>
                      <gutter>0</gutter>
                      <autoCacheStyles>true</autoCacheStyles>
                    </GeoServerLayer>"""
    headers = {"content-type": "text/xml"}

    gridinfo = self._requests("post", url, data=cache_xml, headers=headers)
    if gridinfo.status_code != 200:
        raise GeoserverException(gridinfo.status_code, gridinfo.content)

    if auto_seed:
        print(f"""发送{layer_name}切片请求。。。""")
        # Seed the gridset that was just configured; without forwarding
        # it the seed request would always target the default gridset.
        self.seed_caching_layer(
            layer_name=layer_name,
            zoom_start=zoom_start,
            zoom_stop=zoom_stop,
            gridset_name=gridset_name,
        )
    return gridinfo.status_code
def seed_caching_layer(
    self,
    layer_name: str,
    zoom_start: Optional[int] = default_cache_start,
    zoom_stop: Optional[int] = default_cache_stop,
    gridset_name: Optional[str] = default_gridset_name,
    seedtype: Optional[str] = default_seed_type,
) -> int:
    """Request a tile seeding task for a layer.

    POSTs to ``/gwc/rest/seed/<layer_name>`` with the task described in
    the query string: 4 threads, PNG tiles, the given gridset, zoom range
    and task type, and empty bounds.

    Args:
        layer_name: Name of the layer to seed.
        zoom_start: First zoom level to process.
        zoom_stop: Last zoom level to process.
        gridset_name: Gridset id to seed.
        seedtype: Task type sent as the ``type`` parameter.

    Returns:
        The response status code (200 or 201).

    Raises:
        GeoserverException: For any other status code.
    """
    # Each parameter starts with "&"; the STYLES parameter previously
    # began with a garbled character that fused it into the zoomStop value.
    url = "{}/gwc/rest/seed/{}?" \
          "threadCount=4" \
          "&type={}" \
          "&gridSetId={}" \
          "&tileFormat=image%2Fpng" \
          "&zoomStart={}" \
          "&zoomStop={}" \
          "&parameter_STYLES=" \
          "&minX=&minY=&maxX=&maxY=&tileFailureRetryCount=-1&tileFailureRetryWaitTime=100&totalFailuresBeforeAborting=1000".format(
        self.service_url, layer_name, seedtype, gridset_name, zoom_start, zoom_stop
    )
    headers = {"content-type": "application/json"}

    gridinfo = self._requests("post", url, data=None, headers=headers)
    if gridinfo.status_code == 201 or gridinfo.status_code == 200:
        return gridinfo.status_code
    raise GeoserverException(gridinfo.status_code, gridinfo.content)
|