import ollama import psycopg2 import json import uuid import datetime from llm_model.query import query from psycopg2.extras import DictCursor from app.utils.pinyin_utils import replace_word from app.common.word import target_word from app.common.res import res_success, res_error from app.utils.log_utils import logger chat_history = "用户:你好,我是智能助手,请问有什么可以帮助您?\\n智能助手:好的,请问您有什么需求?" sys_xuanzhi = """请扮演文本提取工具,根据输入和聊天上下文信息,基于以下因子选择、选址范围和用地类型提取这句话中的关键信息,提取到的结果请严格以json格式字符串输出并保障寄送格式正确无误, 选址范围 = ['抱坡区','天涯区','崖州区','海棠区','吉阳区' ], 因子选择 = ["高程","坡度","永久基本农田","城镇开发边界内","生态保护红线","文化保护区","自然保护地","风景名胜区","国有使用权","河道管理线","水库","公益林","火葬场","垃圾处理场","污水处理场","高压线","变电站","古树","城市道路","主要出入口","文化活动设施","体育运动场所","排水","供水","燃气","电力","电信","十五分钟社区生活圈邻里中心","社区服务设施","零售商业场所","医疗卫生设施","幼儿园服务半径","小学服务半径","为老服务设施"], 用地类型 = ['园地','耕地','林地','草地','湿地','公共卫生用地','老年人社会福利用地','儿童社会福利用地','残疾人社会福利用地','其他社会福利用地','零售商业用地','批发市场用地','餐饮用地','旅馆用地','公用设施营业网点用地','娱乐用地','康体用地','一类工业用地','二类工业用地','广播电视设施用地','环卫用地','消防用地','干渠','水工设施用地','其他公用设施用地','公园绿地','防护绿地','广场用地','军事设施用地','使领馆用地','宗教用地','文物古迹用地','监教场所用地','殡葬用地','其他特殊用地','河流水面','湖泊水面','水库水面','坑塘水面','沟渠','冰川及常年积雪','渔业基础设施用海','增养殖用海','捕捞海域','工业用海','盐田用海','固体矿产用海','油气用海','可再生能源用海','海底电缆管道用海','港口用海','农业设施建设用地','工矿用地','畜禽养殖设施建设用地','水产养殖设施建设用地','城镇住宅用地','特殊用地','居住用地','绿地与开敞空间用地','水田','水浇地','旱地','果园','茶园','橡胶园','其他园地','乔木林地','竹林地','城镇社区服务设施用地','农村宅基地','农村社区服务设施用地','机关团体用地','科研用地','文化用地','教育用地','体育用地','医疗卫生用地','社会福利用地','商业用地','商务金融用地','二类农村宅基地','图书与展览用地','文化活动用地','高等教育用地','中等职业教育用地','体育训练用地','其他交通设施用地','供水用地','排水用地','供电用地','供燃气用地','供热用地','通信用地','邮政用地','医院用地','基层医疗卫生设施用地','田间道','盐碱地','沙地','裸土地','裸岩石砾地','村道用地','村庄内部道路用地','公共管理与公共服务用地','仓储用地','交通运输用地','公用设施用地','交通运输用海','航运用海','路桥隧道用海','风景旅游用海','文体休闲娱乐用海','军事用海','其他特殊用海','空闲地','田坎','港口码头用地','管道运输用地','城市轨道交通用地','城镇道路用地','交通场站用地','一类城镇住宅用地','二类城镇住宅用地','三类城镇住宅用地','一类农村宅基地','商业服务业用地','三类工业用地','一类物流仓储用地','二类物流仓储用地','三类物流仓储用地','盐田','对外交通场站用地','公共交通场站用地','社会停车场用地','中小学用地','幼儿园用地','其他教育用地','体育场馆用地','灌木林地','其他林地','天然牧草地','人工牧草地','其他草地','森林沼泽','灌丛沼泽','沼泽草地','其他沼泽地','沿海滩涂','内陆滩涂','红树林地','乡村道路用地','种植设施建设用地','娱乐康体用地','其他商业服务业用地','工业用地','采矿用地','物流仓储用地','储备库用地','铁路用地','公路用地','机场用地'], landType是用地类型 districtName是选址范围 area是用地大小,单位统一转换为亩 factors是因子选择 其他公里、千米的单位转换为米 输出json格式数据如下: {     "districtName": "抱坡区", "landType": "居住用地",     "area": {         "min": 30,         "max": 50     },     "factors": [         {             "type": "医疗卫生设施",             "condition": "lt",             "value": "500"         },         {             "type": "永久基本农田",             "condition": "not_intersect"         },         {             "type": "火葬场",             "condition": "gt",             "value": "1000"         }, {             "type": "幼儿园服务半径",             "condition": "lt",             "value": "1000"         }, {             "type": "小学服务半径",             "condition": "lt",             "value": "1000"         },     ] } json中"condition"的值为"gt"、"lt"、"get"、"let"、"between","not_intersect"、"intersect"、"not_contain"、"contain"、"between" """ sys_question = """请扮演问答工具,对用户输入信息进行回答,请严格以markdown格式输出并保障寄送格式正确无误""" # 连接数据库 conn = psycopg2.connect( dbname="real3d", user="postgres", password="postgis", # host="192.168.100.30", host="192.168.60.2", port="5432" ) # 清除聊天记录 def clear_chat_history(): global chat_history chat_history = "" return chat_history def create_chat(msg,type): # msg = data['msg'] # type = data['type'] if type == 'selectLand': #同音字替换 msg = replace_word(msg, target_word) words_to_replace1 = ["爆破", "爆坡","鲍坡"] for word in words_to_replace1: msg = msg.replace(word, "抱坡") print(msg) # 调用大模型解析 # 这里调用大模型,并返回解析结果 res = update_chat_history(msg) #未找到相关数据提示 prompt = "根据提供的信息,用户的表述不够清晰明确,请重新描述您的需求。" addtress = ['抱坡区', '天涯区', '崖州区', '海棠区', '吉阳区'] land = ['园地', '耕地', '林地', '草地', '湿地', '公共卫生用地', '老年人社会福利用地', '儿童社会福利用地', '残疾人社会福利用地', '其他社会福利用地', '零售商业用地', '批发市场用地', '餐饮用地', '旅馆用地', '公用设施营业网点用地', '娱乐用地', '康体用地', '一类工业用地', '二类工业用地', '广播电视设施用地', '环卫用地', '消防用地', '干渠', '水工设施用地', '其他公用设施用地', '公园绿地', '防护绿地', '广场用地', '军事设施用地', '使领馆用地', '宗教用地', '文物古迹用地', '监教场所用地', '殡葬用地', '其他特殊用地', '河流水面', '湖泊水面', '水库水面', '坑塘水面', '沟渠', '冰川及常年积雪', '渔业基础设施用海', '增养殖用海', '捕捞海域', '工业用海', '盐田用海', '固体矿产用海', '油气用海', '可再生能源用海', '海底电缆管道用海', '港口用海', '农业设施建设用地', '工矿用地', '畜禽养殖设施建设用地', '水产养殖设施建设用地', '城镇住宅用地', '特殊用地', '居住用地', '绿地与开敞空间用地', '水田', '水浇地', '旱地', '果园', '茶园', '橡胶园', '其他园地', '乔木林地', '竹林地', '城镇社区服务设施用地', '农村宅基地', '农村社区服务设施用地', '机关团体用地', '科研用地', '文化用地', '教育用地', '体育用地', '医疗卫生用地', '社会福利用地', '商业用地', '商务金融用地', '二类农村宅基地', '图书与展览用地', '文化活动用地', '高等教育用地', '中等职业教育用地', '体育训练用地', '其他交通设施用地', '供水用地', '排水用地', '供电用地', '供燃气用地', '供热用地', '通信用地', '邮政用地', '医院用地', '基层医疗卫生设施用地', '田间道', '盐碱地', '沙地', '裸土地', '裸岩石砾地', '村道用地', '村庄内部道路用地', '公共管理与公共服务用地', '仓储用地', '交通运输用地', '公用设施用地', '交通运输用海', '航运用海', '路桥隧道用海', '风景旅游用海', '文体休闲娱乐用海', '军事用海', '其他特殊用海', '空闲地', '田坎', '港口码头用地', '管道运输用地', '城市轨道交通用地', '城镇道路用地', '交通场站用地', '一类城镇住宅用地', '二类城镇住宅用地', '三类城镇住宅用地', '一类农村宅基地', '商业服务业用地', '三类工业用地', '一类物流仓储用地', '二类物流仓储用地', '三类物流仓储用地', '盐田', '对外交通场站用地', '公共交通场站用地', '社会停车场用地', '中小学用地', '幼儿园用地', '其他教育用地', '体育场馆用地', '灌木林地', '其他林地', '天然牧草地', '人工牧草地', '其他草地', '森林沼泽', '灌丛沼泽', '沼泽草地', '其他沼泽地', '沿海滩涂', '内陆滩涂', '红树林地', '乡村道路用地', '种植设施建设用地', '娱乐康体用地', '其他商业服务业用地', '工业用地', '采矿用地', '物流仓储用地', '储备库用地', '铁路用地', '公路用地', '机场用地'] json_res = res.replace("json", "") json_res = json_res.replace("```", "") if json_res != "未找到相关数据": try: json_res = json.loads(json_res) districtName = json_res["districtName"] landType = json_res["landType"] # if landType != "未找到相关数据" and landType != "" and districtName != "未找到相关数据"and districtName != "": if landType in land and districtName in addtress: json_res = jsonResToDict(json_res) # print(json_res) else: json_res = prompt json_res = res_error(json_res, "selectLand","error") except: json_res = prompt json_res = res_error(json_res, "selectLand","error") else: json_res = prompt json_res = res_error(json_res, "selectLand","error") return json_res elif type == 'answer': # json_res = route_query(msg) # json_res = jsonResToDict_questions(json_res) # print(json_res) # 打印生成的回复 json_res = update_chat_history_simple(msg) json_res = res_success(json_res, "answer", "success") print(json_res) # 打印生成的回复 return json_res # 智能选址 def update_chat_history(user_message): global chat_history # 使用全局变量以便更新 prompt = chat_history + "\\n用户:" + user_message # 生成回复,并加入聊天上下文 res = ollama.generate( model="qwen2.5:7b", stream=False, system=sys_xuanzhi, prompt=prompt, options={"temperature": 0, "num_ctx": 32000, }, keep_alive=-1 ) # 获取机器人回复 bot_message = res["response"] # 更新聊天历史 chat_history += "\\n智能助手:" + bot_message # 返回机器人的回复 return bot_message # 将大模型解析的结果转换为选址需要的数据格式 def jsonResToDict(json_res): # 1.查询选址范围信息 districtName = json_res["districtName"] ewkt = getAiDistrict(districtName) # 2.保存选址范围信息 geomId = saveGeom(ewkt) # 3.获取用地类型信息 landType = json_res["landType"] landType = getLandType(landType, "YDYHFLDM") # 4.获取模板信息 factorTemplates = getTemplateByCode(landType) # TODO 以哪个因子列表为准,模版和因子个数怎么匹配 now = datetime.datetime.now() formatted_time = now.strftime("%Y%m%d%H%M%S") res = { "xzmj": 1500, "xmmc": "规划选址项目_"+formatted_time, "jsdw": "建设单位", "ydxz_bsm": landType, "ydmjbegin": json_res["area"]["min"], "ydmjend": json_res["area"]["max"], "geomId": geomId, "yxyz": [], # TODO: 循环遍历 # "yxyz": [ # { # "id": "259e5bbaab434dbfb9c679bd44d4bfa4", # "name": "幼儿园服务半径", # "bsm": "TB_YEY", # "conditionInfo": { # "spatial_type": "distance", # "default": "lt", # "hasValue": true, # "defaultValue": "300", # "unit": "米", # "clip": false # } # } # ], # "useMultiple": json_res["useMultiple"], "useLandType": True, # "multipleDistance": json_res["multipleDistance"] } # 循环遍历输入因子 factors = json_res["factors"] input_factors = {} for factor in factors: factorInfo = getFactorByName(factor["type"]) if factorInfo == None: continue factorId = factorInfo["id"] factorBsm = factorInfo["bsm"] conditionInfo = factorInfo["condition_info"] conditionObj = json.loads(conditionInfo) defaultValue = '0' default = 'lt' if "value" in factor: defaultValue = str(factor["value"]) if "condition" in factor: default = factor["condition"] # if defaultValue == '': # defaultValue = '0' factor_info = { "id": factorId, "name": factor["type"], "bsm": factorBsm, "conditionInfo": { "spatial_type": conditionObj["spatial_type"], "default": default, "hasValue": conditionObj["hasValue"], "defaultValue": defaultValue, "unit": conditionObj["unit"], "clip": conditionObj["clip"] } } input_factors[factor_info["id"]] = factor_info # 循环遍历模板 # 记录已经添加的因子 ID added_factor_ids = set() # 首先处理模板 for factorTemplate in factorTemplates: factorId = factorTemplate["id"] factorTemplate["conditionInfo"] = json.loads( factorTemplate["conditionInfo"]) res["yxyz"].append(factorTemplate) added_factor_ids.add(factorId) # 记录已添加的因子 ID # 然后检查 input_factors 并添加未在模板中的因子 for factor_id, factor_info in input_factors.items(): if factor_id not in added_factor_ids: res["yxyz"].append(factor_info) resObj = {} resObj["data"] = res resObj["code"] = 200 resObj["type"] = "selectLand" return resObj # 获取因子信息 def getFactorByName(name): with conn.cursor(cursor_factory=DictCursor) as cur: sql = "SELECT * FROM base.t_fzss_fzxz_factor WHERE name = %s" complete_sql = cur.mogrify(sql, (name,)).decode('utf-8') logger.info(f"Executing SQL: {complete_sql}") cur.execute(sql, (name,)) res = cur.fetchone() return res # 获取选址范围信息 def getAiDistrict(name): with conn.cursor(cursor_factory=DictCursor) as cur: sql = "SELECT public.st_asewkt(geom) as geom FROM base.t_fzss_fzxz_ai_district WHERE name = %s" complete_sql = cur.mogrify(sql, (name,)).decode('utf-8') logger.info(f"Executing SQL: {complete_sql}") cur.execute(sql, (name,)) res = cur.fetchone() return res["geom"] # 保存选址范围信息 def saveGeom(ewkt): new_uuid = str(uuid.uuid4()) # 生成一个新的 UUID from_type = 3 with conn.cursor() as cur: sql = "INSERT INTO base.t_fzss_zhxz_file(id,geom,from_type,create_time,area) VALUES (%s,public.st_geomfromewkt(%s),%s,now(),public.st_area(public.st_geomfromewkt(%s)::public.geography))" complete_sql = cur.mogrify( sql, (new_uuid, ewkt, from_type, ewkt)).decode('utf-8') logger.info(f"Executing SQL: {complete_sql}") cur.execute(sql, (new_uuid, ewkt, from_type, ewkt)) conn.commit() return new_uuid # 获取用地类型信息 def getLandType(landName, fzbs): with conn.cursor(cursor_factory=DictCursor) as cur: sql = "SELECT dm,mc,fzbs FROM base.t_fzss_fzxz_dict WHERE mc = %s and fzbs=%s" complete_sql = cur.mogrify(sql, (landName, fzbs)).decode('utf-8') logger.info(f"Executing SQL: {complete_sql}") cur.execute(sql, (landName, fzbs)) res = cur.fetchone() return res["dm"] # 获取内置模板信息 def getTemplateByCode(code): with conn.cursor(cursor_factory=DictCursor) as cur: sql = 'SELECT factor_id as id,factor_name as name,factor_bsm as bsm,condition_info as "conditionInfo" FROM base.t_fzss_fzxz_factor_temp WHERE land_type_code = %s' complete_sql = cur.mogrify(sql, (code,)).decode('utf-8') logger.info(f"Executing SQL: {complete_sql}") cur.execute(sql, (code,)) res = cur.fetchall() # 将查询结果转换为字典列表 result_list = [dict(row) for row in res] return result_list # 简单知识问答,未关联本地知识库 def update_chat_history_simple(user_message): global chat_history # 使用全局变量以便更新 prompt = chat_history + "\\n用户:" + user_message # 生成回复,并加入聊天上下文 res = ollama.generate( model="qwen2.5:7b", stream=False, system=sys_question, prompt=prompt, options={"temperature": 0, "num_ctx": 32000, }, keep_alive=-1 ) # 获取机器人回复 bot_message = res["response"] # 更新聊天历史 chat_history += "\\n智能助手:" + bot_message # 返回机器人的回复 return bot_message def route_query(msg): response = query(msg) # print(response) # if response: # resObj = {} # resObj["data"] = response # resObj["code"] = 200 # resObj["type"] = "answer" # return resObj # return {"error": "Something went wrong"}, 400 return response