123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import ftplib
- import os
- class FtpOper:
- ftp = ftplib.FTP()
- def __init__(self):
- super().__init__()
- def close(self):
- self.ftp.quit() # 关闭服务器
- # 上传文件
- def uploadfile(self, localfile, remotepath):
- if not os.path.isfile(localfile):
- print('不是文件')
- filename = os.path.split(localfile)[-1] # 获取文件名
- self.ftp.cwd(remotepath)
- with open(localfile, 'rb') as file: # 读取文件
- self.ftp.storbinary(f'STOR {filename}', file) # 二进制存储
- # 上传文件夹
- def uploaddir(self, localdir, remotepath):
- # if not os.path.isdir(localdir):
- # print('不是文件夹')
- dirname = os.path.split(localdir)[-1] # 文件夹名称
- new_remotepath = remotepath + '/' + dirname # os.path.join(remotepath, dirname)
- self.makedir(dirname, remotepath, new_remotepath)
- for localfile in os.listdir(localdir):
- src = os.path.join(localdir, localfile)
- if os.path.isfile(src):
- self.uploadfile(src, new_remotepath)
- elif os.path.isdir(src):
- self.uploaddir(src, new_remotepath) # 嵌套的文件夹:{src}
- self.ftp.cwd('..')
- # 创建文件夹
- def makedir(self, dirname, remotepath, new_remotepath):
- try:
- self.ftp.cwd(new_remotepath)
- print('文件夹已存在')
- return '文件夹已存在'
- except ftplib.error_perm:
- try:
- self.ftp.cwd(remotepath)
- self.ftp.mkd(dirname)
- print(f'文件夹创建成功:{remotepath}/{dirname}')
- return '文件夹创建成功'
- except ftplib.error_perm as ex:
- print(f'创建文件夹失败:{ex}')
- return '创建文件夹失败'
- # 下载文件
- def downloadfile(self, localfile, remotefile):
- remotepath, remotefile_name = os.path.split(remotefile)
- if self.is_exist(remotepath, remotefile_name): # 判断文件是否存在
- with open(localfile, 'wb') as file:
- self.ftp.retrbinary(f'RETR {remotefile}', file.write)
- print(f'文件下载成功:{localfile}')
- else:
- print('文件不存在')
- # 下载文件夹
- def downloaddir(self, localdir, remotepath):
- if not self.is_exist(remotepath):
- print('远程文件夹不存在')
- else:
- if not os.path.exists(localdir):
- print(f'创建本地文件夹:{localdir}')
- os.makedirs(localdir)
- self.ftp.cwd(remotepath)
- remotenames = self.ftp.nlst()
- for file in remotenames:
- localfile = os.path.join(localdir, file)
- if file.find('.') == -1: # 此节点下的文件夹
- if not os.path.exists(localfile):
- os.makedirs(localfile)
- self.downloaddir(localfile, remotepath + '/' + file) # 递归文件夹:{remotepath}
- else: # 此节点下的文件
- self.downloadfile(localfile, file)
- self.ftp.cwd('..')
- # 判断文件/文件夹是否存在
- def is_exist(self, remotepath, filename=None):
- try:
- self.ftp.cwd(remotepath)
- except ftplib.error_perm:
- print('远程路径不存在')
- return False
- if filename is not None:
- filelist = self.ftp.nlst()
- if filename in filelist:
- print(f'存在该文件{filename}')
- return True
- else:
- print(f'没有该文件{filename}')
- else:
- print(f'远程路径存在{remotepath}')
- return True
- # 删除文件
- def deletfile(self, filename, remotepath):
- if self.is_exist(remotepath, filename):
- self.ftp.delete(filename)
- print('远程文件已删除')
- else:
- print('远程文件不存在')
- # 删除文件夹
- def deletedir(self, remotepath):
- if self.is_exist(remotepath):
- filelist = self.ftp.nlst()
- if len(filelist) != 0:
- for file in filelist:
- new_remotepath = os.path.join(remotepath, file).replace('\\', '/')
- if file.find('.') == -1:
- self.deletedir(new_remotepath)
- else:
- self.deletfile(file, remotepath)
- self.ftp.rmd(remotepath)
- print('远程文件夹删除成功')
- else:
- print('远程文件夹不存在')
- # 重命名
- def rename(self, oldname, newname):
- try:
- self.ftp.rename(oldname, newname)
- return True
- except ftplib.error_perm:
- return False
|