# -*- coding: utf-8 -*-
__author__ = 'wanger'
__date__ = '2024-08-20'
__copyright__ = '(C) 2024 by siwei'
__revision__ = '1.0'
# inbuilt libraries
import os
from pathlib import Path
from typing import List, Optional, Set, Dict, Iterable, Any
import siwei_config
# third-party libraries
import requests
# custom functions
from processing.tools.GeoServer.Calculation_gdal import raster_value
from processing.tools.GeoServer.Style import catagorize_xml, classified_xml, coverage_style_xml, outline_only_xml
from processing.tools.GeoServer.supports import prepare_zip_file, is_valid_xml, is_surrounded_by_quotes
from xmltodict import parse, unparse
default_gridset_name = "WebMercatorQuadx2" # default tiling scheme (gridset name)
default_seed_type = "seed" # default tile request type: seed (add tiles), reseed (replace tiles), or truncate (remove tiles)
default_cache_start = 0 # default start level for tile caching
default_cache_stop = 18 # presumably the default stop level for tile caching (counterpart of default_cache_start) - confirm at the call site
def _parse_request_options(request_options: Dict[str, Any]):
return request_options if request_options is not None else {}
# Custom exceptions.
class GeoserverException(Exception):
    """Error carrying the HTTP status and body of a failed GeoServer call.

    The ``status`` and ``message`` given to the constructor are kept as
    attributes and combined into the exception text.
    """

    def __init__(self, status, message):
        description = "Status : {} - {}".format(status, message)
        super().__init__(description)
        self.status = status
        self.message = message
# call back class for reading the data
class DataProvider:
    """Read callback that hands out an in-memory payload piece by piece.

    The original implementation asserted that the whole payload fits into a
    single read and failed otherwise (or, with assertions disabled, returned
    more than ``size`` items). This version returns successive chunks of at
    most ``size`` items instead, so payloads of any length work.
    """

    def __init__(self, data):
        self.data = data
        # True once the whole payload has been handed out.
        self.finished = False
        # Position of the next unread item in ``self.data``.
        self._offset = 0

    def read_cb(self, size):
        """Return the next chunk of at most ``size`` items.

        Once the payload is exhausted an empty value of the same type as the
        payload is returned (``""`` for ``str`` data, ``b""`` for ``bytes``).
        """
        if self.finished:
            # Nothing more to read
            return self.data[:0]
        chunk = self.data[self._offset:self._offset + size]
        self._offset += len(chunk)
        if self._offset >= len(self.data):
            self.finished = True
        return chunk
# callback class for reading the files
class FileReader:
    """Read callback that forwards reads to a wrapped file object."""

    def __init__(self, fp):
        # File-like object; only its ``read`` method is used.
        self.fp = fp

    def read_callback(self, size):
        """Read up to ``size`` items from the wrapped file and return them."""
        chunk = self.fp.read(size)
        return chunk
class Geoserver:
def __init__(
    self,
    service_url: str = siwei_config.CONFIG['geoserver']['url'],  # defaults to the URL configured in siwei_config
    username: str = siwei_config.CONFIG['geoserver']['username'],  # defaults to the user configured in siwei_config
    password: str = siwei_config.CONFIG['geoserver']['password'],  # defaults to the password configured in siwei_config
    request_options: Dict[str, Any] = None  # additional parameters to be sent with each request
):
    """Store the connection settings used by every REST call.

    ``request_options`` is forwarded as extra keyword arguments on each
    request; ``None`` is replaced by an empty dict.
    """
    self.service_url, self.username, self.password = service_url, username, password
    self.request_options = {} if request_options is None else request_options
def _requests(self,
              method: str,
              url: str,
              **kwargs) -> requests.Response:
    """Send an authenticated HTTP request to GeoServer.

    Parameters:
        method: one of "post", "get", "put", "delete" (case-insensitive).
        url: absolute URL of the REST endpoint.
        **kwargs: forwarded to the matching ``requests`` function.

    Returns:
        The ``requests.Response`` of the call.

    Raises:
        ValueError: if ``method`` is not a supported verb. Previously this
            case returned ``None``, which only failed later at the caller.
    """
    senders = {
        "post": requests.post,
        "get": requests.get,
        "put": requests.put,
        "delete": requests.delete,
    }
    verb = method.lower()
    if verb not in senders:
        raise ValueError(
            "Unsupported HTTP method {!r}; expected one of {}".format(
                method, sorted(senders)
            )
        )
    # Merge instance-wide options with per-call arguments. Per-call values
    # win, so a key present in both no longer raises a duplicate-keyword
    # TypeError.
    options = {**self.request_options, **kwargs}
    options.setdefault("auth", (self.username, self.password))
    return senders[verb](url, **options)
def get_manifest(self):
    """Fetch ``/rest/about/manifest.json``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/manifest.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_version(self):
    """Fetch ``/rest/about/version.json``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/version.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_status(self):
    """Fetch ``/rest/about/status.json``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/status.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_system_status(self):
    """Fetch ``/rest/about/system-status.json``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/about/system-status.json")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def reload(self):
    """POST to ``/rest/reload``.

    Returns a "Status code: ..." string on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("post", f"{self.service_url}/rest/reload")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}"
def reset(self):
    """POST to ``/rest/reset``.

    Returns a "Status code: ..." string on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("post", f"{self.service_url}/rest/reset")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}"
# _______________________________________________________________________________________________
#
# WORKSPACES
# _______________________________________________________________________________________________
#
def get_default_workspace(self):
    """Fetch ``/rest/workspaces/default``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/workspaces/default")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_workspace(self, workspace):
    """Fetch the description of one workspace.

    The request is sent with ``recurse=true``. Returns the decoded JSON body
    on HTTP 200 and raises GeoserverException for any other status.
    """
    endpoint = f"{self.service_url}/rest/workspaces/{workspace}.json"
    response = self._requests("get", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_workspaces(self):
    """Fetch ``/rest/workspaces``.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    response = self._requests("get", f"{self.service_url}/rest/workspaces")
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def set_default_workspace(self, workspace: str):
    """Make ``workspace`` the default workspace.

    Sends a PUT to ``/rest/workspaces/default``. The body was previously the
    bare workspace name even though the request is declared as ``text/xml``;
    it is now wrapped in the workspace XML document of the GeoServer REST
    API.

    Returns a status string on HTTP 200 and raises GeoserverException for
    any other status.
    """
    url = "{}/rest/workspaces/default".format(self.service_url)
    data = "<workspace><name>{}</name></workspace>".format(workspace)
    r = self._requests(
        "put",
        url,
        data=data,
        headers={"content-type": "text/xml"}
    )
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)
    return "Status code: {}, default workspace {} set!".format(
        r.status_code, workspace
    )
def create_workspace(self, workspace: str):
    """Create a workspace named ``workspace``.

    Sends a POST to ``/rest/workspaces``. The body was previously the bare
    workspace name even though the request is declared as ``text/xml``; it
    is now wrapped in the workspace XML document of the GeoServer REST API.

    Returns a "... created!" string on HTTP 201. Any other status yields the
    "... already exists!" string rather than an exception; this best-effort
    behaviour is kept from the original, so inspect the leading status code
    in the returned text to tell a conflict from another failure.
    """
    url = "{}/rest/workspaces".format(self.service_url)
    data = "<workspace><name>{}</name></workspace>".format(workspace)
    headers = {"content-type": "text/xml"}
    r = self._requests("post", url, data=data, headers=headers)
    if r.status_code == 201:
        return "{} Workspace {} created!".format(r.status_code, workspace)
    return "{} Workspace {} already exists!".format(r.status_code, workspace)
def delete_workspace(self, workspace: str):
    """Delete a workspace, sending ``recurse=true`` with the request.

    Returns a status string on HTTP 200 and raises GeoserverException for
    any other status.
    """
    endpoint = f"{self.service_url}/rest/workspaces/{workspace}"
    response = self._requests("delete", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}, delete workspace"
