# -*- coding: utf-8 -*- __author__ = 'wanger' __description__ = '批量生成栅格数据元数据表格 EXCEL' __date__ = '2024-11-25' __copyright__ = '(C) 2024 by siwei' __revision__ = '1.0' import os from osgeo import gdal, osr import openpyxl from qgis.core import ( QgsRasterLayer, QgsSpatialIndex, QgsGeometry ) from qgis.PyQt.QtCore import NULL from qgis.PyQt.QtCore import QCoreApplication from qgis.core import (QgsExpression, QgsFeatureRequest, QgsVectorLayer, QgsProcessingAlgorithm, QgsProcessingParameterFolderDestination, QgsProcessingParameterString, QgsProcessingParameterFile) class MakeMetadataProcessingAlgorithm(QgsProcessingAlgorithm): rasterfolder = 'rasterfolder' template = 'template' outputfolder = 'outputfolder' # 图幅xy标识位取值 5:8,8:11 splittingnum = 'splittingnum' splittingnumdefault = '5:8,8:11' def tr(self, string): return QCoreApplication.translate('Processing', string) def createInstance(self): return MakeMetadataProcessingAlgorithm() def name(self): return 'makeMetadata' def displayName(self): return self.tr('生成栅格元数据') def group(self): return self.tr('栅格元数据生成') def groupId(self): return 'metadata' def shortHelpString(self): return self.tr("将选择文件夹内的所有栅格数据根据元数据模板动态生成元数据表。") def initAlgorithm(self, config=None): self.addParameter(QgsProcessingParameterFolderDestination(self.rasterfolder, '分幅数据文件夹')) self.addParameter(QgsProcessingParameterFile(self.template, '模板文件', extension='xlsx')) self.addParameter(QgsProcessingParameterFolderDestination(self.outputfolder, '元数据输出文件夹')) self.addParameter( QgsProcessingParameterString(self.splittingnum, self.tr('图幅xy标识位'), self.splittingnumdefault)) # 通过文件路径来获取文件名和文件后缀 def getImageNumberAndFormat(self, filepath): # 获取文件名(包括后缀) file_name = os.path.basename(filepath) # 获取文件名(不包括后缀) file_name_no_ext = os.path.splitext(file_name)[0] # 获取文件后缀 file_ext = os.path.splitext(file_name)[1][1:] return file_name_no_ext, file_ext # 通过文件路径获取文件大小 def getFileSize(self, filepath): try: file_size = os.path.getsize(filepath) size_mb = file_size / (1024 * 1024) return f"{size_mb:.2f}" except FileNotFoundError: print("文件未找到,请检查路径是否正确!") return 0 # 获取左上角点 def getImageLeftTop(self, dataset): # 获取地理变换参数 geotransform = dataset.GetGeoTransform() if not geotransform: raise ValueError("无法获取地理变换参数!") # 左上角像素的坐标(投影坐标系) x_min = geotransform[0] y_max = geotransform[3] # 获取栅格的空间参考(投影坐标系) spatial_ref = osr.SpatialReference() spatial_ref.ImportFromWkt(dataset.GetProjection()) # 转换为地理坐标系(经纬度) if spatial_ref.IsProjected(): # spatial_ref_geo = spatial_ref.CloneGeogCS() wgs84_crs = osr.SpatialReference() wgs84_crs.ImportFromEPSG(4326) transform = osr.CoordinateTransformation(spatial_ref, wgs84_crs) lon, lat, _ = transform.TransformPoint(x_min, y_max) else: lon, lat = x_min, y_max # 如果已经是地理坐标系,直接使用 return { "lon": lat, "lat": lon, "y": int(x_min), "x": int(y_max) } # 通过栅格数据与矢量数据叠加分析 获取相交面积最大的一块 返回要素 def get_largest_overlapping_feature(self, raster_path, shp_path): """ 查询与栅格相交的矢量要素中面积最大的一块,并返回指定字段的值。 :param raster_path: 栅格文件路径 :param shp_path: 矢量文件路径 :return: 面积最大的要素 """ # 加载栅格图层 raster_layer = QgsRasterLayer(raster_path, "Raster Layer") if not raster_layer.isValid(): raise Exception(f"栅格文件无效: {raster_path}") # 加载矢量图层 vector_layer = QgsVectorLayer(shp_path, "Vector Layer", "ogr") if not vector_layer.isValid(): raise Exception(f"SHP文件无效: {shp_path}") # 获取栅格的范围 raster_extent = raster_layer.extent() # 创建空间索引以加速查询 spatial_index = QgsSpatialIndex(vector_layer.getFeatures()) # 查询与栅格范围相交的要素ID intersecting_ids = spatial_index.intersects(raster_extent) # 遍历这些要素并计算相交区域的面积 max_area = 0 largest_feature = None for fid in intersecting_ids: feature = vector_layer.getFeature(fid) intersection = feature.geometry().intersection(QgsGeometry.fromRect(raster_extent)) # 检查是否有有效相交区域 if intersection.isEmpty(): continue # 计算相交区域的面积 area = intersection.area() # 找到面积最大的要素 if area > max_area: max_area = area largest_feature = feature if largest_feature: return largest_feature else: return None # 替换字符串 type (first,last) def replacestr(self, string, oldvalue, newvalue, type): oldvalue = str(oldvalue) newvalue = str(newvalue) if type == "first": return string.replace(oldvalue, newvalue, 1) elif type == "last": # 查找最后一次出现的 oldvalue 的位置 index = string.rfind(oldvalue) if index != -1: # 用切片将 last occurrence 替换 return string[:index] + newvalue + string[index + len(oldvalue):] # 获取接边情况 def getSplitting(self, image_number, image_format, imagex, imagey, length, folder_path, selectfeature): east = f"{self.replacestr(string=image_number, oldvalue=str(imagey).zfill(length), newvalue=str(imagey + 1).zfill(length), type='last')}.{image_format}" west = f"{self.replacestr(string=image_number, oldvalue=str(imagey).zfill(length), newvalue=str(imagey - 1).zfill(length), type='last')}.{image_format}" north = f"{self.replacestr(string=image_number, oldvalue=str(imagex).zfill(length), newvalue=str(imagex - 1).zfill(length), type='first')}.{image_format}" south = f"{self.replacestr(string=image_number, oldvalue=str(imagex).zfill(length), newvalue=str(imagex + 1).zfill(length), type='first')}.{image_format}" return { "east": selectfeature["east"] if selectfeature["east"] != NULL else self.hasFile(east, folder_path), "west": selectfeature["west"] if selectfeature["west"] != NULL else self.hasFile(west, folder_path), "north": selectfeature["north"] if selectfeature["north"] != NULL else self.hasFile(north, folder_path), "south": selectfeature["south"] if selectfeature["south"] != NULL else self.hasFile(south, folder_path), } # 判断有没有文件 def hasFile(self, filepath, folder_path): if os.path.exists(f"{folder_path}\\{filepath}"): return "已接" else: return "未接" # 处理数值 def processingNumericalValues(self, value): if isinstance(value, str): try: value = float(value) except ValueError: print(f"无法将 '{value}' 转换为浮动数") return None if value.is_integer(): return int(value) else: return round(value, 2) # 经纬度转度分秒 def decimal_to_dms(self, deg): # 计算度(Degrees) degrees = int(deg) # 计算分(Minutes),并截取小数部分 minutes = int((deg - degrees) * 60) # 计算秒(Seconds),保留两位小数 seconds = round((deg - degrees - minutes / 60) * 3600, 2) return f"{degrees}{minutes}{seconds}" # 运行 def processAlgorithm(self, parameters, context, feedback): # 获取模板文件夹 template_file = self.parameterAsString(parameters, self.template, context) # 栅格数据文件夹 raster_folder = self.parameterAsString(parameters, self.rasterfolder, context) # 元数据生成目录 output_folder = self.parameterAsString(parameters, self.outputfolder, context) # 通过图号取接边数据标识位 splittingnum = self.parameterAsString(parameters, self.splittingnum, context) print(template_file, raster_folder, output_folder) # TODO 栅格数据文件夹 folder_path = raster_folder # TODO 元数据输出文件夹 output_path = output_folder # TODO 元数据模板路径 templatepath = template_file # 获取元数据模板文件所在的目录 templatedirectory = os.path.dirname(os.path.abspath(templatepath)) # TODO 空间分析图层位置 默认在模板文件夹内有一个overlap.shp 点线面要素无所谓 overlapVectorPath = f"{templatedirectory}\\overlap.shp" # TODO 网格数据查询 默认在模板文件夹内有一个fishnet.shp 点线面要素无所谓 fishVectorPath = f"{templatedirectory}\\fishnet.shp" fishlayer = QgsVectorLayer(fishVectorPath, "Fish Layer", "ogr") fish_field_name = "imagenum" # 绑定需要从shp数据提取的信息 # data_gain_date 影像/数据获取时间 # data_resolution 影像分辨率/点云密度 overlapParams = { "data_resolution": "resolution", "data_gain_date": "gaindate", "source_type": "sourcetype" } # 记录总计生成了多少个元数据表 totalNumber = 0 # 遍历文件夹中的所有文件 for filename in os.listdir(folder_path): filepath = os.path.join(folder_path, filename) # 检查文件扩展名 if filename.endswith('.img') or filename.endswith('.tif'): dataset = gdal.Open(filepath) # TODO 读取图号 image_number, image_format = self.getImageNumberAndFormat(filepath) file_size = self.getFileSize(filepath) # 通过图号获取网格要素 sql_query = f"strpos('{image_number}', \"{fish_field_name}\") > 0" expression = QgsExpression(sql_query) request = QgsFeatureRequest(expression) # 获取查询结果 matching_features = [f for f in fishlayer.getFeatures(request)] selectfeature = matching_features[0] if dataset: # TODO 获取栅格数据的行数、列数 grid_row = dataset.RasterYSize grid_col = dataset.RasterXSize # TODO波段数 band_size = dataset.RasterCount # TODO 无效值 list 用逗号隔开 nodata_list = [] # TODO 计算高程记录的小数点位数 decimal_number = None for band_index in range(1, dataset.RasterCount + 1): band = dataset.GetRasterBand(band_index) nodata_list.append(int(band.GetNoDataValue())) stats = band.GetStatistics(True, True) # 获取统计信息 min_value, max_value = stats[0], stats[1] if min_value is not None and max_value is not None: # 获取高程小数位数的最大可能值 decimal_number = max(len(str(min_value).split(".")[-1]), len(str(max_value).split(".")[-1])) if '.' in str( min_value) else 0 # TODO 获取栅格数据左上角标 cornerpoint = self.getImageLeftTop(dataset) # 获取地理信息 geo_transform = dataset.GetGeoTransform() # TODO 格网单元尺寸 x_resolution = abs(geo_transform[1]) # 像素宽度 y_resolution = abs(geo_transform[5]) # 像素高度 grid_size = f"{self.processingNumericalValues(x_resolution)}*{self.processingNumericalValues(y_resolution)}" # 获取投影信息 projection = dataset.GetProjection() # 获取栅格数据的投影信息 # 创建 SpatialReference 对象并加载投影信息 spatial_ref = osr.SpatialReference() spatial_ref.ImportFromWkt(projection) # TODO 投影参数集合 central_meridian = None ellipsoid_radius = None ellipsoid_flat = None geodetic_datum = None projection_name = None divided_into_zones = "无分带" with_number = "无代号" plane_coordinate_unit = "米" altitude_name = "正常高" altitude_benchmark = "1985国家高程基准" if spatial_ref.IsProjected(): # TODO 中央子午线 central_meridian = self.processingNumericalValues( spatial_ref.GetProjParm(osr.SRS_PP_CENTRAL_MERIDIAN)) # TODO 椭球长半径 ellipsoid_radius = self.processingNumericalValues(spatial_ref.GetSemiMajor()) # 长半轴 # TODO 椭球扁率 ellipsoid_flat = f"1/{spatial_ref.GetInvFlattening():.3f}" # 反扁率(1/扁率) # TODO 所采用大地基准 geodetic_datum = spatial_ref.GetAttrValue("DATUM") # TODO 地图投影名称 projection_name = spatial_ref.GetAttrValue("PROJECTION") # TODO 分带方式(适用于 UTM) if geodetic_datum == "China_2000": if projection.__contains__("-degree"): number = projection.split("-degree")[0][-1:] divided_into_zones = f'{number}度带' with_number = int(central_meridian / int(number)) elif spatial_ref.IsProjected() and "UTM" in projection: zone = spatial_ref.GetUTMZone() divided_into_zones = "UTM分带" if zone else "其他" with_number = zone if zone else "N/A" # TODO 平面坐标单位 plane_coordinate_unit = spatial_ref.GetLinearUnitsName() linear_units = spatial_ref.GetLinearUnits() # 单位与米的换算关系 # 高程系统(如果有) altitude_name = spatial_ref.GetAttrValue("VERT_CS") altitude_benchmark = spatial_ref.GetAttrValue("VERT_DATUM") # 获取四向接边情况 numlist = splittingnum.split(",") length = int(numlist[0].split(":")[1]) - int(numlist[0].split(":")[0]) imagex = int(image_number[int(numlist[0].split(":")[0]):int(numlist[0].split(":")[1])]) imagey = int(image_number[int(numlist[1].split(":")[0]):int(numlist[1].split(":")[1])]) splitting = self.getSplitting(image_number, image_format, imagex, imagey, length, folder_path, selectfeature) metadatadict = { "image_number": image_number, "data_format": image_format, "data_volume": file_size, "grid_size": grid_size, "grid_row": grid_row, "grid_col": grid_col, "decimal_number": decimal_number, "nodata_number": ",".join(map(str, nodata_list)), "cornerpoint_lon": self.decimal_to_dms(cornerpoint["lon"]), "cornerpoint_lat": self.decimal_to_dms(cornerpoint["lat"]), "cornerpoint_x": cornerpoint["x"], "cornerpoint_y": cornerpoint["y"], "ellipsoid_radius": ellipsoid_radius, "ellipsoid_flat": ellipsoid_flat, "geodetic_datum": "2000国家大地坐标系" if geodetic_datum == "China_2000" else geodetic_datum, "projection_name": "高斯-克吕格投影" if projection_name == "Transverse_Mercator" else projection_name, "central_meridian": central_meridian, "divided_into_zones": divided_into_zones, "with_number": with_number, "plane_coordinate_unit": "米" if plane_coordinate_unit == "metre" else plane_coordinate_unit, "altitude_name": altitude_name, "altitude_benchmark": altitude_benchmark, "join_edge_w": splitting["west"], "join_edge_e": splitting["east"], "join_edge_n": splitting["north"], "join_edge_s": splitting["south"], "land_forms": selectfeature["landforms"] } for field_name in selectfeature.fields().names(): metadatadict[field_name] = selectfeature[field_name] print(f"栅格数据参数获取完成:{image_number}") # TODO 空间叠加 for key in overlapParams: value = overlapParams[key] largest_feature = self.get_largest_overlapping_feature(raster_path=filepath, shp_path=overlapVectorPath) if key == "data_gain_date": data_gain_date = largest_feature[value] field_value = f"{data_gain_date[0:4]}/{data_gain_date[4:6]}/{data_gain_date[6:8]}" else: field_value = largest_feature[value] metadatadict[key] = field_value print(f"开始生成元数据:{image_number}") # 利用模板生成excel workbook = openpyxl.load_workbook(templatepath) sheet = workbook.active # 定义替换数据的字典 replacements = {} for attr in metadatadict: key = f"<{attr}>" value = metadatadict[attr] replacements[key] = value # 遍历单元格,替换占位符 for row in sheet.iter_rows(): for cell in row: if isinstance(cell.value, str): # 确保单元格内容是字符串 for placeholder, replacement in replacements.items(): if placeholder in cell.value: # 替换占位符 try: cell.value = cell.value.replace(placeholder, str(replacement)) except Exception as e: print(e) # 保存修改后的文件 workbook.save(f'{output_path}\\{image_number}.xls') totalNumber = totalNumber + 1 print(f"元数据生成完成:{image_number}") print(f"共生成{totalNumber}个元数据表。") return { "状态": "成功", "任务": f"共生成{totalNumber}个元数据表。" }