# -*- coding: utf-8 -*- import os import sys import json import log import appconfig import arcpy import utils from arcpy import env from db.oracle import Oracle reload(sys) sys.setdefaultencoding('utf-8') class Jsyd: """ 建设用地开发完成情况分析(规划 vs 三调) 分析内容: 1. 分析年度:根据国土调查数据的年份确定 2. 规划建设用地面积:统计规划地块中用地性质为建设用地的面积 3. 现状建设用地面积:统计现状调查数据中地类为建设用地的面积 4. 规划内的建设用地面积:现状建设用地与规划建设用地空间进行空间叠加,统计相交的面积 5. 规划外的建设用地面积:用规划建设用地空间擦除现状建设用地,统计剩余现状建设用地的面积 6. 建设用地开发完成情况:规划内的建设用地面积/规划建设用地面积 分析范围: - 分析范围为市时,建设用地开发完成情况结果显示到旗县 - 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇 """ def __init__(self, bsm): self.bsm = bsm self.db = Oracle(appconfig.DB_CONN) self.outputname = "JSYD_KFQK_" + bsm log.info("任务BSM=" + bsm) # 1. 读任务 self.task() # 2. 临时 GDB self.root = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(self.root, "out") if not os.path.exists(path): os.makedirs(path) self.outGdb = os.path.join(path, "{}.gdb".format(self.outputname)) arcpy.Delete_management(self.outGdb) arcpy.CreateFileGDB_management(path, "{}.gdb".format(self.outputname)) env.overwriteOutput = True # ---------- 读任务 ---------- def task(self): # 根据BSM获取任务信息 # 查询返回字段说明: # sjy: 实际源表名 # xzqdm: 行政区代码 # fxyz: 分析因子ID # rwzt: 任务状态 # fxtable: 分析表名 # fxyear: 分析年份 # CXTJ: 约束条件(现状条件=规划条件) # YZNAME: 因子名称 # SJYNAME: 数据源名称 # rwkssj: 任务开始时间 sql = "select t.sjy,t.xzqdm,t.fxyz,t.rwzt,t.fxtable,t.fxyear,to_char(yz.cxtj) as \"CXTJ\",yz.name as \"YZNAME\",sjy.name as \"SJYNAME\",to_char(t.rwkssj, 'yyyy-mm-dd hh24:mi:ss') as \"RWKSSJ\" from t_fxpj_ctfx_main t left join t_fxpj_ctfx_yz yz on yz.id = t.fxyz left join t_fxpj_ctfx_sjy sjy on sjy.tablename = t.sjy where t.id = '{0}'".format( self.bsm) tasks = self.db.query(sql) if not tasks: raise Exception("任务标识错误[{}]".format(self.bsm)) self.task = tasks[0] # 解析CXTJ参数,格式支持"2调现状条件=规划条件;3调现状条件=规划条件",根据年份选择使用的条件 self.task["CXTJ"] = str(self.task["CXTJ"] or "") # 按分号分割不同调查类型的条件 selected_condition = self.task["CXTJ"] if ";" in self.task["CXTJ"]: conditions = self.task["CXTJ"].split(";") # 根据分析年份选择使用哪个条件 if int(self.task["FXYEAR"]) > 2018: # 使用3调条件(分号后) selected_condition = conditions[1] else: # 使用2调条件(分号前) selected_condition = conditions[0] # 解析选定条件中的等号分隔的现状和规划条件,保持原有逻辑不变 # error_msg = "CXTJ参数格式错误,必须包含等号分隔的现状条件和规划条件,格式:现状条件=规划条件" error_msg = "后台数据不足,无法统计!" if "=" in selected_condition: conditions = selected_condition.split("=") # 现状条件判断,如果为空字符串则报错 if not conditions[0] or not conditions[0].strip(): self.updateFxzt(self.bsm, "3", error_msg) raise Exception(error_msg) # 规划条件判断,如果为空字符串则报错 if not conditions[1] or not conditions[1].strip(): self.updateFxzt(self.bsm, "3", error_msg) raise Exception(error_msg) self.task["XZ_CXTJ"] = conditions[0] if (conditions[0] and conditions[0].strip()) else None # 现状查询条件,None表示无约束 self.task["GH_CXTJ"] = conditions[1] if (conditions[1] and conditions[1].strip()) else None # 规划查询条件,None表示无约束 else: # 如果没有等号分隔,则抛出异常,因为格式必须是"现状条件=规划条件" self.updateFxzt(self.bsm, "3", error_msg) raise Exception(error_msg) # ---------- 主流程 ---------- def run(self): try: # 1. 分析年度:根据国土调查数据的年份确定 analysis_year = self.task["FXYEAR"] # 2. 规划建设用地面积:统计规划地块中用地性质为建设用地的面积 gh_layer = self.get_gh_jsyd() gh_total = self.get_area(gh_layer) # 规划建设用地面积 # 3. 现状建设用地面积:统计现状调查数据中地类为建设用地的面积 xz_layer = self.get_xz_jsyd() xz_total = self.get_area(xz_layer) # 现状建设用地面积 # 4. 规划内的建设用地面积:现状建设用地与规划建设用地空间进行空间叠加,统计相交的面积 gh_nc = arcpy.analysis.Intersect([gh_layer, xz_layer], r"{}\GH_NC".format(self.outGdb)) # 重新计算相交后的面积 self.add_tbmj(gh_nc) nc_total = self.get_area(gh_nc) # 规划内建设用地面积 # 5. 规划外的建设用地面积:现状建设用地减去规划内建设用地,统计剩余现状建设用地的面积 gh_wai = arcpy.analysis.Erase(xz_layer, gh_nc, r"{}\GH_WAI".format(self.outGdb)) # 重新计算擦除后的面积 self.add_tbmj(gh_wai) gh_wai_total = self.get_area(gh_wai) # 规划外建设用地面积 # 6. 数据验证:确保规划内+规划外=现状总计 calculated_total = nc_total + gh_wai_total tolerance = 0.01 # 允许0.01平方米的误差 if abs(calculated_total - xz_total) > tolerance: log.warning("数据验证失败:规划内({:.2f}) + 规划外({:.2f}) = {:.2f} ≠ 现状总计({:.2f})".format( nc_total, gh_wai_total, calculated_total, xz_total)) print("警告:面积计算可能有误,请检查数据") # 7. 建设用地开发完成情况:规划内的建设用地面积/规划建设用地面积 development_rate = (nc_total / gh_total * 100) if gh_total > 0 else 0 # 7. 行政区统计逻辑 # 分析范围为市时,建设用地开发完成情况结果显示到旗县 # 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇 result_data = [] sub_len = self.getSubXzqLength() # 为规划内和规划外图层添加行政区统计字段 arcpy.AddField_management(gh_nc, "STATICS", "TEXT") arcpy.CalculateField_management(gh_nc, "STATICS", "!ZLDWDM![0:{}]".format(sub_len), "PYTHON_9.3") # 为规划图层添加行政区统计字段,用于计算各下级行政区的规划总面积 arcpy.AddField_management(gh_layer, "STATICS", "TEXT") arcpy.CalculateField_management(gh_layer, "STATICS", "!XZQDM![0:{}]".format(sub_len), "PYTHON_9.3") # 统计各下级行政区的规划建设用地面积 gh_stats = r"{}\GH_STATS".format(self.outGdb) arcpy.Statistics_analysis(gh_layer, gh_stats, [["TBMJ", "SUM"]], "STATICS") # 统计各下级行政区的规划内建设用地面积 nc_stats = r"{}\NC_STATS".format(self.outGdb) arcpy.Statistics_analysis(gh_nc, nc_stats, [["TBMJ", "SUM"]], "STATICS") # 为现状图层添加行政区统计字段,用于计算各下级行政区的现状总面积 arcpy.AddField_management(xz_layer, "STATICS", "TEXT") arcpy.CalculateField_management(xz_layer, "STATICS", "!ZLDWDM![0:{}]".format(sub_len), "PYTHON_9.3") # 统计各下级行政区的现状建设用地面积 xz_stats = r"{}\XZ_STATS".format(self.outGdb) arcpy.Statistics_analysis(xz_layer, xz_stats, [["TBMJ", "SUM"]], "STATICS") # 为规划外图层添加行政区统计字段,用于计算各下级行政区的规划外面积 arcpy.AddField_management(gh_wai, "STATICS", "TEXT") arcpy.CalculateField_management(gh_wai, "STATICS", "!ZLDWDM![0:{}]".format(sub_len), "PYTHON_9.3") # 统计各下级行政区的规划外建设用地面积 gh_wai_stats = r"{}\GH_WAI_STATS".format(self.outGdb) arcpy.Statistics_analysis(gh_wai, gh_wai_stats, [["TBMJ", "SUM"]], "STATICS") # 查询行政区名称 xzq_names = self.get_xzq_names() # 创建各类面积字典 gh_area_dict = {} # 规划建设用地面积 with arcpy.da.SearchCursor(gh_stats, ["STATICS", "SUM_TBMJ"]) as cur: for row in cur: gh_area_dict[row[0]] = row[1] xz_area_dict = {} # 现状建设用地面积 with arcpy.da.SearchCursor(xz_stats, ["STATICS", "SUM_TBMJ"]) as cur: for row in cur: xz_area_dict[row[0]] = row[1] nc_area_dict = {} # 规划内建设用地面积 with arcpy.da.SearchCursor(nc_stats, ["STATICS", "SUM_TBMJ"]) as cur: for row in cur: nc_area_dict[row[0]] = row[1] gh_wai_area_dict = {} # 规划外建设用地面积 with arcpy.da.SearchCursor(gh_wai_stats, ["STATICS", "SUM_TBMJ"]) as cur: for row in cur: gh_wai_area_dict[row[0]] = row[1] # 构建各下级行政区的开发完成情况结果数据 all_xzdm = set(gh_area_dict.keys()) | set(xz_area_dict.keys()) | set(nc_area_dict.keys()) | set(gh_wai_area_dict.keys()) for xzdm in all_xzdm: gh_sub_total = gh_area_dict.get(xzdm, 0) # 规划建设用地面积 xz_sub_total = xz_area_dict.get(xzdm, 0) # 现状建设用地面积 nc_sub_total = nc_area_dict.get(xzdm, 0) # 规划内建设用地面积 gh_wai_sub_total = gh_wai_area_dict.get(xzdm, 0) # 规划外建设用地面积 name = xzq_names.get(xzdm, "未知区域") # 计算该下级行政区的建设用地开发完成情况 sub_development_rate = (nc_sub_total / gh_sub_total * 100) if gh_sub_total > 0 else 0 result_data.append({ "xzqmc": name, "xzqdm": xzdm, "proportion": round(sub_development_rate, 2), "tbmj": round(gh_sub_total, 2), "xzqmj": round(xz_sub_total, 2), "ghnmj": round(nc_sub_total, 2), "ghwmj": round(gh_wai_sub_total, 2) }) # 8. 回写结果 self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S") # 使用分析因子名称 yz_name = self.task["YZNAME"] if self.task["YZNAME"] else "建设用地" # 获取行政区名称 xzq_names = self.get_xzq_names() xzq_name = xzq_names.get(self.task["XZQDM"], "未知行政区") # 构建分析结果描述(按照用户要求的格式) fxjg = "{0}规划{1}约{2},{1}现状约有{3},其中:规划内{1}现状约有{4},规划外{1}现状约{5},{1}开发完成总体约{6:.0f}%。".format( xzq_name, yz_name, self.area_transform(gh_total), self.area_transform(xz_total), self.area_transform(nc_total), self.area_transform(gh_wai_total), development_rate) self.updateFxztAndStatist(self.bsm, "2", fxjg, json.dumps(result_data, ensure_ascii=False)) log.info("####OK####") print("####OK####") except arcpy.ExecuteError: msg = arcpy.GetMessages() log.error(msg) self.updateFxzt(self.bsm, "3", msg) except: msg = str(sys.exc_info()).decode('string-escape') log.error(msg) self.updateFxzt(self.bsm, "3", msg) # ---------- 读规划建设用地 ---------- def get_gh_jsyd(self): env.workspace = appconfig.getSDE("SDE") # 规划数据在SDE库中 out = r"{}\GH_JSYD".format(self.outGdb) # 构建查询条件:结合行政区域代码和规划条件 xzqdm = self.task["XZQDM"] xzqdm_condition = "XZQDM LIKE '{}%'".format(xzqdm) # 如果有规划条件,则组合条件;否则只使用行政区域条件 if self.task["GH_CXTJ"] and self.task["GH_CXTJ"].strip(): combined_condition = "({}) AND ({})".format(xzqdm_condition, self.task["GH_CXTJ"]) else: combined_condition = xzqdm_condition arcpy.Select_analysis("GHYDYH", out, combined_condition) self.add_tbmj(out) return out # ---------- 读现状建设用地 ---------- def get_xz_jsyd(self): env.workspace = appconfig.getSDE("SDE") # 三调库 out = r"{}\XZ_JSYD".format(self.outGdb) # 构建查询条件:结合行政区域代码和现状条件 xzqdm = self.task["XZQDM"] xzqdm_condition = "ZLDWDM LIKE '{}%'".format(xzqdm) # 如果有现状条件,则组合条件;否则只使用行政区域条件 if self.task["XZ_CXTJ"] and self.task["XZ_CXTJ"].strip(): combined_condition = "({}) AND ({})".format(xzqdm_condition, self.task["XZ_CXTJ"]) else: combined_condition = xzqdm_condition arcpy.Select_analysis(self.task["FXTABLE"], out, combined_condition) self.add_tbmj(out) return out # ---------- 查询行政区名称 ---------- def get_xzq_names(self): """ 从表v_yzt_zysxcx中查询行政区域名称 只查询与当前任务行政区代码相关的记录,即id等于该行政区代码或者pid等于该行政区代码的记录 并且type必须是XZQ :return: 字典,键为行政区代码,值为行政区名称 """ xzqdm = self.task["XZQDM"] sql = "SELECT id, name FROM v_yzt_zysxcx WHERE (id = '{0}' OR pid = '{0}') AND type = 'XZQ'".format(xzqdm) results = self.db.query(sql) xzq_dict = {} for row in results: xzq_dict[row["ID"]] = row["NAME"] return xzq_dict # ---------- 小工具 ---------- def add_tbmj(self, fc): arcpy.AddField_management(fc, "TBMJ", "DOUBLE") arcpy.CalculateField_management(fc, "TBMJ", "!shape.area@SQUAREMETERS!", "PYTHON_9.3") def get_area(self, fc): total = 0 with arcpy.da.SearchCursor(fc, ["TBMJ"]) as cur: for row in cur: total += row[0] return total def getSubXzqLength(self): """ 根据行政区代码长度确定下级行政区统计粒度 分析范围为市时,建设用地开发完成情况结果显示到旗县 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇 最大级别只到乡镇(9位行政区代码) """ curlen = len(self.task["XZQDM"]) if curlen == 4: # 市级(如:1501) return 6 # 统计到县级(如:150102) elif curlen == 6: # 县级(如:150102) return 9 # 统计到乡级(如:150102001) else: # 对于其他情况,根据当前级别确定下级统计粒度,但最大只到乡镇级别 if curlen <= 4: return 6 # 统计到县级 elif curlen <= 6: return 9 # 统计到乡级 else: return 9 # 最大只到乡镇级别(9位) def area_transform(self, area, unit="km2"): """ 面积单位转换 :param area: 面积值(平方米) :param unit: 目标单位,支持 "hectare"(公顷)、"mu"(亩)、"km2"(平方千米),默认为平方千米 :return: 格式化的面积字符串 """ if unit == "hectare": # 转换为公顷 hectare_value = area / 10000 return "{:.2f}公顷".format(hectare_value) elif unit == "mu": # 转换为亩 mu_value = area * 0.0015 return "{:.2f}亩".format(mu_value) elif unit == "km2": # 转换为平方千米 km2_value = area / 1000000 return "{:.2f}平方千米".format(km2_value) else: # 默认返回平方千米 km2_value = area / 1000000 return "{:.2f}平方千米".format(km2_value) # ---------- 任务状态更新 ---------- def updateFxzt(self, bsm, fxzt, msg): self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S") # 清理消息中的特殊字符,防止SQL注入和语法错误 clean_msg = msg.replace("'", "''") if msg else "" # 确保中文字符串正确编码 if isinstance(clean_msg, unicode): clean_msg = clean_msg.encode('utf-8') sql = ("update t_fxpj_ctfx_main t set rwzt='{0}'," "rwkssj=to_date('{1}','yyyy-mm-dd hh24:mi:ss')," "rwjssj=to_date('{2}','yyyy-mm-dd hh24:mi:ss')," "fxjg='{3}',fxjgtable='{4}',workspace='{5}' " "where t.id='{6}'").format( fxzt, self.task["RWKSSJ"], self.rwjssj, clean_msg, self.outputname, '', bsm) log.info(sql) self.db.insert(sql) def updateFxztAndStatist(self, bsm, fxzt, msg, statist): self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S") # 清理消息中的特殊字符,防止SQL注入和语法错误 clean_msg = msg.replace("'", "''") if msg else "" clean_statist = statist.replace("'", "''") if statist else "" # 确保中文字符串正确编码 if isinstance(clean_msg, unicode): clean_msg = clean_msg.encode('utf-8') if isinstance(clean_statist, unicode): clean_statist = clean_statist.encode('utf-8') sql = ("update t_fxpj_ctfx_main t set rwzt='{0}'," "rwkssj=to_date('{1}','yyyy-mm-dd hh24:mi:ss')," "rwjssj=to_date('{2}','yyyy-mm-dd hh24:mi:ss')," "fxjg='{3}',fxjgtable='{4}',workspace='{5}',statist='{7}' " "where t.id='{6}'").format( fxzt, self.task["RWKSSJ"], self.rwjssj, clean_msg, self.outputname, self.outGdb, bsm, clean_statist) self.db.insert(sql) # ---------------- 入口 ---------------- if __name__ == "__main__": Jsyd("01eb2b2c332a45478e61ae5155b2d867").run()