def get_datastore(self, store_name: str, workspace: Optional[str] = None):
    """Fetch one data store; ``workspace`` falls back to "default".

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = f"{self.service_url}/rest/workspaces/{target_workspace}/datastores/{store_name}"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_datastores(self, workspace: Optional[str] = None):
    """List the data stores of a workspace; ``workspace`` falls back to "default".

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = f"{self.service_url}/rest/workspaces/{target_workspace}/datastores.json"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_coveragestore(
    self, coveragestore_name: str, workspace: Optional[str] = None
):
    """Fetch one coverage store; ``workspace`` falls back to "default".

    The request is sent with ``recurse=true``. Returns the decoded JSON body
    on HTTP 200 and raises GeoserverException for any other status.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = (
        f"{self.service_url}/rest/workspaces/{target_workspace}"
        f"/coveragestores/{coveragestore_name}.json"
    )
    response = self._requests(method="get", url=endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_coveragestores(self, workspace: str = None):
    """List the coverage stores of a workspace; ``workspace`` falls back to "default".

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    target_workspace = "default" if workspace is None else workspace
    endpoint = f"{self.service_url}/rest/workspaces/{target_workspace}/coveragestores"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def create_coveragestore(
    self,
    path,
    workspace: Optional[str] = None,
    layer_name: Optional[str] = None,
    file_type: str = "GeoTIFF",
    content_type: str = "image/tiff",
):
    """Upload a raster file as a coverage store.

    ``workspace`` falls back to "default". When ``layer_name`` is omitted it
    is taken from the file name of ``path`` up to its first dot. The file is
    sent with a PUT whose body is the open file object.

    Returns the decoded JSON body on HTTP 201 and raises GeoserverException
    for any other status. A ``None`` path raises a plain Exception.
    """
    if path is None:
        raise Exception("You must provide the full path to the raster")
    target_workspace = "default" if workspace is None else workspace
    if layer_name is None:
        # str.split always yields at least one element, so [0] is safe.
        layer_name = os.path.basename(path).split(".")[0]
    extension = file_type.lower()
    endpoint = (
        f"{self.service_url}/rest/workspaces/{target_workspace}/coveragestores/"
        f"{layer_name}/file.{extension}?coverageName={layer_name}"
    )
    request_headers = {"content-type": content_type, "Accept": "application/json"}
    with open(path, "rb") as raster_file:
        response = self._requests(
            method="put", url=endpoint, data=raster_file, headers=request_headers
        )
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def publish_time_dimension_to_coveragestore(
    self,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    presentation: Optional[str] = "LIST",
    units: Optional[str] = "ISO8601",
    default_value: Optional[str] = "MINIMUM",
    content_type: str = "application/xml; charset=UTF-8",
):
    """Enable the time dimension on the coverage of a coverage store.

    Sends a PUT to
    ``/rest/workspaces/<workspace>/coveragestores/<store>/coverages/<store>``.
    The payload had lost all of its XML elements (only "true" and the three
    placeholders were left); the coverage/metadata/dimensionInfo structure
    of the GeoServer REST API is restored here.

    Parameters:
        store_name: name of the coverage store, also used as coverage name.
        workspace: workspace holding the store.
        presentation: inserted into ``<presentation>``.
        units: inserted into ``<units>``.
        default_value: inserted into ``<defaultValue><strategy>``.
        content_type: value of the request's content-type header.

    Returns the decoded JSON body on HTTP 200/201 and raises
    GeoserverException for any other status.
    """
    url = "{0}/rest/workspaces/{1}/coveragestores/{2}/coverages/{2}".format(
        self.service_url, workspace, store_name
    )
    headers = {"content-type": content_type}
    time_dimension_data = (
        "<coverage>"
        "<enabled>true</enabled>"
        "<metadata>"
        "<entry key='time'>"
        "<dimensionInfo>"
        "<enabled>true</enabled>"
        "<presentation>{}</presentation>"
        "<units>{}</units>"
        "<defaultValue>"
        "<strategy>{}</strategy>"
        "</defaultValue>"
        "</dimensionInfo>"
        "</entry>"
        "</metadata>"
        "</coverage>".format(presentation, units, default_value)
    )
    r = self._requests(
        method="put", url=url, data=time_dimension_data, headers=headers
    )
    if r.status_code in [200, 201]:
        return r.json()
    raise GeoserverException(r.status_code, r.content)
