123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- __author__ = 'liying'
- __date__ = 'May 2025'
- __copyright__ = '(C) 2025, liying'
- import os
- import time
- import psycopg2
- import matplotlib.pyplot as plt
- from matplotlib import font_manager
- import numpy as np
- import siwei_config
- from PyQt5.QtWidgets import QDialog, QVBoxLayout
- from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
- from qgis.core import (
- QgsProcessingAlgorithm,
- QgsProcessingParameterProviderConnection,
- QgsProcessingParameterDatabaseSchema,
- QgsProcessingParameterString,
- QgsProcessingParameterFile,
- QgsProcessingParameterBoolean
- )
- from qgis.utils import iface
- # 设置字体以支持中文
def set_matplotlib_font():
    """Configure matplotlib so that Chinese chart labels can be rendered.

    The Microsoft YaHei font file at its standard Windows location is
    preferred. When that file is absent (for example on Linux or macOS),
    the first family from a short list of common CJK fonts that
    matplotlib's font manager already knows about is used instead.

    Failures are reported with ``print`` and never raised, so a missing
    font degrades the labels but does not abort the caller.
    """
    font_path = 'C:/Windows/Fonts/msyh.ttc'
    # Families tried, in order, when the Windows font file is unavailable.
    fallback_families = (
        'Microsoft YaHei',
        'SimHei',
        'PingFang SC',
        'Noto Sans CJK SC',
        'WenQuanYi Micro Hei',
    )
    try:
        if os.path.exists(font_path):
            prop = font_manager.FontProperties(fname=font_path)
            plt.rcParams['font.family'] = prop.get_name()
        else:
            installed = {entry.name for entry in font_manager.fontManager.ttflist}
            family = next(
                (name for name in fallback_families if name in installed), None)
            if family is None:
                print("字体设置失败:未找到可用的中文字体")
            else:
                plt.rcParams['font.family'] = family
        # Many CJK fonts lack the Unicode minus glyph; fall back to the
        # ASCII hyphen so negative tick labels stay readable.
        plt.rcParams['axes.unicode_minus'] = False
    except Exception as e:
        # Best effort by design: charts are still drawn with the default font.
        print(f"字体设置失败:{e}")
