123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- """
- ***************************************************************************
- TopoColors.py
- --------------
- Date : February 2017
- Copyright : (C) 2017 by Nyall Dawson
- Email : nyall dot dawson at gmail dot com
- ***************************************************************************
- * *
- * This program is free software; you can redistribute it and/or modify *
- * it under the terms of the GNU General Public License as published by *
- * the Free Software Foundation; either version 2 of the License, or *
- * (at your option) any later version. *
- * *
- ***************************************************************************
- """
- __author__ = 'Nyall Dawson'
- __date__ = 'February 2017'
- __copyright__ = '(C) 2017, Nyall Dawson'
- import os
- import operator
- import sys
- from collections import defaultdict
- from qgis.core import (QgsField,
- QgsFeatureSink,
- QgsGeometry,
- QgsSpatialIndex,
- QgsPointXY,
- NULL,
- QgsProcessing,
- QgsProcessingException,
- QgsProcessingParameterFeatureSource,
- QgsProcessingParameterDistance,
- QgsProcessingParameterNumber,
- QgsProcessingParameterEnum,
- QgsProcessingParameterFeatureSink)
- from qgis.PyQt.QtCore import (QVariant)
- from processing.algs.qgis.QgisAlgorithm import QgisAlgorithm
- pluginPath = os.path.split(os.path.split(os.path.dirname(__file__))[0])[0]
- class TopoColor(QgisAlgorithm):
- INPUT = 'INPUT'
- MIN_COLORS = 'MIN_COLORS'
- MIN_DISTANCE = 'MIN_DISTANCE'
- BALANCE = 'BALANCE'
- OUTPUT = 'OUTPUT'
- def tags(self):
- return self.tr('topocolor,colors,graph,adjacent,assign').split(',')
- def group(self):
- return self.tr('Cartography')
- def groupId(self):
- return 'cartography'
- def __init__(self):
- super().__init__()
- def initAlgorithm(self, config=None):
- self.addParameter(QgsProcessingParameterFeatureSource(self.INPUT,
- self.tr('Input layer'),
- [QgsProcessing.TypeVectorPolygon]))
- self.addParameter(QgsProcessingParameterNumber(self.MIN_COLORS,
- self.tr('Minimum number of colors'), minValue=1, maxValue=1000,
- defaultValue=4))
- self.addParameter(QgsProcessingParameterDistance(self.MIN_DISTANCE,
- self.tr('Minimum distance between features'),
- parentParameterName=self.INPUT, minValue=0.0,
- defaultValue=0.0))
- balance_by = [self.tr('By feature count'),
- self.tr('By assigned area'),
- self.tr('By distance between colors')]
- self.addParameter(QgsProcessingParameterEnum(
- self.BALANCE,
- self.tr('Balance color assignment'),
- options=balance_by, defaultValue=0))
- self.addParameter(
- QgsProcessingParameterFeatureSink(self.OUTPUT, self.tr('Colored'), QgsProcessing.TypeVectorPolygon))
- def name(self):
- return 'topologicalcoloring'
- def displayName(self):
- return self.tr('Topological coloring')
- def processAlgorithm(self, parameters, context, feedback):
- source = self.parameterAsSource(parameters, self.INPUT, context)
- if source is None:
- raise QgsProcessingException(self.invalidSourceError(parameters, self.INPUT))
- min_colors = self.parameterAsInt(parameters, self.MIN_COLORS, context)
- balance_by = self.parameterAsEnum(parameters, self.BALANCE, context)
- min_distance = self.parameterAsDouble(parameters, self.MIN_DISTANCE, context)
- fields = source.fields()
- fields.append(QgsField('color_id', QVariant.Int))
- (sink, dest_id) = self.parameterAsSink(parameters, self.OUTPUT, context,
- fields, source.wkbType(), source.sourceCrs())
- if sink is None:
- raise QgsProcessingException(self.invalidSinkError(parameters, self.OUTPUT))
- features = {f.id(): f for f in source.getFeatures()}
- topology, id_graph = self.compute_graph(features, feedback, min_distance=min_distance)
- feature_colors = ColoringAlgorithm.balanced(features,
- balance=balance_by,
- graph=topology,
- feedback=feedback,
- min_colors=min_colors)
- if len(feature_colors) == 0:
- return {self.OUTPUT: dest_id}
- max_colors = max(feature_colors.values())
- feedback.pushInfo(self.tr('{} colors required').format(max_colors))
- total = 20.0 / len(features)
- current = 0
- for feature_id, input_feature in features.items():
- if feedback.isCanceled():
- break
- output_feature = input_feature
- attributes = input_feature.attributes()
- if feature_id in feature_colors:
- attributes.append(feature_colors[feature_id])
- else:
- attributes.append(NULL)
- output_feature.setAttributes(attributes)
- sink.addFeature(output_feature, QgsFeatureSink.FastInsert)
- current += 1
- feedback.setProgress(80 + int(current * total))
- return {self.OUTPUT: dest_id}
- @staticmethod
- def compute_graph(features, feedback, create_id_graph=False, min_distance=0):
- """ compute topology from a layer/field """
- s = Graph(sort_graph=False)
- id_graph = None
- if create_id_graph:
- id_graph = Graph(sort_graph=True)
- # skip features without geometry
- features_with_geometry = {f_id: f for (f_id, f) in features.items() if f.hasGeometry()}
- total = 70.0 / len(features_with_geometry) if features_with_geometry else 1
- index = QgsSpatialIndex()
- i = 0
- for feature_id, f in features_with_geometry.items():
- if feedback.isCanceled():
- break
- g = f.geometry()
- if min_distance > 0:
- g = g.buffer(min_distance, 5)
- engine = QgsGeometry.createGeometryEngine(g.constGet())
- engine.prepareGeometry()
- feature_bounds = g.boundingBox()
- # grow bounds a little so we get touching features
- feature_bounds.grow(feature_bounds.width() * 0.01)
- intersections = index.intersects(feature_bounds)
- for l2 in intersections:
- f2 = features_with_geometry[l2]
- if engine.intersects(f2.geometry().constGet()):
- s.add_edge(f.id(), f2.id())
- s.add_edge(f2.id(), f.id())
- if id_graph:
- id_graph.add_edge(f.id(), f2.id())
- index.addFeature(f)
- i += 1
- feedback.setProgress(int(i * total))
- for feature_id, f in features_with_geometry.items():
- if feedback.isCanceled():
- break
- if feature_id not in s.node_edge:
- s.add_edge(feature_id, None)
- return s, id_graph
- class ColoringAlgorithm:
- @staticmethod
- def balanced(features, graph, feedback, balance=0, min_colors=4):
- feature_colors = {}
- # start with minimum number of colors in pool
- color_pool = set(range(1, min_colors + 1))
- # calculate count of neighbours
- neighbour_count = defaultdict(int)
- for feature_id, neighbours in graph.node_edge.items():
- neighbour_count[feature_id] += len(neighbours)
- # sort features by neighbour count - we want to handle those with more neighbours first
- sorted_by_count = [feature_id for feature_id in sorted(neighbour_count.items(),
- key=operator.itemgetter(1),
- reverse=True)]
- # counts for each color already assigned
- color_counts = defaultdict(int)
- color_areas = defaultdict(float)
- for c in color_pool:
- color_counts[c] = 0
- color_areas[c] = 0
- total = 10.0 / len(sorted_by_count) if sorted_by_count else 1
- i = 0
- for (feature_id, n) in sorted_by_count:
- if feedback.isCanceled():
- break
- # first work out which already assigned colors are adjacent to this feature
- adjacent_colors = {
- feature_colors[neighbour]
- for neighbour in graph.node_edge[feature_id]
- if neighbour in feature_colors
- }
- # from the existing colors, work out which are available (ie non-adjacent)
- available_colors = color_pool.difference(adjacent_colors)
- feature_color = -1
- if len(available_colors) == 0:
- # no existing colors available for this feature, so add new color to pool and repeat
- min_colors += 1
- return ColoringAlgorithm.balanced(features, graph, feedback, balance, min_colors)
- else:
- if balance == 0:
- # choose least used available color
- counts = [(c, v) for c, v in color_counts.items() if c in available_colors]
- feature_color = sorted(counts, key=operator.itemgetter(1))[0][0]
- color_counts[feature_color] += 1
- elif balance == 1:
- areas = [(c, v) for c, v in color_areas.items() if c in available_colors]
- feature_color = sorted(areas, key=operator.itemgetter(1))[0][0]
- color_areas[feature_color] += features[feature_id].geometry().area()
- elif balance == 2:
- min_distances = {c: sys.float_info.max for c in available_colors}
- this_feature_centroid = features[feature_id].geometry().centroid().constGet()
- # find features for all available colors
- other_features = {f_id: c for (f_id, c) in feature_colors.items() if c in available_colors}
- # loop through these, and calculate the minimum distance from this feature to the nearest
- # feature with each assigned color
- for other_feature_id, c in other_features.items():
- if feedback.isCanceled():
- break
- other_geometry = features[other_feature_id].geometry()
- other_centroid = other_geometry.centroid().constGet()
- distance = this_feature_centroid.distanceSquared(other_centroid)
- if distance < min_distances[c]:
- min_distances[c] = distance
- # choose color such that minimum distance is maximised! ie we want MAXIMAL separation between
- # features with the same color
- feature_color = sorted(min_distances, key=min_distances.__getitem__, reverse=True)[0]
- feature_colors[feature_id] = feature_color
- i += 1
- feedback.setProgress(70 + int(i * total))
- return feature_colors
- class Graph:
- def __init__(self, sort_graph=True):
- self.sort_graph = sort_graph
- self.node_edge = {}
- def add_edge(self, i, j):
- ij = [i, j]
- if self.sort_graph:
- ij.sort()
- (i, j) = ij
- if i in self.node_edge:
- self.node_edge[i].add(j)
- else:
- self.node_edge[i] = {j}
- def make_full(self):
- g = Graph(sort_graph=False)
- for k in self.node_edge.keys():
- for v in self.node_edge[k]:
- g.add_edge(v, k)
- g.add_edge(k, v)
- return g
|