123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- """
- ***************************************************************************
- Postprocessing.py
- ---------------------
- Date : August 2012
- Copyright : (C) 2012 by Victor Olaya
- Email : volayaf at gmail dot com
- ***************************************************************************
- * *
- * This program is free software; you can redistribute it and/or modify *
- * it under the terms of the GNU General Public License as published by *
- * the Free Software Foundation; either version 2 of the License, or *
- * (at your option) any later version. *
- * *
- ***************************************************************************
- """
- __author__ = 'Victor Olaya'
- __date__ = 'August 2012'
- __copyright__ = '(C) 2012, Victor Olaya'
- import os
- import traceback
- from typing import (
- Dict,
- List,
- Optional,
- Tuple
- )
- from qgis.PyQt.QtCore import QCoreApplication
- from qgis.core import (
- Qgis,
- QgsProcessingFeedback,
- QgsProcessingUtils,
- QgsMapLayer,
- QgsWkbTypes,
- QgsMessageLog,
- QgsProcessingContext,
- QgsProcessingAlgorithm,
- QgsLayerTreeLayer,
- QgsLayerTreeGroup
- )
- from processing.core.ProcessingConfig import ProcessingConfig
- from processing.gui.RenderingStyles import RenderingStyles
- SORT_ORDER_CUSTOM_PROPERTY = '_processing_sort_order'
- def determine_output_name(dest_id: str,
- details: QgsProcessingContext.LayerDetails,
- alg: QgsProcessingAlgorithm,
- context: QgsProcessingContext,
- parameters: Dict) -> str:
- """
- If running a model, the execution will arrive here when an
- algorithm that is part of that model is executed. We check if
- its output is a final output of the model, and adapt the output
- name accordingly
- """
- for out in alg.outputDefinitions():
- if out.name() not in parameters:
- continue
- output_value = parameters[out.name()]
- if hasattr(output_value, "sink"):
- output_value = output_value.sink.valueAsString(
- context.expressionContext()
- )[0]
- else:
- output_value = str(output_value)
- if output_value == dest_id:
- return out.name()
- return details.outputName
- def post_process_layer(output_name: str,
- layer: QgsMapLayer,
- alg: QgsProcessingAlgorithm):
- """
- Applies post-processing steps to a layer
- """
- style = None
- if output_name:
- style = RenderingStyles.getStyle(alg.id(), output_name)
- if style is None:
- if layer.type() == Qgis.LayerType.Raster:
- style = ProcessingConfig.getSetting(ProcessingConfig.RASTER_STYLE)
- elif layer.type() == Qgis.LayerType.Vector:
- if layer.geometryType() == QgsWkbTypes.PointGeometry:
- style = ProcessingConfig.getSetting(
- ProcessingConfig.VECTOR_POINT_STYLE)
- elif layer.geometryType() == QgsWkbTypes.LineGeometry:
- style = ProcessingConfig.getSetting(
- ProcessingConfig.VECTOR_LINE_STYLE)
- else:
- style = ProcessingConfig.getSetting(
- ProcessingConfig.VECTOR_POLYGON_STYLE)
- if style:
- layer.loadNamedStyle(style)
- try:
- from qgis._3d import QgsPointCloudLayer3DRenderer
- if layer.type() == Qgis.LayerType.PointCloud:
- if layer.renderer3D() is None:
- # If the layer has no 3D renderer and syncing 3D to 2D
- # renderer is enabled, we create a renderer and set it up
- # with the 2D renderer
- if layer.sync3DRendererTo2DRenderer():
- renderer_3d = QgsPointCloudLayer3DRenderer()
- renderer_3d.convertFrom2DRenderer(layer.renderer())
- layer.setRenderer3D(renderer_3d)
- except ImportError:
- QgsMessageLog.logMessage(
- QCoreApplication.translate(
- "Postprocessing",
- "3D library is not available, "
- "can't assign a 3d renderer to a layer."
- )
- )
- def create_layer_tree_layer(layer: QgsMapLayer,
- details: QgsProcessingContext.LayerDetails) \
- -> QgsLayerTreeLayer:
- """
- Applies post-processing steps to a QgsLayerTreeLayer created for
- an algorithm's output
- """
- layer_tree_layer = QgsLayerTreeLayer(layer)
- if ProcessingConfig.getSetting(ProcessingConfig.VECTOR_FEATURE_COUNT) and \
- layer.type() == Qgis.LayerType.Vector:
- layer_tree_layer.setCustomProperty("showFeatureCount", True)
- if details.layerSortKey:
- layer_tree_layer.setCustomProperty(SORT_ORDER_CUSTOM_PROPERTY,
- details.layerSortKey)
- return layer_tree_layer
- def get_layer_tree_results_group(details: QgsProcessingContext.LayerDetails,
- context: QgsProcessingContext) \
- -> QgsLayerTreeGroup:
- """
- Returns the destination layer tree group to store results in
- """
- destination_project = details.project or context.project()
- # default to placing results in the top level of the layer tree
- results_group = details.project.layerTreeRoot()
- # if a specific results group is specified in Processing settings,
- # respect it (and create if necessary)
- results_group_name = ProcessingConfig.getSetting(
- ProcessingConfig.RESULTS_GROUP_NAME)
- if results_group_name:
- results_group = destination_project.layerTreeRoot().findGroup(
- results_group_name)
- if not results_group:
- results_group = destination_project.layerTreeRoot().insertGroup(
- 0, results_group_name)
- results_group.setExpanded(True)
- # if this particular output layer has a specific output group assigned,
- # find or create it now
- if details.groupName:
- group = results_group.findGroup(details.groupName)
- if not group:
- group = results_group.insertGroup(
- 0, details.groupName)
- group.setExpanded(True)
- else:
- group = results_group
- return group
- def handleAlgorithmResults(alg: QgsProcessingAlgorithm,
- context: QgsProcessingContext,
- feedback: Optional[QgsProcessingFeedback] = None,
- parameters: Optional[Dict] = None):
- if not parameters:
- parameters = {}
- if feedback is None:
- feedback = QgsProcessingFeedback()
- wrong_layers = []
- feedback.setProgressText(
- QCoreApplication.translate(
- 'Postprocessing',
- 'Loading resulting layers'
- )
- )
- i = 0
- added_layers: List[Tuple[QgsLayerTreeGroup, QgsLayerTreeLayer]] = []
- layers_to_post_process: List[Tuple[QgsMapLayer,
- QgsProcessingContext.LayerDetails]] = []
- for dest_id, details in context.layersToLoadOnCompletion().items():
- if feedback.isCanceled():
- return False
- if len(context.layersToLoadOnCompletion()) > 2:
- # only show progress feedback if we're loading a bunch of layers
- feedback.setProgress(
- 100 * i / float(len(context.layersToLoadOnCompletion()))
- )
- try:
- print(f"开始加载输出图层位置:{dest_id}")
- if os.path.exists(dest_id) and os.path.isfile(dest_id):
- layer = QgsProcessingUtils.mapLayerFromString(
- dest_id,
- context,
- typeHint=details.layerTypeHint
- )
- if layer is not None:
- details.setOutputLayerName(layer)
- output_name = determine_output_name(
- dest_id, details, alg, context, parameters
- )
- post_process_layer(output_name, layer, alg)
- # Load layer to layer tree root or to a specific group
- results_group = get_layer_tree_results_group(details, context)
- # note here that we may not retrieve an owned layer -- eg if the
- # output layer already exists in the destination project
- owned_map_layer = context.temporaryLayerStore().takeMapLayer(layer)
- if owned_map_layer:
- details.project.addMapLayer(owned_map_layer, False)
- # we don't add the layer to the tree yet -- that's done
- # later, after we've sorted all added layers
- layer_tree_layer = create_layer_tree_layer(owned_map_layer, details)
- added_layers.append((results_group, layer_tree_layer))
- if details.postProcessor():
- # we defer calling the postProcessor set in the context
- # until the layer has been added to the project's layer
- # tree, just in case the postProcessor contains logic
- # relating to layer tree handling
- layers_to_post_process.append((layer, details))
- else:
- wrong_layers.append(str(dest_id))
- else:
- print(f"输出图层位置不存在:{dest_id}")
- except Exception:
- QgsMessageLog.logMessage(
- QCoreApplication.translate(
- 'Postprocessing',
- "Error loading result layer:"
- ) + "\n" + traceback.format_exc(),
- 'Processing',
- Qgis.Critical)
- wrong_layers.append(str(dest_id))
- i += 1
- # sort added layer tree layers
- sorted_layer_tree_layers = sorted(
- added_layers,
- key=lambda x: x[1].customProperty(SORT_ORDER_CUSTOM_PROPERTY, 0)
- )
- for group, layer_node in sorted_layer_tree_layers:
- layer_node.removeCustomProperty(SORT_ORDER_CUSTOM_PROPERTY)
- group.insertChildNode(0, layer_node)
- # all layers have been added to the layer tree, so safe to call
- # postProcessors now
- for layer, details in layers_to_post_process:
- details.postProcessor().postProcessLayer(
- layer,
- context,
- feedback)
- feedback.setProgress(100)
- if wrong_layers:
- msg = QCoreApplication.translate(
- 'Postprocessing',
- "The following layers were not correctly generated."
- )
- msg += "\n" + "\n".join([f"• {lay}" for lay in wrong_layers]) + '\n'
- msg += QCoreApplication.translate(
- 'Postprocessing',
- "You can check the 'Log Messages Panel' in QGIS main window "
- "to find more information about the execution of the algorithm.")
- feedback.reportError(msg)
- return len(wrong_layers) == 0
|