import ftplib import os class FtpOper: ftp = ftplib.FTP() def __init__(self): super().__init__() def close(self): self.ftp.quit() # 关闭服务器 # 上传文件 def uploadfile(self, localfile, remotepath): if not os.path.isfile(localfile): print('不是文件') filename = os.path.split(localfile)[-1] # 获取文件名 self.ftp.cwd(remotepath) with open(localfile, 'rb') as file: # 读取文件 self.ftp.storbinary(f'STOR {filename}', file) # 二进制存储 # 上传文件夹 def uploaddir(self, localdir, remotepath): # if not os.path.isdir(localdir): # print('不是文件夹') dirname = os.path.split(localdir)[-1] # 文件夹名称 new_remotepath = remotepath + '/' + dirname # os.path.join(remotepath, dirname) self.makedir(dirname, remotepath, new_remotepath) for localfile in os.listdir(localdir): src = os.path.join(localdir, localfile) if os.path.isfile(src): self.uploadfile(src, new_remotepath) elif os.path.isdir(src): self.uploaddir(src, new_remotepath) # 嵌套的文件夹:{src} self.ftp.cwd('..') # 创建文件夹 def makedir(self, dirname, remotepath, new_remotepath): try: self.ftp.cwd(new_remotepath) print('文件夹已存在') return '文件夹已存在' except ftplib.error_perm: try: self.ftp.cwd(remotepath) self.ftp.mkd(dirname) print(f'文件夹创建成功:{remotepath}/{dirname}') return '文件夹创建成功' except ftplib.error_perm as ex: print(f'创建文件夹失败:{ex}') return '创建文件夹失败' # 下载文件 def downloadfile(self, localfile, remotefile): remotepath, remotefile_name = os.path.split(remotefile) if self.is_exist(remotepath, remotefile_name): # 判断文件是否存在 with open(localfile, 'wb') as file: self.ftp.retrbinary(f'RETR {remotefile}', file.write) print(f'文件下载成功:{localfile}') else: print('文件不存在') # 下载文件夹 def downloaddir(self, localdir, remotepath): if not self.is_exist(remotepath): print('远程文件夹不存在') else: if not os.path.exists(localdir): print(f'创建本地文件夹:{localdir}') os.makedirs(localdir) self.ftp.cwd(remotepath) remotenames = self.ftp.nlst() for file in remotenames: localfile = os.path.join(localdir, file) if file.find('.') == -1: # 此节点下的文件夹 if not os.path.exists(localfile): os.makedirs(localfile) self.downloaddir(localfile, remotepath + '/' + file) # 递归文件夹:{remotepath} else: # 此节点下的文件 self.downloadfile(localfile, file) self.ftp.cwd('..') # 判断文件/文件夹是否存在 def is_exist(self, remotepath, filename=None): try: self.ftp.cwd(remotepath) except ftplib.error_perm: print('远程路径不存在') return False if filename is not None: filelist = self.ftp.nlst() if filename in filelist: print(f'存在该文件{filename}') return True else: print(f'没有该文件{filename}') else: print(f'远程路径存在{remotepath}') return True # 删除文件 def deletfile(self, filename, remotepath): if self.is_exist(remotepath, filename): self.ftp.delete(filename) print('远程文件已删除') else: print('远程文件不存在') # 删除文件夹 def deletedir(self, remotepath): if self.is_exist(remotepath): filelist = self.ftp.nlst() if len(filelist) != 0: for file in filelist: new_remotepath = os.path.join(remotepath, file).replace('\\', '/') if file.find('.') == -1: self.deletedir(new_remotepath) else: self.deletfile(file, remotepath) self.ftp.rmd(remotepath) print('远程文件夹删除成功') else: print('远程文件夹不存在') # 重命名 def rename(self, oldname, newname): try: self.ftp.rename(oldname, newname) return True except ftplib.error_perm: return False