# -*- coding: utf-8 -*- import os import sys import json import log import appconfig import arcpy import utils from arcpy import env from db.oracle import Oracle import uuid # 全局用地类型定义 LAND_TYPES = { 'SYJSYD': { 'name': 'SYJSYD', # 拼音简化名称 'chinese_name': '所有建设用地', 'cxtj': 'SYJSYD', 'xzq_field': 'QSDWDM' # 行政编码字段名称 }, 'YJJBNT': { 'name': 'YJJBNT', 'chinese_name': '永久基本农田', 'xzq_field': 'YJJBNTTBBH' # 行政编码字段名称 }, 'STBHHL': { 'name': 'STBHHL', 'chinese_name': '生态保护红线', 'xzq_field': 'XZQDM' # 行政编码字段名称 }, 'GYYD': { 'name': 'GYYD', 'chinese_name': '工业用地', 'xzq_field': 'QSDWDM' # 行政编码字段名称 }, 'CZKFBJ': { 'name': 'CZKFBJ', 'chinese_name': '城镇开发边界', 'xzq_field': 'XZQDM' # 行政编码字段名称 }, 'CZZYD': { 'name': 'CZZYD', 'chinese_name': '城镇住宅用地', 'xzq_field': 'QSDWDM' # 行政编码字段名称 } } class Csydjc: """ 图层面积计算系统 功能: - 计算图层间的相交面积 - 计算图层间的擦除面积 流程: 1. 查询行政区域数据 2. 查询各类用地查询条件 3. 生成临时GDB文件 4. 计算图层面积 5. 输出计算结果 """ def __init__(self, year=None, xzq_id=None): self.db = Oracle(appconfig.DB_CONN) self.outputname = "CSYDJC_YJFX" self.year = year self.xzq_id = xzq_id log.info("图层面积计算系统开始") # 临时 GDB - 按年份和行政区域组织缓存目录 self.root = os.path.dirname(os.path.abspath(__file__)) # 如果提供了年份和行政区域,使用缓存目录 if year and xzq_id: cache_dir = "cache_{}_{}_{}".format(year, xzq_id, self.outputname) self.cache_path = os.path.join(self.root, "cache") self.outGdb = os.path.join(self.cache_path, "{}.gdb".format(cache_dir)) else: # 兼容旧版本,使用随机目录 random_suffix = uuid.uuid4().hex[:8] path = os.path.join(self.root, "out", random_suffix) if not os.path.exists(path): os.makedirs(path) self.outGdb = os.path.join(path, "{}.gdb".format(self.outputname)) self.cache_path = path # 确保缓存目录存在 if not os.path.exists(self.cache_path): os.makedirs(self.cache_path) # 只有当GDB不存在时才创建 if not arcpy.Exists(self.outGdb): arcpy.CreateFileGDB_management(self.cache_path, os.path.basename(self.outGdb)) log.info("创建新的GDB缓存: {}".format(self.outGdb)) else: log.info("使用现有GDB缓存: {}".format(self.outGdb)) env.overwriteOutput = True def get_xzq_data(self, xzq_id="1505"): """ 从v_yzt_zysxcx表查询行政区域数据 查询条件:id=xzq_id或pid=xzq_id,且type='XZQ' """ sql = """ SELECT id, name, pid, type FROM v_yzt_zysxcx WHERE (id = '{}' OR pid = '{}') AND type = 'XZQ' ORDER BY id """.format(xzq_id, xzq_id) log.info("查询行政区域数据: " + sql) xzq_data = self.db.query(sql) log.info("查询到{}条行政区域数据".format(len(xzq_data))) return xzq_data def generate_land_conditions_template(self, year): """ 生成用地查询条件模板字典,不包含具体的行政ID 返回格式: {land_type: {'SJLX': sjlx, 'CXTJ_TEMPLATE': cxtj_template, 'XZQ_FIELD': xzq_field, 'SSMK': ssmk}} """ # 使用全局LAND_TYPES变量 land_types = [land_type['chinese_name'] for land_type in LAND_TYPES.values()] # 构建IN条件,一次性查询所有数据 land_types_str = "','".join(land_types) sql = """ SELECT name, sjlx, cxtj, ssmk FROM t_fxpj_ctfx_yz WHERE name IN ('{}') """.format(land_types_str) log.info("查询用地条件模板: {}".format(sql)) try: results = self.db.query(sql) conditions_template = {} if results: for result in results: name = result.get('NAME') sjlx = result.get('SJLX') cxtj = result.get('CXTJ') ssmk = result.get('SSMK') # 确保数据类型正确 sjlx = str(sjlx) if sjlx is not None else '' cxtj = str(cxtj) if cxtj is not None else '' name = str(name) if name is not None else '' ssmk = str(ssmk) if ssmk is not None else '' # 处理sjlx:如果是DLTB,创建模板 if sjlx == 'DLTB': sjlx_template = "DLTB_{xzq_id}00_{year}12" elif sjlx == 'YJJBNTBHTB': sjlx_template = 'yjjbntbhtb_150500_202212' else: sjlx_template = sjlx # 在所有sjlx前面添加sde.前缀 if sjlx_template: sjlx_template = "sde." + sjlx_template # 处理cxtj:根据年份和条件处理查询条件 cxtj_base = cxtj if cxtj and ';' in cxtj: if year > 2018: # 年份大于2018:取;后面的条件 parts = cxtj.split(';') if len(parts) > 1: cxtj_base = parts[1].strip() # 若条件中有=号,则取=号前面的 if '=' in cxtj_base: cxtj_base = cxtj_base.split('=')[0].strip() else: # 年份小于等于2018:取;前面的条件 parts = cxtj.split(';') if len(parts) > 0: cxtj_base = parts[0].strip() # 若条件中有=号,则取=号前面的 if '=' in cxtj_base: cxtj_base = cxtj_base.split('=')[0].strip() # 根据用地类型获取对应的行政编码字段名称 xzq_field = 'XZQDM' # 默认字段名称 for land_type_key, land_type_info in LAND_TYPES.items(): if land_type_info['chinese_name'] == name: xzq_field = land_type_info.get('xzq_field', 'XZQDM') break # 构建条件模板对象 condition_template = { 'SJLX_TEMPLATE': sjlx_template, 'CXTJ_BASE': cxtj_base, 'XZQ_FIELD': xzq_field, 'SSMK': ssmk } conditions_template[name] = condition_template log.info("{}条件模板: sjlx_template={}, cxtj_base={}, xzq_field={}, ssmk={}".format( name, sjlx_template, cxtj_base, xzq_field, ssmk)) # 检查是否所有需要的用地类型都查询到了 for land_type in land_types: if land_type not in conditions_template: log.warning("未找到{}的查询条件模板".format(land_type)) else: log.error("未查询到任何用地条件数据") except Exception as e: log.error("查询用地条件模板失败: {}".format(str(e))) conditions_template = {} return conditions_template def apply_xzq_to_conditions(self, conditions_template, year, xzq_id): """ 将行政ID应用到条件模板中,生成具体的查询条件 """ conditions = {} for land_type, template in conditions_template.items(): sjlx_template = template['SJLX_TEMPLATE'] cxtj_base = template['CXTJ_BASE'] xzq_field = template['XZQ_FIELD'] ssmk = template['SSMK'] # 应用年份和行政ID到sjlx模板 sjlx = sjlx_template.format(year=year, xzq_id=xzq_id[0:4]) # 构建完整的cxtj条件 if cxtj_base: # 如果已有条件,添加AND连接符 cxtj = "{} AND {} LIKE '{}%'".format(cxtj_base, xzq_field, xzq_id) else: # 如果没有条件,直接设置行政id条件 cxtj = "{} LIKE '{}%'".format(xzq_field, xzq_id) # 构建最终条件对象 condition = { 'SJLX': sjlx, 'CXTJ': cxtj, 'SSMK': ssmk } conditions[land_type] = condition return conditions def get_land_type_name(self, chinese_name): """ 根据中文名称获取拼音简化名称 """ for key, value in LAND_TYPES.items(): if value['chinese_name'] == chinese_name: return value['name'] return chinese_name.replace(' ', '_') # 如果找不到,使用原名称并替换空格 def check_cache_exists(self, land_type, xzq_id): """ 检查指定用地类型的缓存是否存在 """ # 使用拼音简化名称进行文件命名 land_type_name = self.get_land_type_name(land_type) output_fc = os.path.join(self.outGdb, "{}_{}".format(land_type_name, xzq_id)) exists = arcpy.Exists(output_fc) if exists: # 检查要素类是否有数据 try: count = int(arcpy.GetCount_management(output_fc).getOutput(0)) if count > 0: log.info("发现{}的缓存数据: {} (共{}个要素)".format(land_type, output_fc, count)) return True, output_fc else: log.warning("{}的缓存文件存在但无数据,将重新查询".format(land_type)) return False, None except Exception as e: log.warning("检查{}缓存数据时出错: {},将重新查询".format(land_type, str(e))) return False, None return False, None def query_land_data(self, xzq_id, land_conditions): """ 根据行政区域ID和查询条件从SDE库查询土地数据,支持缓存机制 """ land_data = {} cache_hits = 0 total_queries = 0 for land_type, condition in land_conditions.items(): if not condition: continue total_queries += 1 # 首先检查缓存 cache_exists, cached_fc = self.check_cache_exists(land_type, xzq_id) if cache_exists: land_data[land_type] = cached_fc cache_hits += 1 continue # 缓存不存在,从数据库查询 sjlx = condition.get('SJLX') cxtj = condition.get('CXTJ') if not sjlx: log.warning("{}缺少sjlx参数".format(land_type)) continue try: env.workspace = appconfig.getSDE("SDE") # 使用拼音简化名称进行文件命名 land_type_name = self.get_land_type_name(land_type) output_fc = os.path.join(self.outGdb, "{}_{}".format(land_type_name, xzq_id)) # 构建查询条件:使用1=1作为前缀,然后拼接cxtj if cxtj and cxtj.strip(): where_clause = "1=1 AND {}".format(cxtj.strip()) else: where_clause = "1=1" log.info("从数据库查询{}数据...".format(land_type)) # 使用MakeFeatureLayer_management创建临时图层 temp_layer = "temp_layer_{}".format(land_type) arcpy.MakeFeatureLayer_management(sjlx, temp_layer, where_clause) # 将临时图层复制到输出要素类 arcpy.CopyFeatures_management(temp_layer, output_fc) # 删除临时图层 arcpy.Delete_management(temp_layer) land_data[land_type] = output_fc log.info("成功查询{}数据到: {}".format(land_type, output_fc)) log.info("使用的查询条件: {}".format(where_clause)) except Exception as e: log.error("查询{}数据失败: {}".format(land_type, str(e))) log.error("查询条件: {}".format(where_clause if 'where_clause' in locals() else 'N/A')) log.error("数据源: {}".format(sjlx)) # 输出缓存统计信息 if total_queries > 0: cache_rate = (cache_hits / total_queries) * 100 log.info("缓存命中统计: {}/{} ({:.1f}%)".format(cache_hits, total_queries, cache_rate)) if cache_hits > 0: log.info("使用缓存加速了{}个查询,节省了大量时间".format(cache_hits)) return land_data def calculate_overlap_area(self, fc1, fc2, operation="intersect"): """ 计算两个要素类的重叠面积 operation: intersect(相交), erase(擦除) """ try: output_name = "temp_{}".format(operation) output_fc = os.path.join(self.outGdb, output_name) if operation == "intersect": arcpy.Intersect_analysis([fc1, fc2], output_fc) elif operation == "erase": arcpy.Erase_analysis(fc1, fc2, output_fc) else: raise ValueError("不支持的操作类型: {}".format(operation)) # 计算面积 total_area = 0 with arcpy.da.SearchCursor(output_fc, ["SHAPE@AREA"]) as cursor: for row in cursor: total_area += row[0] # 转换为平方公里 area_km2 = total_area / 1000000 # 清理临时文件 arcpy.Delete_management(output_fc) return area_km2 except Exception as e: log.error("计算面积失败: {}".format(str(e))) return 0 def calculate_areas(self, land_data, year): """ 计算图层间的相交和擦除面积,提供有意义的分析结果 参数: land_data: 土地数据 year: 年份 """ results = [] # 检查输入数据是否为空 if not land_data: log.warning("土地数据为空,返回零值结果") return self.generate_zero_results(year) # 获取查询到的图层数据 layers = {} for land_type, fc_path in land_data.items(): # 使用拼音简化名称生成图层名称,避免中文字符 land_type_name = self.get_land_type_name(land_type) layer_name = "layer_{}".format(land_type_name) try: arcpy.MakeFeatureLayer_management(fc_path, layer_name) count = int(arcpy.GetCount_management(layer_name).getOutput(0)) if count > 0: layers[land_type_name] = layer_name log.info("加载{}图层成功,要素数量: {}".format(land_type, count)) else: log.info("{}图层无数据".format(land_type)) except Exception as e: log.error("加载{}图层失败: {}".format(land_type, str(e))) # 如果没有任何有效图层,返回零值结果 if not layers: log.warning("没有有效的图层数据,返回零值结果") return self.generate_zero_results(year) # 获取分析组合定义 analysis_combinations = self.get_analysis_combinations(year) # 执行分析 for combo in analysis_combinations: layer1_name = combo['layer1'] layer2_name = combo['layer2'] operation = combo['operation'] description = combo['description'] # 将中文名称转换为拼音简化名称来查找图层 layer1_key = self.get_land_type_name(layer1_name) layer2_key = self.get_land_type_name(layer2_name) if layer1_key in layers and layer2_key in layers: layer1 = layers[layer1_key] layer2 = layers[layer2_key] # 计算面积 area = self.calculate_overlap_area(layer1, layer2, operation) if operation == 'intersect': log.info("{}: {:.4f}平方公里".format(description, area)) else: # erase log.info("{}: {:.4f}平方公里".format(description, area)) result = { 'analysis_type': description, 'layer1': layer1_name, 'layer2': layer2_name, 'operation': operation, 'area_km2': area, 'description': description } results.append(result) # 将数据库相关信息添加到结果中,供后续统一更新 result['zbbh'] = combo['zbbh'] result['jcn'] = combo['jcn'] else: # 当图层缺失时,生成零值结果而不是跳过 missing_layers = [] if layer1_key not in layers: missing_layers.append(layer1_name) if layer2_key not in layers: missing_layers.append(layer2_name) log.warning("无法进行{}分析,缺少图层: {},生成零值结果".format(description, ', '.join(missing_layers))) # 生成零值结果 result = { 'analysis_type': description, 'layer1': layer1_name, 'layer2': layer2_name, 'operation': operation, 'area_km2': 0.0, 'description': description, 'zbbh': combo['zbbh'], 'jcn': combo['jcn'] } results.append(result) # 记录零值结果 log.info("{}: 0.0000平方公里 (图层缺失)".format(description)) # 清理临时图层 for layer_name in layers.values(): try: arcpy.Delete_management(layer_name) except: pass return results def update_zbmx_zbjcz_table(self, zbbh, xzqdm, jcn, area_km2): """ 查询并更新t_zbmx_zbjcz表 参数: zbbh: 指标编号 xzqdm: 行政区代码 jcn: 检查年 area_km2: 计算出的面积(平方公里) """ try: # 将面积值保留2位小数 area_rounded = round(area_km2, 2) # 查询是否存在符合条件的记录 query_sql = """ SELECT COUNT(*) FROM t_zbmx_zbjcz WHERE zbbh = '{}' AND xzqdm = '{}' AND jcn = '{}' """.format(zbbh, xzqdm, jcn) log.info("执行查询SQL: {}".format(query_sql.strip())) result = self.db.query(query_sql) count = result[0][0] if result and len(result) > 0 else 0 if count > 0: # 存在记录,执行更新 update_sql = """ UPDATE t_zbmx_zbjcz SET jcz = {} WHERE zbbh = '{}' AND xzqdm = '{}' AND jcn = '{}' """.format(area_rounded, zbbh, xzqdm, jcn) log.info("执行更新SQL: {}".format(update_sql.strip())) self.db.execute(update_sql) log.info("已更新t_zbmx_zbjcz表: zbbh={}, xzqdm={}, jcn={}, jcz={:.2f}平方公里".format( zbbh, xzqdm, jcn, area_rounded)) else: # 不存在记录,记录日志 log.info("t_zbmx_zbjcz表中未找到符合条件的记录: zbbh={}, xzqdm={}, jcn={}".format(zbbh, xzqdm, jcn)) except Exception as e: log.error("更新t_zbmx_zbjcz表失败: {}".format(str(e))) def output_analysis_results(self, results): """ 输出格式化的分析结果并更新数据库 """ if not results: log.info("没有分析结果") return log.info("=" * 80) log.info("图层面积分析结果") log.info("=" * 80) total_area = 0 updated_count = 0 for result in results: area = result['area_km2'] description = result['description'] log.info("{}: {:.4f} 平方公里".format(description, area)) total_area += area # 更新数据库 if 'zbbh' in result and 'xzq_id' in result and 'jcn' in result: try: self.update_zbmx_zbjcz_table( result['zbbh'], result['xzq_id'], result['jcn'], area ) updated_count += 1 except Exception as e: log.error("更新数据库失败 - {}: {}".format(description, str(e))) log.info("-" * 60) log.info("总计面积: {:.4f} 平方公里".format(total_area)) log.info("数据库更新记录数: {}".format(updated_count)) log.info("=" * 80) # 输出详细的JSON格式结果 log.info("详细分析结果:") print(json.dumps(results, ensure_ascii=False, indent=2)) def get_analysis_combinations(self, year): """ 获取分析组合定义 参数: year: 年份 返回: 分析组合列表 """ return [ { 'layer1': 'SYJSYD', 'layer2': 'YJJBNT', 'operation': 'intersect', 'description': '建设用地侵占永久基本农田面积', 'zbbh': 'A-150', 'jcn': year }, { 'layer1': 'SYJSYD', 'layer2': 'STBHHL', 'operation': 'intersect', 'description': '建设用地侵占生态保护红线面积', 'zbbh': 'A-151', 'jcn': year }, { 'layer1': 'GYYD', 'layer2': 'CZKFBJ', 'operation': 'erase', 'description': '城镇开发边界外工业用地面积', 'zbbh': 'A-152', 'jcn': year }, { 'layer1': 'CZZYD', 'layer2': 'CZKFBJ', 'operation': 'erase', 'description': '城镇开发边界外城镇住宅用地面积', 'zbbh': 'A-153', 'jcn': year } ] def generate_zero_results(self, year=None): """ 生成零值结果,用于没有数据的行政区域 参数: year: 年份,用于生成分析组合 """ if year is None: year = 2023 # 默认年份 # 获取分析组合定义 analysis_combinations = self.get_analysis_combinations(year) # 生成零值结果 zero_results = [] for combo in analysis_combinations: result = { 'analysis_type': combo['description'], 'layer1': combo['layer1'], 'layer2': combo['layer2'], 'operation': combo['operation'], 'area_km2': 0.0, 'description': combo['description'], 'zbbh': combo['zbbh'], 'jcn': combo['jcn'] } zero_results.append(result) log.info("生成{}个零值分析结果".format(len(zero_results))) return zero_results def run(self, year, xzq_id="1505"): """ 执行图层面积计算 参数: year: 年份 xzq_id: 行政区域代码,默认为1505 """ try: log.info("开始图层面积计算,行政区域: {}, 年份: {}".format(xzq_id, year)) # 1. 查询行政区域数据(包括当前行政区域及其子行政区) xzq_data = self.get_xzq_data(xzq_id) if not xzq_data: raise Exception("未查询到行政区域数据") log.info("查询到{}个行政区域".format(len(xzq_data))) # 2. 生成用地查询条件模板(一次性生成,避免重复查询) log.info("生成用地查询条件模板...") conditions_template = self.generate_land_conditions_template(year) if not conditions_template: log.warning("未生成用地查询条件模板,将为所有行政区域生成零值结果") conditions_template = {} log.info("成功生成{}种用地类型的条件模板".format(len(conditions_template))) # 3. 遍历每个行政区域进行计算 all_results = [] for xzq_item in xzq_data: current_xzq_id = xzq_item.get('ID') current_xzq_name = xzq_item.get('NAME') if current_xzq_id == "1505": continue; log.info("正在处理行政区域: {} ({})".format(current_xzq_name, current_xzq_id)) # 3.1 将行政ID应用到条件模板中,生成具体的查询条件 land_conditions = self.apply_xzq_to_conditions(conditions_template, year, current_xzq_id) if not land_conditions: log.warning("行政区域 {} 未生成用地查询条件,生成零值结果".format(current_xzq_name)) # 为空区域生成零值结果 zero_results = self.generate_zero_results(year) for result in zero_results: result['xzq_id'] = current_xzq_id result['xzq_name'] = current_xzq_name all_results.extend(zero_results) continue log.info("为行政区域 {} 生成了{}种用地类型的查询条件".format(current_xzq_name, len(land_conditions))) # 3.2 查询当前行政区域的土地数据 land_data = self.query_land_data(current_xzq_id, land_conditions) # 3.3 计算当前行政区域的面积 log.info("开始计算行政区域 {} 的图层面积...".format(current_xzq_name)) if not land_data or all(not data for data in land_data.values()): log.warning("行政区域 {} 未查询到土地数据,生成零值结果".format(current_xzq_name)) # 为没有数据的区域生成零值结果 zero_results = self.generate_zero_results(year) for result in zero_results: result['xzq_id'] = current_xzq_id result['xzq_name'] = current_xzq_name all_results.extend(zero_results) else: results = self.calculate_areas(land_data, year) # 为结果添加行政区域信息 for result in results: result['xzq_id'] = current_xzq_id result['xzq_name'] = current_xzq_name all_results.extend(results) # 4. 输出所有结果 self.output_analysis_results(all_results) log.info("所有行政区域图层面积计算完成,共处理{}个行政区域".format(len(xzq_data))) except Exception as e: log.error("图层面积计算失败: {}".format(str(e))) raise e finally: # 清理资源 self.db.close() if __name__ == "__main__": # 执行图层面积计算 year = 2020 xzq_id = "1505" # 创建实例时传递年份和行政区域参数以启用缓存 analyzer = Csydjc(year=year, xzq_id=xzq_id) analyzer.run(year, xzq_id)