def get_layer(self, layer_name: str, workspace: Optional[str] = None):
    """Fetch one layer, globally or inside ``workspace`` when given.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers/{layer_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers/{layer_name}"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_layers(self, workspace: Optional[str] = None):
    """List layers, globally or inside ``workspace`` when given.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def delete_layer(self, layer_name: str, workspace: Optional[str] = None):
    """Delete a layer, sending ``recurse=true`` with the request.

    The workspace-scoped endpoint is used when ``workspace`` is given, the
    global one otherwise. Returns a status string on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layers/{layer_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layers/{layer_name}"
    response = self._requests(method="delete", url=endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return f"Status code: {response.status_code}, delete layer"
def get_layergroups(self, workspace: Optional[str] = None):
    """List layer groups, globally or inside ``workspace`` when given.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layergroups"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layergroups"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_layergroup(self, layer_name: str, workspace: Optional[str] = None):
    """Fetch one layer group, globally or inside ``workspace`` when given.

    Unlike the other getters this one does not raise on failure: it returns
    the decoded JSON body on HTTP 200 and ``None`` for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/layergroups/{layer_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layergroups/{layer_name}"
    response = self._requests("get", endpoint)
    return response.json() if response.status_code == 200 else None
def create_layergroup(
    self,
    name: str = "geoserver-rest-layergroup",
    mode: str = "single",
    title: str = "geoserver-rest layer group",
    abstract_text: str = "A new layergroup created with geoserver-rest python package",
    layers: List[str] = [],
    workspace: Optional[str] = None,
    formats: str = "html",
    metadata: List[dict] = [],
    keywords: List[str] = [],
) -> str:
    """Create a layer group from existing layers and/or layer groups.

    Changes compared with the previous implementation:
      * the XML elements of the request body are restored (the payload had
        been reduced to bare values and ``published_type`` was computed but
        never used);
      * an entry of ``layers`` that is neither a layer nor a layer group is
        now rejected. ``get_layergroup`` returns ``None`` instead of raising,
        so the old ``except`` based check could never fire;
      * every entry is resolved once instead of twice.

    Parameters:
        name: name of the new layer group.
        mode: one of "single", "opaque", "named", "container", "eo".
        title: title of the layer group.
        abstract_text: abstract of the layer group.
        layers: names of the layers / layer groups to include (not mutated).
        workspace: optional workspace; validated through ``get_workspace``.
        formats: "html", "json" or "xml"; only used for the returned link.
        metadata: list of dicts with "about" and "content_url" keys.
        keywords: list of keyword strings.

    Returns:
        A success message containing a link to the new layer group.

    Raises:
        AssertionError: if an argument has the wrong type.
        Exception: on unsupported mode/format, an already existing group,
            an empty ``layers`` list or an unknown layer.
        GeoserverException: if GeoServer does not answer with HTTP 201.
    """
    assert isinstance(name, str), "Name must be of type String:''"
    assert isinstance(mode, str), "Mode must be of type String:''"
    assert isinstance(title, str), "Title must be of type String:''"
    assert isinstance(abstract_text, str), "Abstract text must be of type String:''"
    assert isinstance(formats, str), "Format must be of type String:''"
    assert isinstance(
        metadata, list
    ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'link to content url'}]"
    assert isinstance(
        keywords, list
    ), "Keywords must be of type List:['keyword1','keyword2'...]"
    assert isinstance(
        layers, list
    ), "Layers must be of type List:['layer1','layer2'...]"
    if workspace:
        assert isinstance(workspace, str), "Workspace must be of type String:''"
        # check if the workspace is valid in GeoServer
        if self.get_workspace(workspace) is None:
            raise Exception("Workspace is not valid in GeoServer Instance")

    supported_modes: Set = {
        "single",
        "opaque",
        "named",
        "container",
        "eo",
    }
    supported_formats: Set = {"html", "json", "xml"}
    if mode.lower() not in supported_modes:
        raise Exception(
            f"Mode not supported. Acceptable modes are : {supported_modes}"
        )
    if formats.lower() not in supported_formats:
        raise Exception(
            f"Format not supported. Acceptable formats are : {supported_formats}"
        )

    # check if it already exist in GeoServer
    try:
        existing_layergroup = self.get_layergroup(name, workspace=workspace)
    except GeoserverException:
        existing_layergroup = None
    if existing_layergroup is not None:
        raise Exception(f"Layergroup: {name} already exist in GeoServer instance")

    if len(layers) == 0:
        raise Exception("No layer provided!")

    # Resolve every entry once: it must be a layer or, failing that, a
    # layer group. The resulting type goes into the <published> element.
    published_types: List[str] = []
    for layer in layers:
        try:
            self.get_layer(layer_name=layer, workspace=workspace)
        except GeoserverException:
            try:
                nested_group = self.get_layergroup(
                    layer_name=layer, workspace=workspace
                )
            except GeoserverException:
                nested_group = None
            if nested_group is None:
                raise Exception(
                    f"Layer: {layer} is not a valid layer in the GeoServer instance"
                )
            published_types.append("layerGroup")
        else:
            published_types.append("layer")

    skeleton = ""
    if workspace:
        skeleton += f"<workspace><name>{workspace}</name></workspace>"

    if len(metadata) >= 1:
        metadata_xml_list = [
            f"""
                <metadataLink>
                    <type>text/plain</type>
                    <about>{meta.get("about")}</about>
                    <metadataType>ISO19115:2003</metadataType>
                    <content>{meta.get("content_url")}</content>
                </metadataLink>
                """
            for meta in metadata
        ]
        skeleton += f"<metadataLinks>{''.join(metadata_xml_list)}</metadataLinks>"

    layers_xml_list: List[str] = [
        f"""<published type="{published_type}">
                <name>{layer}</name>
                <link>{self.service_url}/layers/{layer}.xml</link>
            </published>
            """
        for layer, published_type in zip(layers, published_types)
    ]
    skeleton += f"<publishables>{''.join(layers_xml_list)}</publishables>"

    if len(keywords) >= 1:
        keyword_xml_list: List[str] = [
            f"<keyword>{keyword}</keyword>" for keyword in keywords
        ]
        skeleton += f"<keywords>{''.join(keyword_xml_list)}</keywords>"

    data = f"""
            <layerGroup>
                <name>{name}</name>
                <mode>{mode}</mode>
                <title>{title}</title>
                <abstractTxt>{abstract_text}</abstractTxt>
                {skeleton}
            </layerGroup>
            """
    url = f"{self.service_url}/rest/layergroups/"
    r = self._requests(
        method="post", url=url, data=data, headers={"content-type": "text/xml"}
    )
    if r.status_code == 201:
        layergroup_url = f"{self.service_url}/rest/layergroups/{name}.{formats}"
        return f"layergroup created successfully! Layergroup link: {layergroup_url}"
    raise GeoserverException(r.status_code, r.content)
def update_layergroup(
    self,
    layergroup_name,
    title: Optional[str] = None,
    abstract_text: Optional[str] = None,
    formats: str = "html",
    metadata: List[dict] = [],
    keywords: List[str] = [],
) -> str:
    """Update title, abstract, metadata links and keywords of a layer group.

    Changes compared with the previous implementation: the f-string that
    added the title was split over two lines (a syntax error), and the XML
    elements of the request body were missing. Both are repaired; the
    payload is a ``<layerGroup>`` document holding only the parts supplied.

    Parameters:
        layergroup_name: name of an existing (global) layer group.
        title: new title, skipped when empty or None.
        abstract_text: new abstract, skipped when empty or None.
        formats: "html", "json" or "xml"; only used for the returned link.
        metadata: list of dicts with "about" and "content_url" keys.
        keywords: list of keyword strings.

    Returns:
        A success message containing a link to the layer group.

    Raises:
        AssertionError: if an argument has the wrong type.
        Exception: if the layer group does not exist or the format is
            unsupported.
        GeoserverException: if GeoServer does not answer with HTTP 200.
    """
    if self.get_layergroup(layer_name=layergroup_name) is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )
    if title is not None:
        assert isinstance(title, str), "Title must be of type String:''"
    if abstract_text is not None:
        assert isinstance(
            abstract_text, str
        ), "Abstract text must be of type String:''"
    assert isinstance(formats, str), "Format must be of type String:''"
    assert isinstance(
        metadata, list
    ), "Metadata must be of type List of dict:[{'about':'geoserver rest data metadata','content_url':'lint to content url'}]"
    assert isinstance(
        keywords, list
    ), "Keywords must be of type List:['keyword1','keyword2'...]"

    supported_formats: Set = {"html", "json", "xml"}
    if formats.lower() not in supported_formats:
        raise Exception(
            f"Format not supported. Acceptable formats are : {supported_formats}"
        )

    skeleton = ""
    if title:
        skeleton += f"<title>{title}</title>"
    if abstract_text:
        skeleton += f"<abstractTxt>{abstract_text}</abstractTxt>"

    if len(metadata) >= 1:
        metadata_xml_list = [
            f"""
                <metadataLink>
                    <type>text/plain</type>
                    <about>{meta.get("about")}</about>
                    <metadataType>ISO19115:2003</metadataType>
                    <content>{meta.get("content_url")}</content>
                </metadataLink>
                """
            for meta in metadata
        ]
        skeleton += f"<metadataLinks>{''.join(metadata_xml_list)}</metadataLinks>"

    if len(keywords) >= 1:
        keyword_xml_list: List[str] = [
            f"<keyword>{keyword}</keyword>" for keyword in keywords
        ]
        skeleton += f"<keywords>{''.join(keyword_xml_list)}</keywords>"

    data = f"""
            <layerGroup>
                {skeleton}
            </layerGroup>
            """
    url = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    r = self._requests(
        method="put",
        url=url,
        data=data,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if r.status_code == 200:
        layergroup_url = (
            f"{self.service_url}/rest/layergroups/{layergroup_name}.{formats}"
        )
        return f"layergroup updated successfully! Layergroup link: {layergroup_url}"
    raise GeoserverException(r.status_code, r.content)
def delete_layergroup(
    self, layergroup_name: str, workspace: Optional[str] = None
) -> str:
    """Send a DELETE for a layer group and report success.

    The layer group is looked up first; the lookup result is not inspected.
    NOTE(review): the success message is returned whatever status the
    DELETE produced, so the return value does not prove that anything was
    deleted.
    """
    # Lookup kept for parity with the original call sequence.
    self.get_layergroup(layer_name=layergroup_name, workspace=workspace)
    if workspace is not None:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/layergroups/{layergroup_name}"
    else:
        endpoint = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    self._requests(url=endpoint, method="delete")
    return "Layer group deleted successfully"
def add_layer_to_layergroup(
    self,
    layer_name: str,
    layer_workspace: str,
    layergroup_name: str,
    layergroup_workspace: str = None,
) -> None:
    """Append a layer, with its default style, to an existing layer group.

    The current publishables and styles of the group are read, the new layer
    and its default style are appended, and the result is written back with
    a PUT.

    Raises:
        Exception: if the layer group cannot be found. ``get_layergroup``
            returns ``None`` in that case, which previously surfaced as an
            opaque ``TypeError``.
        GeoserverException: if the layer lookup fails or the PUT does not
            answer with HTTP 200.
    """
    layergroup_info = self.get_layergroup(
        layer_name=layergroup_name, workspace=layergroup_workspace
    )
    if layergroup_info is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )
    layer_info = self.get_layer(layer_name=layer_name, workspace=layer_workspace)

    # build list of existing publishables & styles
    publishables = layergroup_info["layerGroup"]["publishables"]["published"]
    if not isinstance(publishables, list):  # only 1 layer up to now
        publishables = [publishables]
    styles = layergroup_info["layerGroup"]["styles"]["style"]
    if not isinstance(styles, list):  # only 1 layer up to now
        styles = [styles]

    # add publishable & style for the new layer
    publishables.append(
        {
            "name": f"{layer_workspace}:{layer_name}",
            "href": f"{self.service_url}/rest/workspaces/{layer_workspace}/layers/{layer_name}.json",
        }
    )
    styles.append(layer_info["layer"]["defaultStyle"])

    data = self._layergroup_definition_from_layers_and_styles(
        publishables=publishables, styles=styles
    )
    if layergroup_workspace is None:
        url = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    else:
        url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}"
    r = self._requests(
        method="put",
        url=url,
        data=data,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)
def remove_layer_from_layergroup(
    self,
    layer_name: str,
    layer_workspace: str,
    layergroup_name: str,
    layergroup_workspace: str = None,
) -> None:
    """Remove a layer, and the style paired with it, from a layer group.

    Changes compared with the previous implementation:
      * the filtering now uses the normalised lists. The old code normalised
        single entries into lists but then iterated over the raw response
        values, which broke for groups holding exactly one layer;
      * removing the last remaining layer no longer raises ``IndexError``;
        the (empty) definition is sent and GeoServer decides;
      * a missing layer group raises a descriptive Exception instead of a
        ``TypeError``.

    Raises:
        Exception: if the layer group cannot be found.
        GeoserverException: if the PUT does not answer with HTTP 200.
    """
    layergroup_info = self.get_layergroup(
        layer_name=layergroup_name, workspace=layergroup_workspace
    )
    if layergroup_info is None:
        raise Exception(
            f"Layer group: {layergroup_name} is not a valid layer group in the Geoserver instance"
        )

    # build list of existing publishables & styles
    publishables = layergroup_info["layerGroup"]["publishables"]["published"]
    if not isinstance(publishables, list):  # only 1 layer up to now
        publishables = [publishables]
    styles = layergroup_info["layerGroup"]["styles"]["style"]
    if not isinstance(styles, list):  # only 1 layer up to now
        styles = [styles]

    layer_to_remove = f"{layer_workspace}:{layer_name}"
    # Publishables and styles are paired by position; keep both in step.
    kept_pairs = [
        (pub, style)
        for pub, style in zip(publishables, styles)
        if pub["name"] != layer_to_remove
    ]
    revised_set_of_publishables = [pub for pub, _ in kept_pairs]
    revised_set_of_styles = [style for _, style in kept_pairs]

    xml_payload = self._layergroup_definition_from_layers_and_styles(
        publishables=revised_set_of_publishables, styles=revised_set_of_styles
    )
    if layergroup_workspace is None:
        url = f"{self.service_url}/rest/layergroups/{layergroup_name}"
    else:
        url = f"{self.service_url}/rest/workspaces/{layergroup_workspace}/layergroups/{layergroup_name}"
    r = self._requests(
        method="put",
        url=url,
        data=xml_payload,
        headers={"content-type": "text/xml", "accept": "application/xml"},
    )
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)
def _layergroup_definition_from_layers_and_styles(
self, publishables: list, styles: list
) -> str:
# the get_layergroup method may return an empty string for style;
# so we get the default styles for each layer with no style information in the layergroup
if len(styles) == 1:
index = [0]
else:
index = range(len(styles))
for ix, this_style, this_layer in zip(index, styles, publishables):
if this_style == "":
this_layer_info = self.get_layer(
layer_name=this_layer["name"].split(":")[1],
workspace=this_layer["name"].split(":")[0],
)
styles[ix] = {
"name": this_layer_info["layer"]["defaultStyle"]["name"],
"href": this_layer_info["layer"]["defaultStyle"]["href"],
}
# build xml structure
layer_skeleton = ""
style_skeleton = ""
for publishable in publishables:
layer_str = f"""
{publishable['name']}
{publishable['href']}
"""
layer_skeleton += layer_str
for style in styles:
style_str = f"""
"""
style_skeleton += style_str
data = f"""
{layer_skeleton}
{style_skeleton}
"""
return data
def get_style(self, style_name, workspace: Optional[str] = None):
    """Fetch one style, globally or inside ``workspace`` when given.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/styles/{style_name}.json"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/styles/{style_name}.json"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def get_styles(self, workspace: Optional[str] = None):
    """List styles, globally or inside ``workspace`` when given.

    Returns the decoded JSON body on HTTP 200 and raises
    GeoserverException for any other status.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/styles.json"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/styles.json"
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()
def upload_style(
    self,
    path: str,
    name: Optional[str] = None,
    workspace: Optional[str] = None,
    sld_version: str = "1.1.0",
):
    """Register a style and upload its SLD body.

    ``path`` is either an XML string (as judged by ``is_valid_xml``) or the
    path of a style file. The style is first registered with a POST, then
    its body is uploaded with a PUT.

    Fix: the registration body was ``"".format(name, ...)``, i.e. an empty
    string; the style XML document of the GeoServer REST API is restored.

    Parameters:
        path: style file path or XML string.
        name: style name; defaults to the base name of ``path`` up to its
            first dot.
        workspace: target workspace; the global styles endpoint is used
            when omitted.
        sld_version: "1.1.0" or "1.1" selects the SE content type, anything
            else the SLD 1.0 content type.

    Returns:
        The HTTP status code of the upload (PUT), or of the registration
        (POST) when that did not answer 201. No exception is raised for
        HTTP errors.

    Raises:
        ValueError: if ``path`` is neither valid XML nor an existing file.
    """
    if name is None:
        # str.split always yields at least one element, so [0] is safe.
        name = os.path.basename(path).split(".")[0]

    if is_valid_xml(path):
        xml = path
    elif Path(path).exists():
        with open(path, "r", encoding='utf-8') as f:
            xml = f.read()
        # NOTE(review): the file is read as UTF-8 but sent GBK-encoded. This
        # raises UnicodeEncodeError for characters outside GBK and may not
        # match the encoding declared inside the SLD - confirm that the
        # target server really expects GBK.
        xml = xml.encode("gbk")
    else:
        raise ValueError("`path` must be either a path to a style file, or a valid XML string.")

    headers = {"content-type": "text/xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    if sld_version in ("1.1.0", "1.1"):
        sld_content_type = "application/vnd.ogc.se+xml"
    else:
        sld_content_type = "application/vnd.ogc.sld+xml"
    header_sld = {"content-type": sld_content_type}

    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        name, name + ".sld"
    )
    r = self._requests(method="post", url=url, data=style_xml, headers=headers)
    if r.status_code != 201:
        return r.status_code
    r_sld = self._requests(method="put", url=url + "/" + name, data=xml, headers=header_sld)
    return r_sld.status_code
def create_coveragestyle(
    self,
    raster_path: str,
    style_name: Optional[str] = None,
    workspace: str = None,
    color_ramp: str = "RdYlGn_r",
    cmap_type: str = "ramp",
    number_of_classes: int = 5,
    opacity: float = 1,
):
    """Generate a raster style from a raster's value range and upload it.

    Changes compared with the previous implementation:
      * the registration body was an empty string; the style XML document of
        the GeoServer REST API is restored;
      * the fallback for a missing ``style_name`` now runs before the name
        is used, so ``None`` can no longer leak into the SLD or the payload;
      * ``style.sld`` is removed even when a request fails.

    ``style_name`` defaults to ``raster_value(...)["file_name"]`` and, if
    that is ``None`` too, to the base name of ``raster_path`` up to its
    first dot.

    Returns the HTTP status code (200) of the SLD upload.

    Raises:
        GeoserverException: if the registration does not answer 201 or the
            upload does not answer 200.
    """
    raster = raster_value(raster_path)
    min_value = raster["min"]
    max_value = raster["max"]
    if style_name is None:
        style_name = raster["file_name"]
    if style_name is None:
        style_name = os.path.basename(raster_path).split(".")[0]

    # presumably writes the generated SLD to "style.sld" in the working
    # directory (the file is read back below) - confirm in Style module
    coverage_style_xml(
        color_ramp,
        style_name,
        cmap_type,
        min_value,
        max_value,
        number_of_classes,
        opacity,
    )
    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests(
            "post",
            url,
            data=style_xml,
            headers=headers,
        )
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as f:
            r_sld = self._requests(method="put", url=url + "/" + style_name, data=f.read(), headers=header_sld)
    finally:
        # Do not leave the temporary SLD behind, whatever happened above.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code == 200:
        return r_sld.status_code
    raise GeoserverException(r_sld.status_code, r_sld.content)
def create_catagorized_featurestyle(
    self,
    style_name: str,
    column_name: str,
    column_distinct_values,
    workspace: str = None,
    color_ramp: str = "tab20",
    geom_type: str = "polygon",
):
    """Generate a categorized vector style and upload it.

    Changes compared with the previous implementation: the registration
    body was an empty string (the style XML document of the GeoServer REST
    API is restored) and ``style.sld`` is now removed even when a request
    fails.

    Returns the HTTP status code (200) of the SLD upload.

    Raises:
        GeoserverException: if the registration does not answer 201 or the
            upload does not answer 200.
    """
    # presumably writes the generated SLD to "style.sld" in the working
    # directory (the file is read back below) - confirm in Style module
    catagorize_xml(column_name, column_distinct_values, color_ramp, geom_type)
    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests(
            "post",
            url,
            data=style_xml,
            headers=headers,
        )
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as f:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=f.read(),
                headers=header_sld,
            )
    finally:
        # Do not leave the temporary SLD behind, whatever happened above.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code == 200:
        return r_sld.status_code
    raise GeoserverException(r_sld.status_code, r_sld.content)
def create_outline_featurestyle(
    self,
    style_name: str,
    color: str = "#3579b1",
    width: str = "2",
    geom_type: str = "polygon",
    workspace: Optional[str] = None,
):
    """Generate an outline-only vector style and upload it.

    Changes compared with the previous implementation: the registration
    body was an empty string (the style XML document of the GeoServer REST
    API is restored) and ``style.sld`` is now removed even when a request
    fails.

    Returns the HTTP status code (200) of the SLD upload.

    Raises:
        GeoserverException: if the registration does not answer 201 or the
            upload does not answer 200.
    """
    # presumably writes the generated SLD to "style.sld" in the working
    # directory (the file is read back below) - confirm in Style module
    outline_only_xml(color, width, geom_type)
    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests(
            "post",
            url,
            data=style_xml,
            headers=headers,
        )
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as f:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=f.read(),
                headers=header_sld,
            )
    finally:
        # Do not leave the temporary SLD behind, whatever happened above.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code == 200:
        return r_sld.status_code
    raise GeoserverException(r_sld.status_code, r_sld.content)
def create_classified_featurestyle(
    self,
    style_name: str,
    column_name: str,
    column_distinct_values,
    workspace: Optional[str] = None,
    color_ramp: str = "tab20",
    geom_type: str = "polygon",
    # outline_color: str = "#3579b1",
):
    """Generate a classified vector style and upload it.

    Changes compared with the previous implementation:
      * the registration body was an empty string; the style XML document of
        the GeoServer REST API is restored;
      * the style is registered under ``style_name``. The old code passed
        ``column_name`` to the registration but uploaded the SLD to
        ``.../styles/<style_name>``, so the two only matched when both
        names were equal;
      * ``style.sld`` is removed even when a request fails.

    Returns the HTTP status code (200) of the SLD upload.

    Raises:
        GeoserverException: if the registration does not answer 201 or the
            upload does not answer 200.
    """
    # presumably writes the generated SLD to "style.sld" in the working
    # directory (the file is read back below) - confirm in Style module
    classified_xml(
        style_name,
        column_name,
        column_distinct_values,
        color_ramp,
        geom_type,
    )
    style_xml = "<style><name>{}</name><filename>{}</filename></style>".format(
        style_name, style_name + ".sld"
    )
    headers = {"content-type": "text/xml"}
    header_sld = {"content-type": "application/vnd.ogc.sld+xml"}
    if workspace is None:
        url = "{}/rest/styles".format(self.service_url)
    else:
        url = "{}/rest/workspaces/{}/styles".format(self.service_url, workspace)

    try:
        r = self._requests(
            "post",
            url,
            data=style_xml,
            headers=headers,
        )
        if r.status_code != 201:
            raise GeoserverException(r.status_code, r.content)
        with open("style.sld", "rb") as f:
            r_sld = self._requests(
                "put",
                url + "/" + style_name,
                data=f.read(),
                headers=header_sld,
            )
    finally:
        # Do not leave the temporary SLD behind, whatever happened above.
        if os.path.exists("style.sld"):
            os.remove("style.sld")

    if r_sld.status_code == 200:
        return r_sld.status_code
    raise GeoserverException(r_sld.status_code, r_sld.content)
def publish_style(
    self,
    layer_name: str,
    style_name: str,
    workspace: str,
):
    """Set ``style_name`` as the default style of a layer.

    Sends a PUT to ``/rest/layers/<workspace>:<layer_name>``. The body was
    previously the bare style name even though the request is declared as
    ``text/xml``; it is now the layer/defaultStyle XML document of the
    GeoServer REST API.

    Returns the HTTP status code (200) and raises GeoserverException for
    any other status.
    """
    headers = {"content-type": "text/xml"}
    url = "{}/rest/layers/{}:{}".format(self.service_url, workspace, layer_name)
    style_xml = (
        "<layer><defaultStyle><name>{}</name></defaultStyle></layer>".format(
            style_name
        )
    )
    r = self._requests(
        "put",
        url,
        data=style_xml,
        headers=headers,
    )
    if r.status_code == 200:
        return r.status_code
    raise GeoserverException(r.status_code, r.content)
def delete_style(self, style_name: str, workspace: Optional[str] = None):
    """Send a DELETE for a style, with ``recurse=true``.

    The workspace-scoped endpoint is used when ``workspace`` is given, the
    global one otherwise. The same status string is returned whatever the
    response status is; no exception is raised for HTTP errors.
    """
    if workspace is None:
        endpoint = f"{self.service_url}/rest/styles/{style_name}"
    else:
        endpoint = f"{self.service_url}/rest/workspaces/{workspace}/styles/{style_name}"
    response = self._requests("delete", endpoint, params={"recurse": "true"})
    return f"Status code: {response.status_code}, delete style"
def create_featurestore(
    self,
    store_name: str,
    workspace: Optional[str] = None,
    db: str = "postgres",
    host: str = "localhost",
    port: int = 5432,
    schema: str = "public",
    pg_user: str = "postgres",
    pg_password: str = "admin",
    overwrite: bool = False,
    expose_primary_keys: str = "false",
    description: Optional[str] = None,
    evictor_run_periodicity: Optional[int] = 300,
    max_open_prepared_statements: Optional[int] = 50,
    encode_functions: Optional[str] = "false",
    primary_key_metadata_table: Optional[str] = None,
    batch_insert_size: Optional[int] = 1,
    preparedstatements: Optional[str] = "false",
    loose_bbox: Optional[str] = "true",
    estimated_extends: Optional[str] = "true",
    fetch_size: Optional[int] = 1000,
    validate_connections: Optional[str] = "true",
    support_on_the_fly_geometry_simplification: Optional[str] = "true",
    connection_timeout: Optional[int] = 20,
    create_database: Optional[str] = "false",
    min_connections: Optional[int] = 1,
    max_connections: Optional[int] = 10,
    evictor_tests_per_run: Optional[int] = 3,
    test_while_idle: Optional[str] = "true",
    max_connection_idle_time: Optional[int] = 300,
):
    """Create (POST) or overwrite (PUT) a PostGIS data store.

    Fix: the request body consisted of the bare parameter values, one per
    line, with no XML around them. The ``<dataStore>`` document with one
    ``<entry key="...">`` per connection parameter is restored. The entry
    order follows the order of the original values, with the fixed
    ``dbtype=postgis`` entry kept in its original position.

    Values are inserted as given; ``None`` is rendered as the text "None"
    (unchanged behaviour) and XML special characters are not escaped.

    Parameters:
        store_name: name of the data store.
        workspace: workspace that owns the store.
        overwrite: when true, PUT to the existing store instead of POST.
        (remaining parameters): one connection parameter each, see the
            ``connection_entries`` list below for the key they map to.

    Returns:
        "Featurestore created/updated successfully" on HTTP 200/201.

    Raises:
        GeoserverException: for any other status.
    """
    url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace)
    headers = {"content-type": "text/xml"}

    # (connection parameter key, value) in payload order.
    connection_entries = [
        ("Expose primary keys", expose_primary_keys),
        ("host", host),
        ("port", port),
        ("user", pg_user),
        ("passwd", pg_password),
        ("dbtype", "postgis"),
        ("schema", schema),
        ("database", db),
        ("Evictor run periodicity", evictor_run_periodicity),
        ("Max open prepared statements", max_open_prepared_statements),
        ("encode functions", encode_functions),
        ("Primary key metadata table", primary_key_metadata_table),
        ("Batch insert size", batch_insert_size),
        ("preparedStatements", preparedstatements),
        ("Estimated extends", estimated_extends),
        ("fetch size", fetch_size),
        ("validate connections", validate_connections),
        (
            "Support on the fly geometry simplification",
            support_on_the_fly_geometry_simplification,
        ),
        ("Connection timeout", connection_timeout),
        ("create database", create_database),
        ("min connections", min_connections),
        ("max connections", max_connections),
        ("Evictor tests per run", evictor_tests_per_run),
        ("Test while idle", test_while_idle),
        ("Max connection idle time", max_connection_idle_time),
        ("Loose bbox", loose_bbox),
    ]
    entries_xml = "\n".join(
        '<entry key="{}">{}</entry>'.format(key, value)
        for key, value in connection_entries
    )
    database_connection = """
