import ftplib
import os
import posixpath
import socket
import time
from typing import List, Optional


class FtpOper:
    """Small convenience wrapper around :class:`ftplib.FTP`.

    The wrapper remembers the last connection parameters so that a dropped
    session can be re-established transparently, and offers recursive
    upload / download / delete helpers.

    Note:
        The recursive helpers change the server-side working directory and
        then build child paths from the ``remotepath`` argument, so remote
        paths should be absolute (start with ``/``); relative paths are
        resolved against whatever directory the session is currently in.
    """

    # Pauses between successive reconnect attempts (one more attempt than
    # there are pauses: no sleep is needed after the final failure).
    _RECONNECT_DELAYS = (0.2, 0.5, 1.0)
    # Transfer chunk size; kept small to reduce pressure on long-lived links.
    _BLOCKSIZE = 64 * 1024
    # Number of times ``uploadfile`` tries a transfer before giving up.
    _UPLOAD_ATTEMPTS = 2

    def __init__(self):
        super().__init__()
        # Live session, or None when disconnected.
        self.ftp: Optional[ftplib.FTP] = None
        # Parameters of the most recent ``connect`` call, reused on reconnect.
        self._conn = {
            'host': None,
            'port': None,
            'user': None,
            'password': None,
            'timeout': 30,
            'passive': True,
        }

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    def connect(self, host, port=21, user=None, password=None, timeout=30, passive=True):
        """Open a new session, replacing any existing one.

        Args:
            host: Server host name or address.
            port: Server port.
            user: Login name; ``None`` performs ftplib's default login.
            password: Password used together with ``user``.
            timeout: Socket timeout in seconds.
            passive: Whether to use passive mode for data connections.

        Returns:
            The connected ``ftplib.FTP`` instance (also stored on ``self.ftp``).

        Raises:
            Whatever ``ftplib`` raises when connecting or logging in fails;
            in that case the half-open connection is closed first.
        """
        self._conn.update({
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'timeout': timeout,
            'passive': passive,
        })
        # Close the previous session, if any.
        self._drop_connection()

        ftp = ftplib.FTP(timeout=timeout)
        try:
            ftp.connect(host, port, timeout=timeout)
            if user is not None:
                ftp.login(user=user, passwd=password)
            else:
                ftp.login()
            ftp.set_pasv(passive)
            # Use UTF-8 for path names to avoid problems with non-ASCII paths.
            ftp.encoding = 'utf-8'
        except Exception:
            # Do not leak the socket of a session that never became usable.
            ftp.close()
            raise
        self.ftp = ftp
        return ftp

    def ensure_connected(self):
        """Return a usable session, connecting or reconnecting if needed.

        Raises:
            RuntimeError: If no connection parameters were ever supplied.
        """
        if not self.ftp:
            if not self._conn['host']:
                raise RuntimeError('FTP未初始化连接参数')
            return self.connect(**self._conn)
        try:
            self.ftp.voidcmd('NOOP')
        except Exception:
            # Any failure of the liveness probe (protocol error, dead socket,
            # already-closed session) means the session must be rebuilt.
            return self._reconnect()
        return self.ftp

    def _reconnect(self):
        """Reconnect with a short back-off; re-raise the last error on failure."""
        last_exc = None
        for delay in (*self._RECONNECT_DELAYS, None):
            try:
                return self.connect(**self._conn)
            except Exception as exc:
                last_exc = exc
            if delay is not None:
                time.sleep(delay)
        raise last_exc

    def _drop_connection(self):
        """Best-effort shutdown of the current session; always clears ``self.ftp``."""
        ftp, self.ftp = self.ftp, None
        if not ftp:
            return
        try:
            ftp.quit()
        except Exception:
            # QUIT can fail in many ways on a dead session; fall back to
            # closing the socket locally.
            try:
                ftp.close()
            except ftplib.all_errors:
                pass

    def close(self):
        """Close the session (politely if possible) and forget it."""
        self._drop_connection()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _change_dir(self, remotepath, create=False):
        """Change the remote working directory.

        Args:
            remotepath: Target directory; backslashes are treated as ``/`` and
                an empty value means ``/``.
            create: When true, walk the path one component at a time and
                create every component that cannot be entered.
        """
        ftp = self.ensure_connected()
        path = (remotepath or '/').replace('\\', '/').strip() or '/'
        if path == '/' or not create:
            ftp.cwd(path)
            return
        if path.startswith('/'):
            ftp.cwd('/')
        for part in filter(None, path.split('/')):
            try:
                ftp.cwd(part)
            except ftplib.error_perm:
                ftp.mkd(part)
                ftp.cwd(part)

    def _list_names(self) -> List[str]:
        """Return the entry names of the current remote directory.

        A ``550`` reply is treated as "directory is empty" because some
        servers answer NLST on an empty directory that way. Entries are
        reduced to their base name and ``.`` / ``..`` are skipped.
        """
        try:
            raw_names = self.ftp.nlst()
        except ftplib.error_perm as exc:
            if str(exc).startswith('550'):
                return []
            raise
        names = []
        for raw in raw_names:
            name = posixpath.basename(raw.rstrip('/'))
            if name not in ('', '.', '..'):
                names.append(name)
        return names

    def _is_dir(self, remotepath) -> bool:
        """Tell whether ``remotepath`` is a directory by trying to enter it.

        On success the working directory is left at ``remotepath``.
        """
        try:
            self.ftp.cwd(remotepath)
        except ftplib.error_perm:
            return False
        return True

    @staticmethod
    def _join_remote(parent, name) -> str:
        """Join a remote directory and an entry name with a single ``/``."""
        return parent.replace('\\', '/').rstrip('/') + '/' + name

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------
    def uploadfile(self, localfile, remotepath):
        """Upload one file into ``remotepath``, creating the directory if needed.

        The transfer is retried once after a reconnect when a temporary FTP
        error, an unexpected reply or a socket-level error occurs.

        Raises:
            FileNotFoundError: If ``localfile`` is not an existing file.
        """
        if not os.path.isfile(localfile):
            raise FileNotFoundError(f'本地文件不存在: {localfile}')
        filename = os.path.basename(localfile)
        for attempt in range(1, self._UPLOAD_ATTEMPTS + 1):
            try:
                self._change_dir(remotepath, create=True)
                with open(localfile, 'rb') as file:
                    self.ftp.storbinary(f'STOR {filename}', file, blocksize=self._BLOCKSIZE)
                return
            except (ftplib.error_temp, ftplib.error_reply, OSError, socket.timeout):
                if attempt == self._UPLOAD_ATTEMPTS:
                    raise
                # e.g. 421 / timeout: rebuild the session and try once more.
                try:
                    self._reconnect()
                except Exception:
                    # The next attempt reconnects again through
                    # ensure_connected() and reports the failure if it persists.
                    pass

    def uploaddir(self, localdir, remotepath):
        """Recursively upload ``localdir`` as a sub-directory of ``remotepath``.

        Raises:
            NotADirectoryError: If ``localdir`` is not an existing directory.
        """
        if not os.path.isdir(localdir):
            raise NotADirectoryError(f'不是文件夹: {localdir}')
        # abspath() drops a trailing separator and resolves '.', so the base
        # name is the real directory name rather than an empty string.
        dirname = os.path.basename(os.path.abspath(localdir))
        new_remotepath = (remotepath.rstrip('/') + '/' + dirname).replace('//', '/')
        # Make sure the target exists even when the local directory is empty.
        self._change_dir(new_remotepath, create=True)
        for entry in os.listdir(localdir):
            src = os.path.join(localdir, entry)
            if os.path.isfile(src):
                self.uploadfile(src, new_remotepath)
            elif os.path.isdir(src):
                self.uploaddir(src, new_remotepath)
        self.ensure_connected().cwd('..')

    def makedir(self, dirname, remotepath, new_remotepath):
        """Create ``new_remotepath`` (and parents) on the server.

        ``dirname`` and ``remotepath`` are unused; they are kept so existing
        callers keep working.

        Returns:
            A status string: the directory exists/was created, or creation
            failed.
        """
        try:
            self._change_dir(new_remotepath, create=True)
        except ftplib.error_perm as ex:
            print(f'创建文件夹失败:{ex}')
            return '创建文件夹失败'
        print('文件夹已存在或创建成功')
        return '文件夹已存在'

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------
    def downloadfile(self, localfile, remotefile):
        """Download ``remotefile`` to ``localfile`` if it exists on the server."""
        self.ensure_connected()
        # Remote paths always use '/', independent of the local platform.
        remotepath, remotefile_name = posixpath.split(remotefile.replace('\\', '/'))
        if not self.is_exist(remotepath, remotefile_name):
            print('文件不存在')
            return
        with open(localfile, 'wb') as file:
            self.ftp.retrbinary(f'RETR {remotefile_name}', file.write, blocksize=self._BLOCKSIZE)
        print(f'文件下载成功:{localfile}')

    def downloaddir(self, localdir, remotepath):
        """Recursively download the directory ``remotepath`` into ``localdir``."""
        self.ensure_connected()
        if not self.is_exist(remotepath):
            print('远程文件夹不存在')
            return
        if not os.path.exists(localdir):
            print(f'创建本地文件夹:{localdir}')
            os.makedirs(localdir)
        # is_exist() has already changed into remotepath, so list it directly.
        for name in self._list_names():
            local_child = os.path.join(localdir, name)
            remote_child = self._join_remote(remotepath, name)
            # Probe the server instead of guessing from the presence of a dot.
            if self._is_dir(remote_child):
                os.makedirs(local_child, exist_ok=True)
                self.downloaddir(local_child, remote_child)
            else:
                self.downloadfile(local_child, remote_child)
        self.ftp.cwd('..')

    # ------------------------------------------------------------------
    # Queries, delete, rename
    # ------------------------------------------------------------------
    def is_exist(self, remotepath, filename=None):
        """Check whether a remote directory, or an entry inside it, exists.

        Args:
            remotepath: Directory to enter.
            filename: Optional entry name to look for inside ``remotepath``.

        Returns:
            ``True`` if the directory can be entered and, when ``filename`` is
            given, its listing contains that name; ``False`` otherwise.
        """
        try:
            self._change_dir(remotepath, create=False)
        except ftplib.error_perm:
            print('远程路径不存在')
            return False
        if filename is None:
            print(f'远程路径存在{remotepath}')
            return True
        if filename in self._list_names():
            print(f'存在该文件{filename}')
            return True
        print(f'没有该文件{filename}')
        return False

    def deletfile(self, filename, remotepath):
        """Delete ``filename`` from ``remotepath`` if it is present."""
        self.ensure_connected()
        if self.is_exist(remotepath, filename):
            self.ftp.delete(filename)
            print('远程文件已删除')
        else:
            print('远程文件不存在')

    def deletedir(self, remotepath):
        """Recursively delete the remote directory ``remotepath``."""
        self.ensure_connected()
        if not self.is_exist(remotepath):
            print('远程文件夹不存在')
            return
        for name in self._list_names():
            remote_child = self._join_remote(remotepath, name)
            if self._is_dir(remote_child):
                self.deletedir(remote_child)
            else:
                self.deletfile(name, remotepath)
        # Step out of the directory before removing it: some servers refuse
        # to remove the current working directory.
        self._change_dir(remotepath)
        self.ftp.cwd('..')
        self.ftp.rmd(remotepath)
        print('远程文件夹删除成功')

    def rename(self, oldname, newname):
        """Rename a remote entry; return ``True`` on success, ``False`` if refused."""
        self.ensure_connected()
        try:
            self.ftp.rename(oldname, newname)
        except ftplib.error_perm:
            return False
        return True