class DataStorageStatistics(QgsProcessingAlgorithm):
    """Processing algorithm that summarises a data-storage catalogue table.

    For the configured catalogue table the algorithm:

    * counts rows per distinct value of the ``ywlx``, ``sjlx`` and ``glbm``
      columns;
    * sums the on-disk size of every ``osgb`` record (``sjywz`` is treated
      as a file or directory path);
    * sums ``pg_total_relation_size`` for every ``vector``/``raster``/
      ``table`` record (the relation name is taken from ``name`` or
      ``sjywz`` respectively);
    * draws a 2x2 chart figure, optionally saves it as PNG, and shows it in
      a modal dialog when a QGIS GUI is available.

    NOTE(review): the ``DATABASE`` parameter is declared but the connection
    itself is built from ``siwei_config.CONFIG['db']``; the parameter only
    feeds the schema picker.
    """

    DATABASE = 'DATABASE'
    SCHEMA = 'SCHEMA'
    TABLE = 'TABLE'
    EXPORT_DIR = 'EXPORT_DIR'
    EXPORT_CHARTS = 'EXPORT_CHARTS'

    # Catalogue columns whose value distribution is reported and charted.
    # These are fixed identifiers, so interpolating them into SQL is safe.
    _STAT_FIELDS = ('ywlx', 'sjlx', 'glbm')
    # Display names for the ``sjlx`` categories, in reporting order.
    _DATA_TYPE_NAMES = {
        'vector': '矢量数据',
        'raster': '栅格数据',
        'table': '附件资料',
        'osgb': '三维数据',
    }
    _BYTES_PER_MB = 1024 * 1024

    def flags(self):
        """Force execution on the main thread.

        ``processAlgorithm`` opens a Qt dialog and uses pyplot, neither of
        which may be driven from the background thread Processing uses by
        default.
        """
        return super().flags() | QgsProcessingAlgorithm.FlagNoThreading

    def initAlgorithm(self, config=None):
        """Declare the algorithm's input parameters."""
        self.addParameter(QgsProcessingParameterProviderConnection(
            self.DATABASE, '数据库连接', 'postgres',
            defaultValue=siwei_config.CONFIG['db']['host']))
        self.addParameter(QgsProcessingParameterDatabaseSchema(
            self.SCHEMA, '模式', connectionParameterName=self.DATABASE,
            defaultValue='base'))
        self.addParameter(QgsProcessingParameterString(
            self.TABLE, '表名', defaultValue='t_vector_storage'))
        self.addParameter(QgsProcessingParameterFile(
            self.EXPORT_DIR, '图表导出目录',
            behavior=QgsProcessingParameterFile.Folder, optional=True))
        self.addParameter(QgsProcessingParameterBoolean(
            self.EXPORT_CHARTS, '是否导出图表', defaultValue=True))

    def processAlgorithm(self, parameters, context, feedback):
        """Run the statistics, report them through *feedback*, draw charts.

        Returns:
            dict: ``{'结果': '统计完成'}`` on success.

        Raises:
            psycopg2.Error: if the connection or one of the catalogue
                queries fails. The connection is closed in every case.
        """
        db_config = siwei_config.CONFIG['db']
        connection_params = {
            'host': db_config['host'],
            'port': db_config['port'],
            'dbname': db_config['name'],
            'user': db_config['user'],
            'password': db_config['password'],
            'connect_timeout': 10,
        }
        schema = self.parameterAsString(parameters, self.SCHEMA, context)
        table = self.parameterAsString(parameters, self.TABLE, context)
        full_table = (
            f'{self._quote_identifier(schema)}.{self._quote_identifier(table)}')
        export_dir = (
            self.parameterAsString(parameters, self.EXPORT_DIR, context)
            or os.path.expanduser('~/qgis_stat_exports'))
        export_charts = self.parameterAsBoolean(
            parameters, self.EXPORT_CHARTS, context)
        if export_charts:
            # Only touch the file system when something will be written.
            os.makedirs(export_dir, exist_ok=True)

        size_map = dict.fromkeys(self._DATA_TYPE_NAMES, 0)
        found_tables = dict.fromkeys(self._DATA_TYPE_NAMES, 0)
        missing_tables = dict.fromkeys(self._DATA_TYPE_NAMES, 0)

        conn = psycopg2.connect(**connection_params)
        try:
            # Autocommit keeps a failed size probe from aborting the
            # transaction for the statements that follow it.
            conn.autocommit = True
            with conn.cursor() as cursor:
                stat_results = self._collect_field_stats(
                    cursor, full_table, feedback)
                feedback.pushInfo("正在统计数据文件大小...")
                self._measure_osgb(
                    cursor, full_table, feedback,
                    size_map, found_tables, missing_tables)
                self._measure_tables(
                    cursor, full_table, feedback,
                    size_map, found_tables, missing_tables)
        finally:
            conn.close()

        feedback.pushInfo("\n表大小统计结果:")
        for sjlx, display_name in self._DATA_TYPE_NAMES.items():
            feedback.pushInfo(
                f"{display_name}: 找到 {found_tables[sjlx]} 个表/文件,缺失 {missing_tables[sjlx]} 个,总大小 {self._to_mb(size_map[sjlx])} MB")

        self._render_charts(
            stat_results, size_map, export_charts, export_dir, feedback)
        return {'结果': '统计完成'}

    @staticmethod
    def _quote_identifier(identifier):
        """Return *identifier* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so the value cannot terminate
        the quoted identifier early.
        """
        return '"' + identifier.replace('"', '""') + '"'

    def _to_mb(self, size_in_bytes):
        """Convert a byte count to megabytes rounded to two decimals."""
        return round(size_in_bytes / self._BYTES_PER_MB, 2)

    def _collect_field_stats(self, cursor, full_table, feedback):
        """Count rows per non-NULL value for each statistics column.

        Returns a dict mapping column name to a list of ``(value, count)``
        rows ordered by descending count.
        """
        stat_results = {}
        for field in self._STAT_FIELDS:
            feedback.pushInfo(f"正在统计字段:{field}")
            cursor.execute(f"""
                SELECT {field}, COUNT(*) FROM {full_table}
                WHERE {field} IS NOT NULL
                GROUP BY {field}
                ORDER BY COUNT(*) DESC
            """)
            rows = cursor.fetchall()
            stat_results[field] = rows
            for value, count in rows:
                feedback.pushInfo(f"{value}: {count}")
        return stat_results

    @staticmethod
    def _directory_size(root):
        """Return the summed size in bytes of all files below *root*."""
        total = 0
        for dirpath, _dirnames, filenames in os.walk(root):
            for filename in filenames:
                file_path = os.path.join(dirpath, filename)
                # Skip entries that vanished or are dangling symlinks.
                if os.path.exists(file_path):
                    total += os.path.getsize(file_path)
        return total

    def _measure_osgb(self, cursor, full_table, feedback,
                      size_map, found_tables, missing_tables):
        """Measure the on-disk size of every ``osgb`` record.

        Updates the ``'osgb'`` entry of the three accumulator dicts and
        reports totals plus the five largest entries.
        """
        feedback.pushInfo("正在处理 osgb 类型数据...")
        cursor.execute(f"""
            SELECT id, sjywz
            FROM {full_table}
            WHERE sjlx = 'osgb' AND sjywz IS NOT NULL
        """)
        osgb_rows = cursor.fetchall()
        feedback.pushInfo(f"找到 {len(osgb_rows)} 条 osgb 数据记录")

        total_size = 0
        missing_count = 0
        measured = []  # (path, size in bytes) for every readable entry
        for osgb_id, sjywz in osgb_rows:
            if not sjywz:
                # The query filters NULL only; empty strings land here.
                missing_count += 1
                feedback.pushWarning(f"ID为 {osgb_id} 的 osgb 记录 sjywz 字段为空")
                continue
            try:
                if not os.path.exists(sjywz):
                    missing_count += 1
                    feedback.pushWarning(f"找不到 osgb 文件路径: {sjywz}")
                    continue
                if os.path.isdir(sjywz):
                    entry_size = self._directory_size(sjywz)
                    feedback.pushInfo(
                        f"OSGB文件夹 {sjywz} 总大小: {self._to_mb(entry_size)} MB")
                else:
                    entry_size = os.path.getsize(sjywz)
                    feedback.pushInfo(
                        f"OSGB文件 {sjywz} 大小: {self._to_mb(entry_size)} MB")
            except Exception as e:
                # Deliberately best effort: one unreadable path must not
                # abort the whole report.
                missing_count += 1
                feedback.pushWarning(f"读取 osgb 文件 {sjywz} 失败: {str(e)}")
                continue
            total_size += entry_size
            measured.append((sjywz, entry_size))

        size_map['osgb'] = total_size
        found_tables['osgb'] = len(measured)
        missing_tables['osgb'] = missing_count

        feedback.pushInfo("\nOSGB 文件统计结果:")
        feedback.pushInfo(f"总文件数: {len(measured)}")
        feedback.pushInfo(f"缺失文件数: {missing_count}")
        feedback.pushInfo(f"总大小: {self._to_mb(total_size)} MB")

        if measured:
            largest = sorted(measured, key=lambda item: item[1], reverse=True)[:5]
            feedback.pushInfo("\n最大的5个 OSGB 文件:")
            for rank, (file_path, size) in enumerate(largest, 1):
                feedback.pushInfo(f"{rank}. {file_path}: {self._to_mb(size)} MB")

    def _measure_tables(self, cursor, full_table, feedback,
                        size_map, found_tables, missing_tables):
        """Sum ``pg_total_relation_size`` for vector/raster/table records.

        The relation name comes from ``name`` for vector and raster rows
        and from ``sjywz`` for table rows; rows without one are skipped.
        """
        cursor.execute(f"""
            SELECT sjlx, name, sjywz
            FROM {full_table}
            WHERE sjlx IN ('vector', 'raster', 'table')
        """)
        for sjlx, name, sjywz in cursor.fetchall():
            table_ref = name if sjlx in ('vector', 'raster') else sjywz
            if not table_ref:
                continue
            try:
                size_found = False
                for candidate in self.get_table_name_candidates(table_ref):
                    try:
                        cursor.execute(
                            "SELECT pg_total_relation_size(%s);", (candidate,))
                        row = cursor.fetchone()
                    except psycopg2.Error:
                        # Relation does not exist under this spelling;
                        # try the next candidate.
                        continue
                    # An existing but empty relation may report 0 bytes,
                    # so test for NULL rather than truthiness.
                    if row and row[0] is not None:
                        size_map[sjlx] += row[0]
                        found_tables[sjlx] += 1
                        size_found = True
                        break
                if not size_found:
                    missing_tables[sjlx] += 1
                    feedback.pushWarning(f"无法找到表:{table_ref}")
            except Exception as e:
                # Best effort per record: report and move on.
                feedback.pushWarning(
                    f"处理表时出错:{table_ref},错误信息:{str(e)}")

    def _render_charts(self, stat_results, size_map,
                       export_charts, export_dir, feedback):
        """Draw the 2x2 summary figure, optionally save it, then show it.

        The figure is always released from pyplot afterwards so repeated
        runs do not accumulate open figures.
        """
        set_matplotlib_font()
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        fig, axes = plt.subplots(2, 2, figsize=(14, 12))
        try:
            field_names = {'ywlx': '业务类型', 'sjlx': '数据类型', 'glbm': '管理部门'}
            plotters = {'ywlx': self.plot_donut, 'glbm': self.plot_line}
            for idx, field in enumerate(self._STAT_FIELDS):
                rows = stat_results.get(field, [])
                labels = [str(row[0]) for row in rows]
                sizes = [row[1] for row in rows]
                # Fill the grid row by row; the last cell is the bar chart.
                ax = axes[idx // 2][idx % 2]
                plot = plotters.get(field, self.plot_pie)
                plot(ax, labels, sizes, f"{field_names[field]} 分布")

            size_labels = list(self._DATA_TYPE_NAMES.values())
            size_values = [self._to_mb(size_map[key])
                           for key in self._DATA_TYPE_NAMES]
            self.plot_bar(axes[1][1], size_labels, size_values, "数据类型大小 (MB)")

            fig.suptitle("数据字段分布与存储大小统计", fontsize=16)
            fig.tight_layout(rect=[0, 0.03, 1, 0.95])
            if export_charts:
                png_path = os.path.join(
                    export_dir, f"data_storage_stats_{timestamp}.png")
                fig.savefig(png_path, dpi=300)
                feedback.pushInfo(f"图表已保存至:{png_path}")
            self.show_matplotlib_dialog(fig)
        finally:
            plt.close(fig)

    def get_table_name_candidates(self, table_ref):
        """Return the spellings of *table_ref* to probe, in priority order.

        The literal reference always comes first, followed by quoted
        ``schema.table`` variants, the part after the first underscore,
        and the name without a leading ``hhht_``. Duplicates and empty
        strings are removed while the order is preserved, so lookups are
        deterministic.
        """
        candidates = [table_ref]
        if '.' in table_ref:
            parts = table_ref.split('.')
            if len(parts) == 2:
                schema_part, table_part = parts
                candidates.extend([
                    f'"{schema_part}"."{table_part}"',
                    f'{schema_part}."{table_part}"',
                    f'"{schema_part}".{table_part}',
                ])
        if '_' in table_ref:
            candidates.append(table_ref.split('_', 1)[1])
        if table_ref.startswith('hhht_'):
            candidates.append(table_ref[5:])
        return [name for name in dict.fromkeys(candidates) if name]

    def _plot_share_chart(self, ax, labels, sizes, title, wedgeprops=None):
        """Draw a pie-style chart; shared by ``plot_donut`` and ``plot_pie``.

        With more than six categories only the first five are kept and the
        remainder is merged into an "其他" slice. Slice captions show the
        absolute count instead of the percentage.
        """
        if not sizes or sum(sizes) == 0:
            ax.axis('off')
            ax.set_title(f"{title}\n(无数据)")
            return
        if len(labels) > 6:
            others_sum = sum(sizes[5:])
            labels = labels[:5] + ["其他"]
            sizes = sizes[:5] + [others_sum]
        wedges, _texts, autotexts = ax.pie(
            sizes, labels=labels, autopct='%1.1f%%', startangle=90,
            wedgeprops=wedgeprops, textprops={'fontsize': 9})
        # autopct is only needed to create the text artists; replace the
        # percentage with the raw count.
        for autotext, size in zip(autotexts, sizes):
            autotext.set_text(f'{size}')
        ax.set_title(title)
        ax.axis('equal')
        ax.legend(
            wedges, [f'{label}: {size}' for label, size in zip(labels, sizes)],
            loc="best", fontsize=8)

    def plot_donut(self, ax, labels, sizes, title):
        """Draw *sizes* as a donut chart on *ax*."""
        self._plot_share_chart(
            ax, labels, sizes, title, wedgeprops=dict(width=0.4))

    def plot_pie(self, ax, labels, sizes, title):
        """Draw *sizes* as a pie chart on *ax*."""
        self._plot_share_chart(ax, labels, sizes, title)

    def plot_line(self, ax, labels, sizes, title):
        """Draw *sizes* as an annotated line chart on *ax*."""
        if not sizes:
            ax.axis('off')
            ax.set_title(f"{title}\n(无数据)")
            return
        ax.plot(labels, sizes, marker='o', linestyle='-', color='teal')
        ax.set_title(title)
        ax.set_xlabel("管理部门")
        ax.set_ylabel("数量")
        for x, y in zip(labels, sizes):
            ax.annotate(
                f'{y}', (x, y), textcoords="offset points",
                xytext=(0, 5), ha='center', fontsize=9)
        ax.tick_params(axis='x', rotation=30)

    def plot_bar(self, ax, labels, sizes, title):
        """Draw *sizes* (in MB) as an annotated bar chart on *ax*."""
        if not sizes or sum(sizes) == 0:
            ax.axis('off')
            ax.set_title(f"{title}\n(无数据)")
            return
        bars = ax.bar(labels, sizes, color='cornflowerblue')
        ax.set_title(title)
        ax.set_xlabel('数据类型')
        ax.set_ylabel('大小 (MB)')
        for bar in bars:
            height = bar.get_height()
            ax.annotate(
                f'{height:.2f} MB',
                xy=(bar.get_x() + bar.get_width() / 2, height),
                xytext=(0, 3),
                textcoords="offset points",
                ha='center', va='bottom', fontsize=9)

    def show_matplotlib_dialog(self, fig):
        """Show *fig* in a modal dialog parented to the QGIS main window.

        Does nothing when no QGIS interface is available (for example
        when the algorithm runs without a GUI).
        """
        if iface is None:
            return
        dialog = QDialog(iface.mainWindow())
        dialog.setWindowTitle("统计图表")
        layout = QVBoxLayout()
        layout.addWidget(FigureCanvas(fig))
        dialog.setLayout(layout)
        dialog.resize(1200, 900)
        dialog.exec_()

    def name(self):
        """Return the algorithm's unique identifier."""
        return 'DataStorageStatistics'

    def displayName(self):
        """Return the user-visible algorithm name."""
        return '数据统计'

    def group(self):
        """Return the user-visible group name."""
        return '数据分析工具'

    def groupId(self):
        """Return the group's unique identifier."""
        return 'data_storage_analysis'

    def createInstance(self):
        """Return a fresh instance, as Processing requires."""
        return DataStorageStatistics()
|