<dataStore>
<name>{}</name>
<description>{}</description>
<connectionParameters>
{}
</connectionParameters>
</dataStore>
""".format(store_name, description, entries_xml)

    if overwrite:
        url = "{}/rest/workspaces/{}/datastores/{}".format(
            self.service_url, workspace, store_name
        )
        r = self._requests(
            "put",
            url,
            data=database_connection,
            headers=headers,
        )
    else:
        r = self._requests(
            "post",
            url,
            data=database_connection,
            headers=headers,
        )
    if r.status_code in [200, 201]:
        return "Featurestore created/updated successfully"
    raise GeoserverException(r.status_code, r.content)
def create_datastore(
    self,
    name: str,
    path: str,
    workspace: Optional[str] = None,
    overwrite: bool = False,
):
    """Create or update a data store that points at a file path or a web URL.

    Args:
        name: Name of the data store.
        path: Location of the data. A value containing ``http://`` or
            ``https://`` is sent unchanged; anything else is prefixed with
            ``file:``.
        workspace: Target workspace; ``"default"`` is used when omitted.
        overwrite: When true, PUT to the named store instead of POSTing to
            the workspace's data store collection.

    Returns:
        A confirmation message when the response status is 200 or 201.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other response status.
    """
    if workspace is None:
        workspace = "default"
    if path is None:
        raise Exception("You must provide a full path to the data")

    # Web endpoints keep their own scheme. Only plain http used to be
    # recognised, which turned https URLs into an invalid "file:https://...".
    is_web_url = "http://" in path or "https://" in path
    data_url = "{}".format(path) if is_web_url else "file:{}".format(path)

    # NOTE(review): as written, this body carries no XML element tags although
    # it is sent as text/xml -- confirm the markup was not lost.
    data = "{}{}".format(
        name, data_url
    )
    headers = {"content-type": "text/xml"}

    if overwrite:
        url = "{}/rest/workspaces/{}/datastores/{}".format(
            self.service_url, workspace, name
        )
        r = self._requests("put", url, data=data, headers=headers)
    else:
        url = "{}/rest/workspaces/{}/datastores".format(self.service_url, workspace)
        r = self._requests(method="post", url=url, data=data, headers=headers)

    if r.status_code in [200, 201]:
        return "Data store created/updated successfully"
    raise GeoserverException(r.status_code, r.content)
def create_shp_datastore(
    self,
    path: str,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    file_extension: str = "shp",
):
    """Upload a file (sent as ``application/zip``) into a data store.

    Args:
        path: Path of the file to upload. A ``dict`` is first handed to
            ``prepare_zip_file`` and the value it returns is opened instead.
        store_name: Name of the data store; when omitted, the part of the
            file name before its first dot is used.
        workspace: Target workspace; ``"default"`` is used when omitted.
        file_extension: Extension placed after ``file.`` in the upload URL.

    Returns:
        A confirmation message when the response status is 200, 201 or 202.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other response status.
    """
    if path is None:
        raise Exception("You must provide a full path to shapefile")

    workspace = "default" if workspace is None else workspace

    if store_name is None:
        # Everything before the first dot of the base name becomes the store name.
        store_name = os.path.basename(path).split(".")[0]

    if isinstance(path, dict):
        path = prepare_zip_file(store_name, path)

    upload_url = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}&update=overwrite".format(
        self.service_url, workspace, store_name, file_extension
    )
    request_headers = {
        "Content-type": "application/zip",
        "Accept": "application/xml",
    }

    with open(path, "rb") as archive:
        response = self._requests(
            "put", upload_url, data=archive.read(), headers=request_headers
        )

    if response.status_code not in [200, 201, 202]:
        raise GeoserverException(response.status_code, response.content)
    return "The shapefile datastore created successfully!"
def create_gpkg_datastore(
    self,
    path: str,
    store_name: Optional[str] = None,
    workspace: Optional[str] = None,
    file_extension: str = "gpkg",
):
    """Upload a GeoPackage file (sent as ``application/x-sqlite3``) into a data store.

    Args:
        path: Path of the GeoPackage file to upload.
        store_name: Name of the data store; when omitted, the part of the
            file name before its first dot is used.
        workspace: Target workspace; ``"default"`` is used when omitted.
        file_extension: Extension placed after ``file.`` in the upload URL.

    Returns:
        A confirmation message when the response status is 200, 201 or 202.

    Raises:
        Exception: If ``path`` is ``None``.
        GeoserverException: For any other response status.
    """
    if path is None:
        # The message used to be copied from the shapefile uploader.
        raise Exception("You must provide a full path to the geopackage file")
    if workspace is None:
        workspace = "default"
    if store_name is None:
        # Everything before the first dot of the base name becomes the store name.
        store_name = os.path.basename(path).split(".")[0]

    headers = {
        "Content-type": "application/x-sqlite3",
        "Accept": "application/json",
    }
    url = "{0}/rest/workspaces/{1}/datastores/{2}/file.{3}?filename={2}".format(
        self.service_url, workspace, store_name, file_extension
    )

    with open(path, "rb") as f:
        r = self._requests("put", url, data=f.read(), headers=headers)

    if r.status_code in [200, 201, 202]:
        return "The geopackage datastore created successfully!"
    raise GeoserverException(r.status_code, r.content)
def publish_featurestore(
    self,
    store_name: str,
    pg_table: str,
    workspace: Optional[str] = None,
    title: Optional[str] = None,
    advertised: Optional[bool] = True,
    abstract: Optional[str] = None,
    keywords: Optional[List[str]] = None,
    cqlfilter: Optional[str] = None
) -> int:
    """Publish a table of an existing data store as a feature type.

    Args:
        store_name: Data store that holds the table.
        pg_table: Table name, placed first in the request body.
        workspace: Target workspace; ``"default"`` is used when omitted.
        title: Title for the layer; falls back to ``pg_table``.
        advertised: Value interpolated into the request body as-is.
        abstract: Optional abstract text; left out when empty.
        keywords: Optional keywords; left out when empty.
        cqlfilter: Optional CQL filter text; left out when empty.

    Returns:
        The response status code (200 or 201).

    Raises:
        GeoserverException: For any other response status.
    """
    workspace = "default" if workspace is None else workspace
    title = pg_table if title is None else title

    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/".format(
        self.service_url, workspace, store_name
    )

    # NOTE(review): as written, none of these fragments carries XML element
    # tags although the body is sent as text/xml -- confirm the markup was not lost.
    abstract_xml = f"{abstract}" if abstract else ""
    keywords_xml = "".join(f"{keyword}" for keyword in keywords) if keywords else ""
    cqlfilter_xml = f"{cqlfilter}" if cqlfilter else ""

    layer_xml = f"""
{pg_table}
{title}
{advertised}
{abstract_xml}
{keywords_xml}
{cqlfilter_xml}
"""
    headers = {"content-type": "text/xml;charset=utf-8"}

    # The target URL and the request body are echoed to stdout before sending.
    print(url)
    print(layer_xml)

    r = self._requests("post", url, data=layer_xml.encode("utf-8"), headers=headers)
    if r.status_code not in (200, 201):
        raise GeoserverException(r.status_code, r.content)
    return r.status_code
def edit_featuretype(
    self,
    store_name: str,
    workspace: Optional[str],
    pg_table: str,
    name: str,
    title: str,
    abstract: Optional[str] = None,
    keywords: Optional[List[str]] = None,
    recalculate: Optional[str] = None
) -> int:
    """Update name, title, abstract and keywords of a published feature type.

    Args:
        store_name: Data store that holds the feature type.
        workspace: Target workspace; ``"default"`` is used when ``None``.
        pg_table: Current feature type name, used in the request URL.
        name: Name written into the request body.
        title: Title written into the request body.
        abstract: Optional abstract text; left out when empty.
        keywords: Optional keywords; left out when empty.
        recalculate: When non-empty, appended to the URL as
            ``?recalculate=<value>``.

    Returns:
        The response status code (200).

    Raises:
        GeoserverException: For any other response status.
    """
    workspace = "default" if workspace is None else workspace

    query = f"?recalculate={recalculate}" if recalculate else ""
    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.xml{}".format(
        self.service_url, workspace, store_name, pg_table, query
    )

    # Optional fragments of the request body.
    # NOTE(review): as written, these fragments carry no XML element tags
    # although the body is sent as text/xml -- confirm the markup was not lost.
    abstract_xml = f"{abstract}" if abstract else ""
    keywords_xml = "".join(f"{keyword}" for keyword in keywords) if keywords else ""

    layer_xml = f"""
{name}
{title}
{abstract_xml}{keywords_xml}
"""
    response = self._requests(
        "put", url, data=layer_xml, headers={"content-type": "text/xml"}
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.status_code
def publish_featurestore_sqlview(
    self,
    name: str,
    store_name: str,
    sql: str,
    parameters: Optional[Iterable[Dict]] = None,
    key_column: Optional[str] = None,
    geom_name: str = "geom",
    geom_type: str = "Geometry",
    srid: Optional[int] = 4326,
    workspace: Optional[str] = None,
) -> int:
    """Publish an SQL query of a data store as a feature type.

    Args:
        name: Name of the new feature type (also used as its title).
        store_name: Data store the query runs against.
        sql: The SQL text.
        parameters: Optional dicts describing query parameters. The keys read
            are ``name``, ``defaultValue`` and ``regexpValidator``.
        key_column: Optional key column; left out when ``None``.
        geom_name: Name of the geometry column.
        geom_type: Geometry type.
        srid: Numeric EPSG code, used for both the SRS and the geometry.
        workspace: Target workspace; ``"default"`` is used when omitted.

    Returns:
        The response status code (201).

    Raises:
        ValueError: If a parameter is not reported as quoted by
            ``is_surrounded_by_quotes`` and has no ``defaultValue``.
        GeoserverException: For any response status other than 201.
    """
    workspace = "default" if workspace is None else workspace

    # issue #87
    key_column_xml = "" if key_column is None else """{}""".format(key_column)

    fragments = []
    for parameter in (parameters if parameters is not None else ()):
        # non-string parameters MUST have a default value supplied
        if not (is_surrounded_by_quotes(sql, parameter["name"]) or "defaultValue" in parameter):
            raise ValueError(f"Parameter `{parameter['name']}` appears to be a non-string in the supplied query"
                             ", but does not have a default value specified. You must supply a default value "
                             "for non-string parameters using the `defaultValue` key.")

        param_name = parameter.get("name", "")
        default_value = parameter.get("defaultValue", "")
        regexp_validator = parameter.get("regexpValidator", r"^[\w\d\s]+$")
        fragments.append(f"""
{param_name}
{default_value}
{regexp_validator}
\n
""".strip())
    parameters_xml = "".join(fragments)

    # NOTE(review): as written, this template carries no XML element tags
    # although it is sent as text/xml -- confirm the markup was not lost.
    layer_xml = """
{0}
true
{4}
{0}
EPSG:{5}
{0}
{1}
true
{2}
{3}
{5}
{6}
{7}
""".format(
        name, sql, geom_name, geom_type, workspace, srid, key_column_xml, parameters_xml
    )

    endpoint = "{}/rest/workspaces/{}/datastores/{}/featuretypes".format(
        self.service_url, workspace, store_name
    )
    response = self._requests(
        "post", endpoint, data=layer_xml, headers={"content-type": "text/xml"}
    )
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return response.status_code
def get_featuretypes(
    self, workspace: Optional[str] = None, store_name: Optional[str] = None
) -> List[str]:
    """List the names of the feature types published from a data store.

    Args:
        workspace: Workspace that contains the data store.
        store_name: Name of the data store.

    Returns:
        The feature type names; an empty list when the store has none.

    Raises:
        GeoserverException: If the response status is not 200.
    """
    url = "{}/rest/workspaces/{}/datastores/{}/featuretypes.json".format(
        self.service_url, workspace, store_name
    )
    r = self._requests("get", url)
    if r.status_code != 200:
        raise GeoserverException(r.status_code, r.content)

    feature_types = r.json().get("featureTypes")
    # GeoServer serialises an empty collection as "" instead of an object;
    # indexing that string used to raise TypeError.
    if not isinstance(feature_types, dict):
        return []
    return [entry["name"] for entry in feature_types.get("featureType") or []]
def get_feature_attribute(
    self, feature_type_name: str, workspace: str, store_name: str
) -> List[str]:
    """Return the attribute names of one feature type.

    Args:
        feature_type_name: Name of the feature type.
        workspace: Workspace that contains the data store.
        store_name: Data store that holds the feature type.

    Returns:
        The ``name`` of every entry under ``featureType.attributes.attribute``
        in the JSON response.

    Raises:
        GeoserverException: If the response status is not 200.
    """
    endpoint = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.json".format(
        self.service_url, workspace, store_name, feature_type_name
    )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)

    names = []
    for entry in response.json()["featureType"]["attributes"]["attribute"]:
        names.append(entry["name"])
    return names
def get_featurestore(self, store_name: str, workspace: str) -> dict:
    """Fetch the description of a data store.

    Args:
        store_name: Name of the data store.
        workspace: Workspace that contains the data store.

    Returns:
        The ``dataStore`` member of the JSON response.

    Raises:
        GeoserverException: If the response status is not 200.
    """
    endpoint = "{}/rest/workspaces/{}/datastores/{}".format(
        self.service_url, workspace, store_name
    )
    response = self._requests("get", endpoint)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return response.json()["dataStore"]
def delete_featurestore(
    self, featurestore_name: str, workspace: Optional[str] = None
) -> str:
    """Delete a data store, sending ``recurse=true`` with the request.

    Args:
        featurestore_name: Name of the data store to delete.
        workspace: Workspace that contains the store. When ``None`` a URL
            without a workspace segment is used.

    Returns:
        A message containing the response status code. The same message
        format is returned whether or not the status is 200; no exception is
        raised for a failed deletion.
    """
    if workspace is None:
        # NOTE(review): this URL has no "/rest" segment, unlike the
        # workspace-qualified one below -- confirm this is intended.
        endpoint = "{}/datastores/{}".format(self.service_url, featurestore_name)
    else:
        endpoint = "{}/rest/workspaces/{}/datastores/{}".format(
            self.service_url, workspace, featurestore_name
        )

    response = self._requests("delete", endpoint, params={"recurse": "true"})
    # Failures are reported only through the status code in the message.
    return "Status code: {}, delete featurestore".format(response.status_code)
def delete_coveragestore(
    self, coveragestore_name: str, workspace: Optional[str] = None
) -> str:
    """Delete a coverage store, sending ``recurse=true`` with the request.

    Args:
        coveragestore_name: Name of the coverage store to delete.
        workspace: Workspace that contains the store. When ``None`` a URL
            without a workspace segment is used.

    Returns:
        A confirmation message when the response status is 200.

    Raises:
        GeoserverException: For any other response status.
    """
    if workspace is None:
        endpoint = "{}/rest/coveragestores/{}".format(
            self.service_url, coveragestore_name
        )
    else:
        endpoint = "{}/rest/workspaces/{}/coveragestores/{}".format(
            self.service_url, workspace, coveragestore_name
        )

    response = self._requests("delete", endpoint, params={"recurse": "true"})
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "Coverage store deleted successfully"
def get_all_users(self, service=None) -> dict:
    """Fetch the user list, requested as XML and parsed with ``parse``.

    Args:
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is queried.

    Returns:
        The result of ``parse`` applied to the response body.

    Raises:
        GeoserverException: If the response status is not 200.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    suffix = "users/" if service is None else "service/{}/users/".format(service)

    response = self._requests(
        "get", base + suffix, headers={"accept": "application/xml"}
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return parse(response.content)
def create_user(
    self, username: str, password: str, enabled: bool = True, service=None
) -> str:
    """Create a user.

    Args:
        username: Name of the new user.
        password: Password of the new user.
        enabled: Sent as the lower-cased text of the boolean.
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is used.

    Returns:
        A confirmation message when the response status is 201.

    Raises:
        GeoserverException: For any other response status.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    suffix = "users/" if service is None else "service/{}/users/".format(service)

    # NOTE(review): as written, this body carries no XML element tags although
    # it is sent as text/xml -- confirm the markup was not lost.
    payload = "{}{}{}".format(
        username, password, str(enabled).lower()
    )
    request_headers = {"content-type": "text/xml", "accept": "application/json"}

    response = self._requests(
        "post", base + suffix, data=payload, headers=request_headers
    )
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return "User created successfully"
def modify_user(
    self, username: str, new_name=None, new_password=None, enable=None, service=None
) -> str:
    """Modify the name, password and/or enabled flag of an existing user.

    Only the fields whose argument is not ``None`` are put into the request
    body, which is built with ``unparse`` under a ``user`` root element.

    Args:
        username: Current name of the user to modify.
        new_name: New user name, if it should change.
        new_password: New password, if it should change.
        enable: New enabled flag, if it should change.
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is used.

    Returns:
        A confirmation message when the response status is 200.

    Raises:
        GeoserverException: For any other response status.
    """
    url = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        url += "user/{}".format(username)
    else:
        url += "service/{}/user/{}".format(service, username)

    modifications = {}
    if new_name is not None:
        modifications["userName"] = new_name
    if new_password is not None:
        modifications["password"] = new_password
    if enable is not None:
        modifications["enabled"] = enable

    data = unparse({"user": modifications})
    # The request body may contain the new plaintext password, so it must not
    # be echoed to stdout (a debug print here used to leak it).
    headers = {"content-type": "text/xml", "accept": "application/json"}
    r = self._requests("post", url, data=data, headers=headers)
    if r.status_code == 200:
        return "User modified successfully"
    raise GeoserverException(r.status_code, r.content)
def delete_user(self, username: str, service=None) -> str:
    """Delete a user.

    Args:
        username: Name of the user to delete.
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is used.

    Returns:
        A confirmation message when the response status is 200.

    Raises:
        GeoserverException: For any other response status.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        suffix = "user/{}".format(username)
    else:
        suffix = "service/{}/user/{}".format(service, username)

    response = self._requests(
        "delete", base + suffix, headers={"accept": "application/json"}
    )
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "User deleted successfully"
def get_all_usergroups(self, service=None) -> dict:
    """Fetch the list of user groups and parse the response with ``parse``.

    Args:
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is queried.

    Returns:
        The result of ``parse`` applied to the response body.

    Raises:
        GeoserverException: If the response status is not 200.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    suffix = "groups/" if service is None else "service/{}/groups/".format(service)

    response = self._requests("get", base + suffix)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return parse(response.content)
def create_usergroup(self, group: str, service=None) -> str:
    """Create a user group.

    Args:
        group: Name of the group to create.
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is used.

    Returns:
        A confirmation message when the response status is 201.

    Raises:
        GeoserverException: For any other response status.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        suffix = "group/{}".format(group)
    else:
        suffix = "service/{}/group/{}".format(service, group)

    response = self._requests("post", base + suffix)
    if response.status_code != 201:
        raise GeoserverException(response.status_code, response.content)
    return "Group created successfully"
def delete_usergroup(self, group: str, service=None) -> str:
    """Delete a user group.

    Args:
        group: Name of the group to delete.
        service: Optional user/group service name; when ``None`` the URL
            without a service segment is used.

    Returns:
        A confirmation message when the response status is 200.

    Raises:
        GeoserverException: For any other response status.
    """
    base = "{}/rest/security/usergroup/".format(self.service_url)
    if service is None:
        suffix = "group/{}".format(group)
    else:
        suffix = "service/{}/group/{}".format(service, group)

    response = self._requests("delete", base + suffix)
    if response.status_code != 200:
        raise GeoserverException(response.status_code, response.content)
    return "Group deleted successfully"
def caching_layer(
    self,
    layer_name: str,
    auto_seed: Optional[bool] = True,
    zoom_start: Optional[int] = default_cache_start,
    zoom_stop: Optional[int] = default_cache_stop,
    gridset_name: Optional[str] = default_gridset_name
) -> int:
    """Post a tile-cache configuration for a layer and optionally seed it.

    Args:
        layer_name: Name of the layer, used in the URL and the request body.
        auto_seed: When true and the configuration request returns 200, a
            seed request is sent through ``seed_caching_layer``.
        zoom_start: First zoom level written into the configuration.
        zoom_stop: Last zoom level written into the configuration.
        gridset_name: Gridset written into the configuration and used for
            seeding.

    Returns:
        The status code of the configuration request (200).

    Raises:
        GeoserverException: If the configuration request does not return 200,
            or if the seed request fails.
    """
    url = "{}/gwc/rest/layers/{}".format(
        self.service_url, layer_name
    )
    # NOTE(review): as written, this body carries no XML element tags although
    # it is sent as text/xml -- confirm the markup was not lost.
    cache_xml = f"""
true
true
{layer_name}
image/png
image/jpeg
{gridset_name}
{zoom_start}
{zoom_stop}
{zoom_start}
{zoom_stop}
4
4
0
0
0
true
"""
    headers = {"content-type": "text/xml"}
    gridinfo = self._requests("post", url, data=cache_xml, headers=headers)
    if gridinfo.status_code != 200:
        raise GeoserverException(gridinfo.status_code, gridinfo.content)

    if auto_seed:
        print(f"""发送{layer_name}切片请求。。。""")
        # Seed the gridset that was just configured; the gridset used to be
        # dropped here, so a custom one silently fell back to the default.
        self.seed_caching_layer(
            layer_name=layer_name,
            zoom_start=zoom_start,
            zoom_stop=zoom_stop,
            gridset_name=gridset_name,
        )
    return gridinfo.status_code
def seed_caching_layer(
    self,
    layer_name: str,
    zoom_start: Optional[int] = default_cache_start,
    zoom_stop: Optional[int] = default_cache_stop,
    gridset_name: Optional[str] = default_gridset_name,
    seedtype: Optional[str] = default_seed_type,
) -> int:
    """Send a tile seeding request for a layer.

    All options travel in the query string of the POST; the request has no
    body. Thread count (4), tile format (``image/png``) and the failure
    handling values are fixed in the URL.

    Args:
        layer_name: Name of the layer to seed.
        zoom_start: First zoom level to process.
        zoom_stop: Last zoom level to process.
        gridset_name: Gridset to process.
        seedtype: Value of the ``type`` query parameter.

    Returns:
        The response status code (200 or 201).

    Raises:
        GeoserverException: For any other response status.
    """
    # "&parameter_STYLES=" had been corrupted to "¶meter_STYLES=", which
    # removed the "&" separator and glued the text onto the zoomStop value.
    url = (
        "{}/gwc/rest/seed/{}?"
        "threadCount=4"
        "&type={}"
        "&gridSetId={}"
        "&tileFormat=image%2Fpng"
        "&zoomStart={}"
        "&zoomStop={}"
        "&parameter_STYLES="
        "&minX=&minY=&maxX=&maxY=&tileFailureRetryCount=-1&tileFailureRetryWaitTime=100&totalFailuresBeforeAborting=1000"
    ).format(
        self.service_url, layer_name, seedtype, gridset_name, zoom_start, zoom_stop
    )
    headers = {"content-type": "application/json"}
    gridinfo = self._requests("post", url, data=None, headers=headers)
    if gridinfo.status_code in (200, 201):
        return gridinfo.status_code
    raise GeoserverException(gridinfo.status_code, gridinfo.content)