||
- # -*- coding: utf-8 -*-
- import os
- import sys
- import json
- import log
- import appconfig
- import arcpy
- import utils
- from arcpy import env
- from db.oracle import Oracle
- reload(sys)
- sys.setdefaultencoding('utf-8')
- class Jsyd:
- """
- 建设用地开发完成情况分析(规划 vs 三调)
-
- 分析内容:
- 1. 分析年度:根据国土调查数据的年份确定
- 2. 规划建设用地面积:统计规划地块中用地性质为建设用地的面积
- 3. 现状建设用地面积:统计现状调查数据中地类为建设用地的面积
- 4. 规划内的建设用地面积:现状建设用地与规划建设用地空间进行空间叠加,统计相交的面积
- 5. 规划外的建设用地面积:用规划建设用地空间擦除现状建设用地,统计剩余现状建设用地的面积
- 6. 建设用地开发完成情况:规划内的建设用地面积/规划建设用地面积
-
- 分析范围:
- - 分析范围为市时,建设用地开发完成情况结果显示到旗县
- - 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇
- """
- def __init__(self, bsm):
- self.bsm = bsm
- self.db = Oracle(appconfig.DB_CONN)
- self.outputname = "JSYD_KFQK_" + bsm
- log.info("任务BSM=" + bsm)
- # 1. 读任务
- self.task()
- # 2. 临时 GDB
- self.root = os.path.dirname(os.path.abspath(__file__))
- path = os.path.join(self.root, "out")
- if not os.path.exists(path):
- os.makedirs(path)
- self.outGdb = os.path.join(path, "{}.gdb".format(self.outputname))
- arcpy.Delete_management(self.outGdb)
- arcpy.CreateFileGDB_management(path, "{}.gdb".format(self.outputname))
- env.overwriteOutput = True
- # ---------- 读任务 ----------
- def task(self):
- # 根据BSM获取任务信息
- # 查询返回字段说明:
- # sjy: 实际源表名
- # xzqdm: 行政区代码
- # fxyz: 分析因子ID
- # rwzt: 任务状态
- # fxtable: 分析表名
- # fxyear: 分析年份
- # CXTJ: 约束条件(现状条件=规划条件)
- # YZNAME: 因子名称
- # SJYNAME: 数据源名称
- # rwkssj: 任务开始时间
- sql = "select t.sjy,t.xzqdm,t.fxyz,t.rwzt,t.fxtable,t.fxyear,to_char(yz.cxtj) as \"CXTJ\",yz.name as \"YZNAME\",sjy.name as \"SJYNAME\",to_char(t.rwkssj, 'yyyy-mm-dd hh24:mi:ss') as \"RWKSSJ\" from t_fxpj_ctfx_main t left join t_fxpj_ctfx_yz yz on yz.id = t.fxyz left join t_fxpj_ctfx_sjy sjy on sjy.tablename = t.sjy where t.id = '{0}'".format(
- self.bsm)
- tasks = self.db.query(sql)
- if not tasks:
- raise Exception("任务标识错误[{}]".format(self.bsm))
- self.task = tasks[0]
- # 解析CXTJ参数,格式支持"2调现状条件=规划条件;3调现状条件=规划条件",根据年份选择使用的条件
- self.task["CXTJ"] = str(self.task["CXTJ"] or "")
- # 按分号分割不同调查类型的条件
- selected_condition = self.task["CXTJ"]
- if ";" in self.task["CXTJ"]:
- conditions = self.task["CXTJ"].split(";")
- # 根据分析年份选择使用哪个条件
- if int(self.task["FXYEAR"]) > 2018:
- # 使用3调条件(分号后)
- selected_condition = conditions[1]
- else:
- # 使用2调条件(分号前)
- selected_condition = conditions[0]
-
- # 解析选定条件中的等号分隔的现状和规划条件,保持原有逻辑不变
- # error_msg = "CXTJ参数格式错误,必须包含等号分隔的现状条件和规划条件,格式:现状条件=规划条件"
- error_msg = "后台数据不足,无法统计!"
- if "=" in selected_condition:
- conditions = selected_condition.split("=")
- # 现状条件判断,如果为空字符串则报错
- if not conditions[0] or not conditions[0].strip():
- self.updateFxzt(self.bsm, "3", error_msg)
- raise Exception(error_msg)
- # 规划条件判断,如果为空字符串则报错
- if not conditions[1] or not conditions[1].strip():
- self.updateFxzt(self.bsm, "3", error_msg)
- raise Exception(error_msg)
-
- self.task["XZ_CXTJ"] = conditions[0] if (conditions[0] and conditions[0].strip()) else None # 现状查询条件,None表示无约束
- self.task["GH_CXTJ"] = conditions[1] if (conditions[1] and conditions[1].strip()) else None # 规划查询条件,None表示无约束
- else:
- # 如果没有等号分隔,则抛出异常,因为格式必须是"现状条件=规划条件"
- self.updateFxzt(self.bsm, "3", error_msg)
- raise Exception(error_msg)
- # ---------- 主流程 ----------
- def run(self):
- try:
- # 1. 分析年度:根据国土调查数据的年份确定
- analysis_year = self.task["FXYEAR"]
-
- # 2. 规划建设用地面积:统计规划地块中用地性质为建设用地的面积
- gh_layer = self.get_gh_jsyd()
- gh_total = self.get_area(gh_layer) # 规划建设用地面积
-
- # 3. 现状建设用地面积:统计现状调查数据中地类为建设用地的面积
- xz_layer = self.get_xz_jsyd()
- xz_total = self.get_area(xz_layer) # 现状建设用地面积
-
- # 4. 规划内的建设用地面积:现状建设用地与规划建设用地空间进行空间叠加,统计相交的面积
- gh_nc = arcpy.analysis.Intersect([gh_layer, xz_layer],
- r"{}\GH_NC".format(self.outGdb))
- # 重新计算相交后的面积
- self.add_tbmj(gh_nc)
- nc_total = self.get_area(gh_nc) # 规划内建设用地面积
-
- # 5. 规划外的建设用地面积:现状建设用地减去规划内建设用地,统计剩余现状建设用地的面积
- gh_wai = arcpy.analysis.Erase(xz_layer, gh_nc,
- r"{}\GH_WAI".format(self.outGdb))
- # 重新计算擦除后的面积
- self.add_tbmj(gh_wai)
- gh_wai_total = self.get_area(gh_wai) # 规划外建设用地面积
-
- # 6. 数据验证:确保规划内+规划外=现状总计
- calculated_total = nc_total + gh_wai_total
- tolerance = 0.01 # 允许0.01平方米的误差
- if abs(calculated_total - xz_total) > tolerance:
- log.warning("数据验证失败:规划内({:.2f}) + 规划外({:.2f}) = {:.2f} ≠ 现状总计({:.2f})".format(
- nc_total, gh_wai_total, calculated_total, xz_total))
- print("警告:面积计算可能有误,请检查数据")
-
- # 7. 建设用地开发完成情况:规划内的建设用地面积/规划建设用地面积
- development_rate = (nc_total / gh_total * 100) if gh_total > 0 else 0
- # 7. 行政区统计逻辑
- # 分析范围为市时,建设用地开发完成情况结果显示到旗县
- # 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇
- result_data = []
- sub_len = self.getSubXzqLength()
-
- # 为规划内和规划外图层添加行政区统计字段
- arcpy.AddField_management(gh_nc, "STATICS", "TEXT")
- arcpy.CalculateField_management(gh_nc, "STATICS",
- "!ZLDWDM![0:{}]".format(sub_len),
- "PYTHON_9.3")
-
- # 为规划图层添加行政区统计字段,用于计算各下级行政区的规划总面积
- arcpy.AddField_management(gh_layer, "STATICS", "TEXT")
- arcpy.CalculateField_management(gh_layer, "STATICS",
- "!XZQDM![0:{}]".format(sub_len),
- "PYTHON_9.3")
-
- # 统计各下级行政区的规划建设用地面积
- gh_stats = r"{}\GH_STATS".format(self.outGdb)
- arcpy.Statistics_analysis(gh_layer, gh_stats,
- [["TBMJ", "SUM"]], "STATICS")
-
- # 统计各下级行政区的规划内建设用地面积
- nc_stats = r"{}\NC_STATS".format(self.outGdb)
- arcpy.Statistics_analysis(gh_nc, nc_stats,
- [["TBMJ", "SUM"]], "STATICS")
-
- # 为现状图层添加行政区统计字段,用于计算各下级行政区的现状总面积
- arcpy.AddField_management(xz_layer, "STATICS", "TEXT")
- arcpy.CalculateField_management(xz_layer, "STATICS",
- "!ZLDWDM![0:{}]".format(sub_len),
- "PYTHON_9.3")
-
- # 统计各下级行政区的现状建设用地面积
- xz_stats = r"{}\XZ_STATS".format(self.outGdb)
- arcpy.Statistics_analysis(xz_layer, xz_stats,
- [["TBMJ", "SUM"]], "STATICS")
-
- # 为规划外图层添加行政区统计字段,用于计算各下级行政区的规划外面积
- arcpy.AddField_management(gh_wai, "STATICS", "TEXT")
- arcpy.CalculateField_management(gh_wai, "STATICS",
- "!ZLDWDM![0:{}]".format(sub_len),
- "PYTHON_9.3")
-
- # 统计各下级行政区的规划外建设用地面积
- gh_wai_stats = r"{}\GH_WAI_STATS".format(self.outGdb)
- arcpy.Statistics_analysis(gh_wai, gh_wai_stats,
- [["TBMJ", "SUM"]], "STATICS")
-
- # 查询行政区名称
- xzq_names = self.get_xzq_names()
-
- # 创建各类面积字典
- gh_area_dict = {} # 规划建设用地面积
- with arcpy.da.SearchCursor(gh_stats, ["STATICS", "SUM_TBMJ"]) as cur:
- for row in cur:
- gh_area_dict[row[0]] = row[1]
-
- xz_area_dict = {} # 现状建设用地面积
- with arcpy.da.SearchCursor(xz_stats, ["STATICS", "SUM_TBMJ"]) as cur:
- for row in cur:
- xz_area_dict[row[0]] = row[1]
-
- nc_area_dict = {} # 规划内建设用地面积
- with arcpy.da.SearchCursor(nc_stats, ["STATICS", "SUM_TBMJ"]) as cur:
- for row in cur:
- nc_area_dict[row[0]] = row[1]
-
- gh_wai_area_dict = {} # 规划外建设用地面积
- with arcpy.da.SearchCursor(gh_wai_stats, ["STATICS", "SUM_TBMJ"]) as cur:
- for row in cur:
- gh_wai_area_dict[row[0]] = row[1]
-
- # 构建各下级行政区的开发完成情况结果数据
- all_xzdm = set(gh_area_dict.keys()) | set(xz_area_dict.keys()) | set(nc_area_dict.keys()) | set(gh_wai_area_dict.keys())
- for xzdm in all_xzdm:
- gh_sub_total = gh_area_dict.get(xzdm, 0) # 规划建设用地面积
- xz_sub_total = xz_area_dict.get(xzdm, 0) # 现状建设用地面积
- nc_sub_total = nc_area_dict.get(xzdm, 0) # 规划内建设用地面积
- gh_wai_sub_total = gh_wai_area_dict.get(xzdm, 0) # 规划外建设用地面积
- name = xzq_names.get(xzdm, "未知区域")
-
- # 计算该下级行政区的建设用地开发完成情况
- sub_development_rate = (nc_sub_total / gh_sub_total * 100) if gh_sub_total > 0 else 0
-
- result_data.append({
- "xzqmc": name,
- "xzqdm": xzdm,
- "proportion": round(sub_development_rate, 2),
- "tbmj": round(gh_sub_total, 2),
- "xzqmj": round(xz_sub_total, 2),
- "ghnmj": round(nc_sub_total, 2),
- "ghwmj": round(gh_wai_sub_total, 2)
- })
- # 8. 回写结果
- self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S")
-
- # 使用分析因子名称
- yz_name = self.task["YZNAME"] if self.task["YZNAME"] else "建设用地"
-
- # 获取行政区名称
- xzq_names = self.get_xzq_names()
- xzq_name = xzq_names.get(self.task["XZQDM"], "未知行政区")
-
- # 构建分析结果描述(按照用户要求的格式)
- fxjg = "{0}规划{1}约{2},{1}现状约有{3},其中:规划内{1}现状约有{4},规划外{1}现状约{5},{1}开发完成总体约{6:.0f}%。".format(
- xzq_name,
- yz_name,
- self.area_transform(gh_total),
- self.area_transform(xz_total),
- self.area_transform(nc_total),
- self.area_transform(gh_wai_total),
- development_rate)
-
- self.updateFxztAndStatist(self.bsm, "2", fxjg, json.dumps(result_data, ensure_ascii=False))
- log.info("####OK####")
- print("####OK####")
- except arcpy.ExecuteError:
- msg = arcpy.GetMessages()
- log.error(msg)
- self.updateFxzt(self.bsm, "3", msg)
- except:
- msg = str(sys.exc_info()).decode('string-escape')
- log.error(msg)
- self.updateFxzt(self.bsm, "3", msg)
- # ---------- 读规划建设用地 ----------
- def get_gh_jsyd(self):
- env.workspace = appconfig.getSDE("SDE") # 规划数据在SDE库中
- out = r"{}\GH_JSYD".format(self.outGdb)
-
- # 构建查询条件:结合行政区域代码和规划条件
- xzqdm = self.task["XZQDM"]
- xzqdm_condition = "XZQDM LIKE '{}%'".format(xzqdm)
-
- # 如果有规划条件,则组合条件;否则只使用行政区域条件
- if self.task["GH_CXTJ"] and self.task["GH_CXTJ"].strip():
- combined_condition = "({}) AND ({})".format(xzqdm_condition, self.task["GH_CXTJ"])
- else:
- combined_condition = xzqdm_condition
-
- arcpy.Select_analysis("GHYDYH", out, combined_condition)
- self.add_tbmj(out)
- return out
- # ---------- 读现状建设用地 ----------
- def get_xz_jsyd(self):
- env.workspace = appconfig.getSDE("SDE") # 三调库
- out = r"{}\XZ_JSYD".format(self.outGdb)
-
- # 构建查询条件:结合行政区域代码和现状条件
- xzqdm = self.task["XZQDM"]
- xzqdm_condition = "ZLDWDM LIKE '{}%'".format(xzqdm)
-
- # 如果有现状条件,则组合条件;否则只使用行政区域条件
- if self.task["XZ_CXTJ"] and self.task["XZ_CXTJ"].strip():
- combined_condition = "({}) AND ({})".format(xzqdm_condition, self.task["XZ_CXTJ"])
- else:
- combined_condition = xzqdm_condition
-
- arcpy.Select_analysis(self.task["FXTABLE"], out, combined_condition)
- self.add_tbmj(out)
- return out
- # ---------- 查询行政区名称 ----------
- def get_xzq_names(self):
- """
- 从表v_yzt_zysxcx中查询行政区域名称
- 只查询与当前任务行政区代码相关的记录,即id等于该行政区代码或者pid等于该行政区代码的记录
- 并且type必须是XZQ
- :return: 字典,键为行政区代码,值为行政区名称
- """
- xzqdm = self.task["XZQDM"]
- sql = "SELECT id, name FROM v_yzt_zysxcx WHERE (id = '{0}' OR pid = '{0}') AND type = 'XZQ'".format(xzqdm)
- results = self.db.query(sql)
- xzq_dict = {}
- for row in results:
- xzq_dict[row["ID"]] = row["NAME"]
- return xzq_dict
- # ---------- 小工具 ----------
- def add_tbmj(self, fc):
- arcpy.AddField_management(fc, "TBMJ", "DOUBLE")
- arcpy.CalculateField_management(fc, "TBMJ",
- "!shape.area@SQUAREMETERS!",
- "PYTHON_9.3")
- def get_area(self, fc):
- total = 0
- with arcpy.da.SearchCursor(fc, ["TBMJ"]) as cur:
- for row in cur:
- total += row[0]
- return total
- def getSubXzqLength(self):
- """
- 根据行政区代码长度确定下级行政区统计粒度
- 分析范围为市时,建设用地开发完成情况结果显示到旗县
- 分析范围为旗县时,建设用地开发完成情况结果显示到乡镇
- 最大级别只到乡镇(9位行政区代码)
- """
- curlen = len(self.task["XZQDM"])
- if curlen == 4: # 市级(如:1501)
- return 6 # 统计到县级(如:150102)
- elif curlen == 6: # 县级(如:150102)
- return 9 # 统计到乡级(如:150102001)
- else:
- # 对于其他情况,根据当前级别确定下级统计粒度,但最大只到乡镇级别
- if curlen <= 4:
- return 6 # 统计到县级
- elif curlen <= 6:
- return 9 # 统计到乡级
- else:
- return 9 # 最大只到乡镇级别(9位)
- def area_transform(self, area, unit="km2"):
- """
- 面积单位转换
- :param area: 面积值(平方米)
- :param unit: 目标单位,支持 "hectare"(公顷)、"mu"(亩)、"km2"(平方千米),默认为平方千米
- :return: 格式化的面积字符串
- """
- if unit == "hectare":
- # 转换为公顷
- hectare_value = area / 10000
- return "{:.2f}公顷".format(hectare_value)
- elif unit == "mu":
- # 转换为亩
- mu_value = area * 0.0015
- return "{:.2f}亩".format(mu_value)
- elif unit == "km2":
- # 转换为平方千米
- km2_value = area / 1000000
- return "{:.2f}平方千米".format(km2_value)
- else:
- # 默认返回平方千米
- km2_value = area / 1000000
- return "{:.2f}平方千米".format(km2_value)
- # ---------- 任务状态更新 ----------
- def updateFxzt(self, bsm, fxzt, msg):
- self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S")
- # 清理消息中的特殊字符,防止SQL注入和语法错误
- clean_msg = msg.replace("'", "''") if msg else ""
-
- # 确保中文字符串正确编码
- if isinstance(clean_msg, unicode):
- clean_msg = clean_msg.encode('utf-8')
-
- sql = ("update t_fxpj_ctfx_main t set rwzt='{0}',"
- "rwkssj=to_date('{1}','yyyy-mm-dd hh24:mi:ss'),"
- "rwjssj=to_date('{2}','yyyy-mm-dd hh24:mi:ss'),"
- "fxjg='{3}',fxjgtable='{4}',workspace='{5}' "
- "where t.id='{6}'").format(
- fxzt, self.task["RWKSSJ"], self.rwjssj,
- clean_msg, self.outputname, '', bsm)
- log.info(sql)
- self.db.insert(sql)
- def updateFxztAndStatist(self, bsm, fxzt, msg, statist):
- self.rwjssj = utils.getNowTimeStr("%Y-%m-%d %H:%M:%S")
- # 清理消息中的特殊字符,防止SQL注入和语法错误
- clean_msg = msg.replace("'", "''") if msg else ""
- clean_statist = statist.replace("'", "''") if statist else ""
-
- # 确保中文字符串正确编码
- if isinstance(clean_msg, unicode):
- clean_msg = clean_msg.encode('utf-8')
- if isinstance(clean_statist, unicode):
- clean_statist = clean_statist.encode('utf-8')
-
- sql = ("update t_fxpj_ctfx_main t set rwzt='{0}',"
- "rwkssj=to_date('{1}','yyyy-mm-dd hh24:mi:ss'),"
- "rwjssj=to_date('{2}','yyyy-mm-dd hh24:mi:ss'),"
- "fxjg='{3}',fxjgtable='{4}',workspace='{5}',statist='{7}' "
- "where t.id='{6}'").format(
- fxzt, self.task["RWKSSJ"], self.rwjssj,
- clean_msg, self.outputname, self.outGdb, bsm, clean_statist)
- self.db.insert(sql)
- # ---------------- 入口 ----------------
- if __name__ == "__main__":
- Jsyd("01eb2b2c332a45478e61ae5155b2d867").run()
|