# FTP helper module: the FtpOper class wraps ftplib with reconnect, upload, download and delete operations.
import ftplib
import os
import posixpath
import socket
import time
class FtpOper:
    """Convenience wrapper around :class:`ftplib.FTP`.

    The wrapper remembers the connection parameters so that a dropped
    session can be re-established transparently, and offers helpers for
    uploading, downloading and deleting single files or whole directory
    trees.

    Most operations change the working directory of the FTP session as a
    side effect.  An empty or ``None`` remote path is treated as the server
    root (``/``); backslashes in remote paths are converted to ``/``.

    The instance can be used as a context manager; the session is closed on
    exit.
    """

    # Seconds to wait between successive reconnect attempts.
    _RECONNECT_DELAYS = (0.2, 0.5, 1.0, 2.0)
    # Block size for binary transfers; kept small to ease long-lived sessions.
    _BLOCKSIZE = 64 * 1024
    # Number of times uploadfile() tries before giving up.
    _UPLOAD_ATTEMPTS = 2

    def __init__(self):
        super().__init__()
        self.ftp = None  # active ftplib.FTP session, or None when disconnected
        # Parameters of the most recent connect() call, reused for reconnects.
        self._conn = {
            'host': None,
            'port': None,
            'user': None,
            'password': None,
            'timeout': 30,
            'passive': True,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def connect(self, host, port=21, user=None, password=None, timeout=30, passive=True):
        """Open a new session, replacing any existing one.

        Args:
            host: Server host name or address.
            port: Server port.
            user: Login name; ``None`` performs an anonymous login.
            password: Login password (only used when ``user`` is given).
            timeout: Socket timeout in seconds.
            passive: Whether to use passive transfer mode.

        Returns:
            The connected :class:`ftplib.FTP` object.

        Raises:
            ftplib.all_errors: If connecting or logging in fails.  The
                half-open socket is closed before the error propagates.
        """
        self._conn.update({
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'timeout': timeout,
            'passive': passive,
        })
        # Drop the old session first.
        self.close()

        ftp = ftplib.FTP(timeout=timeout)
        try:
            ftp.connect(host, port, timeout=timeout)
            if user is not None:
                ftp.login(user=user, passwd=password)
            else:
                ftp.login()
            ftp.set_pasv(passive)
            # Use UTF-8 on the control channel so non-ASCII paths work.
            ftp.encoding = 'utf-8'
        except Exception:
            # Do not leak the socket when connect/login fails.
            ftp.close()
            raise
        self.ftp = ftp
        return self.ftp

    def ensure_connected(self):
        """Return a live session, connecting or reconnecting if needed.

        Raises:
            RuntimeError: If connect() has never been called, so there are
                no stored parameters to connect with.
        """
        if self.ftp is None:
            if not self._conn['host']:
                raise RuntimeError('FTP未初始化连接参数')
            return self.connect(**self._conn)
        try:
            self.ftp.voidcmd('NOOP')
        except Exception:
            # Any failure of the probe (protocol error, dead socket, or an
            # already-closed FTP object) means the session is unusable.
            return self._reconnect()
        return self.ftp

    def _reconnect(self):
        """Reconnect with the stored parameters, backing off between tries.

        Raises the last connection error if every attempt fails.
        """
        if not self._conn['host']:
            raise RuntimeError('FTP未初始化连接参数')
        delays = self._RECONNECT_DELAYS
        last_exc = None
        for attempt, delay in enumerate(delays, start=1):
            try:
                return self.connect(**self._conn)
            except Exception as exc:
                last_exc = exc
                # No point in sleeping when there is no further attempt.
                if attempt < len(delays):
                    time.sleep(delay)
        raise last_exc

    def close(self):
        """Close the session, if any.  Never raises.

        QUIT is sent first; if that fails the socket is closed directly.
        """
        ftp, self.ftp = self.ftp, None
        if ftp is None:
            return
        try:
            ftp.quit()
        except Exception:
            try:
                ftp.close()
            except Exception:
                # Best effort: the session is being discarded anyway.
                pass

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize(remotepath):
        """Return ``remotepath`` with ``/`` separators; empty means ``/``."""
        path = (remotepath or '/').replace('\\', '/').strip()
        return path or '/'

    def _abspath(self, remotepath):
        """Resolve ``remotepath`` against the current remote directory.

        Recursive operations change directory repeatedly, so they must work
        with absolute paths; a relative path would otherwise be re-applied
        on top of itself at every step.
        """
        path = self._normalize(remotepath)
        if not path.startswith('/'):
            path = posixpath.join(self.ensure_connected().pwd(), path)
        return path

    def _list_names(self):
        """Return the entry names of the current remote directory.

        Some servers answer NLST on an empty directory with a 550 error
        instead of an empty listing; that reply is treated as "no entries".
        Names are reduced to their last path component and the ``.`` / ``..``
        pseudo entries are dropped.
        """
        try:
            raw = self.ftp.nlst()
        except ftplib.error_perm as exc:
            if str(exc).startswith('550'):
                return []
            raise
        names = (posixpath.basename(item.rstrip('/')) for item in raw)
        return [name for name in names if name not in ('', '.', '..')]

    def _is_dir(self, path):
        """Return True if ``path`` can be entered with CWD.

        On success the session's working directory is left at ``path``.
        """
        try:
            self.ftp.cwd(path)
        except ftplib.error_perm:
            return False
        return True

    def _change_dir(self, remotepath, create=False):
        """Change to ``remotepath``, optionally creating missing components.

        Raises:
            ftplib.error_perm: If the directory cannot be entered (or, with
                ``create=True``, cannot be created).
        """
        ftp = self.ensure_connected()
        path = self._normalize(remotepath)
        if path == '/' or not create:
            ftp.cwd(path)
            return
        # Walk the path one component at a time, creating what is missing.
        if path.startswith('/'):
            ftp.cwd('/')
        for part in path.split('/'):
            if not part:
                continue
            try:
                ftp.cwd(part)
            except ftplib.error_perm:
                ftp.mkd(part)
                ftp.cwd(part)

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------
    def uploadfile(self, localfile, remotepath):
        """Upload ``localfile`` into ``remotepath`` (created if missing).

        Transient failures (temporary server errors, socket errors, or the
        server closing the connection) trigger one reconnect and retry.

        Raises:
            FileNotFoundError: If ``localfile`` is not an existing file.
            Exception: The last transfer error once all attempts failed.
        """
        if not os.path.isfile(localfile):
            raise FileNotFoundError(f'本地文件不存在: {localfile}')
        filename = os.path.basename(localfile)
        last_exc = None
        for attempt in range(1, self._UPLOAD_ATTEMPTS + 1):
            try:
                self._change_dir(remotepath, create=True)
                with open(localfile, 'rb') as file:
                    self.ftp.storbinary(f'STOR {filename}', file, blocksize=self._BLOCKSIZE)
                return
            except (ftplib.error_temp, ftplib.error_reply, OSError, EOFError) as exc:
                # OSError covers socket timeouts; EOFError is what ftplib
                # raises when the server drops the control connection.
                last_exc = exc
                if attempt < self._UPLOAD_ATTEMPTS:
                    try:
                        self._reconnect()
                    except Exception:
                        # The next attempt reconnects via ensure_connected().
                        pass
        raise last_exc

    def uploaddir(self, localdir, remotepath):
        """Recursively upload ``localdir`` as a sub-directory of ``remotepath``.

        On return the remote working directory is the parent of the uploaded
        directory.

        Raises:
            NotADirectoryError: If ``localdir`` is not a directory.
        """
        if not os.path.isdir(localdir):
            raise NotADirectoryError(f'不是文件夹: {localdir}')
        # normpath drops a trailing separator, which would otherwise yield
        # an empty directory name.
        dirname = os.path.basename(os.path.normpath(localdir))
        new_remotepath = posixpath.join(self._abspath(remotepath), dirname)
        # Make sure the target exists even when the local directory is empty.
        self._change_dir(new_remotepath, create=True)
        for entry in os.listdir(localdir):
            src = os.path.join(localdir, entry)
            if os.path.isfile(src):
                self.uploadfile(src, new_remotepath)
            elif os.path.isdir(src):
                self.uploaddir(src, new_remotepath)
        self.ensure_connected().cwd('..')

    def makedir(self, dirname, remotepath, new_remotepath):
        """Ensure ``new_remotepath`` exists and change into it.

        ``dirname`` and ``remotepath`` are unused; they are kept so existing
        callers keep working.

        Returns:
            A status message string (success or failure).
        """
        try:
            self._change_dir(new_remotepath, create=True)
        except ftplib.error_perm as ex:
            print(f'创建文件夹失败:{ex}')
            return '创建文件夹失败'
        print('文件夹已存在或创建成功')
        return '文件夹已存在'

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------
    def downloadfile(self, localfile, remotefile):
        """Download ``remotefile`` to ``localfile`` if it exists remotely.

        A ``remotefile`` without a directory part is looked up in the server
        root, because an empty remote directory is treated as ``/``.
        """
        self.ensure_connected()
        # Remote paths always use '/', regardless of the local OS.
        remotepath, remotefile_name = posixpath.split(remotefile.replace('\\', '/'))
        if not self.is_exist(remotepath, remotefile_name):
            print('文件不存在')
            return
        with open(localfile, 'wb') as file:
            self.ftp.retrbinary(f'RETR {remotefile_name}', file.write, blocksize=self._BLOCKSIZE)
        print(f'文件下载成功:{localfile}')

    def downloaddir(self, localdir, remotepath):
        """Recursively download ``remotepath`` into ``localdir``.

        On return the remote working directory is the parent of
        ``remotepath``.
        """
        self.ensure_connected()
        remotepath = self._abspath(remotepath)
        if not self.is_exist(remotepath):
            print('远程文件夹不存在')
            return
        if not os.path.exists(localdir):
            print(f'创建本地文件夹:{localdir}')
            os.makedirs(localdir)
        # is_exist() left the session inside remotepath.
        for name in self._list_names():
            local_target = os.path.join(localdir, name)
            remote_target = posixpath.join(remotepath, name)
            # Probe with CWD instead of guessing from the name: files may
            # lack an extension and directory names may contain dots.
            if self._is_dir(remote_target):
                os.makedirs(local_target, exist_ok=True)
                self.downloaddir(local_target, remote_target)
            else:
                self.downloadfile(local_target, remote_target)
        self.ftp.cwd('..')

    # ------------------------------------------------------------------
    # Queries and deletion
    # ------------------------------------------------------------------
    def is_exist(self, remotepath, filename=None):
        """Check whether a remote directory, or a file inside it, exists.

        Changes the working directory to ``remotepath`` when it exists.

        Returns:
            True if ``remotepath`` can be entered and, when ``filename`` is
            given, the directory listing contains that name.
        """
        try:
            self._change_dir(remotepath, create=False)
        except ftplib.error_perm:
            print('远程路径不存在')
            return False
        if filename is None:
            print(f'远程路径存在{remotepath}')
            return True
        if filename in self._list_names():
            print(f'存在该文件{filename}')
            return True
        print(f'没有该文件{filename}')
        return False

    def deletfile(self, filename, remotepath):
        """Delete ``filename`` from ``remotepath`` if it exists."""
        self.ensure_connected()
        if self.is_exist(remotepath, filename):
            self.ftp.delete(filename)
            print('远程文件已删除')
        else:
            print('远程文件不存在')

    # Correctly spelled alias; the original name is kept for compatibility.
    deletefile = deletfile

    def deletedir(self, remotepath):
        """Recursively delete the remote directory ``remotepath``.

        On return the remote working directory is the parent of
        ``remotepath``.
        """
        self.ensure_connected()
        remotepath = self._abspath(remotepath)
        if not self.is_exist(remotepath):
            print('远程文件夹不存在')
            return
        for name in self._list_names():
            child = posixpath.join(remotepath, name)
            if self._is_dir(child):
                self.deletedir(child)
            else:
                self.deletfile(name, remotepath)
        # Step out first: many servers refuse to remove the directory the
        # session is currently in.
        self.ftp.cwd('..')
        self.ftp.rmd(remotepath)
        print('远程文件夹删除成功')

    def rename(self, oldname, newname):
        """Rename a remote entry.

        Returns:
            True on success, False if the server rejects the rename.
        """
        self.ensure_connected()
        try:
            self.ftp.rename(oldname, newname)
        except ftplib.error_perm:
            return False
        return True
# End of module.