# Implementation of the deprecated "pip zip" / "pip unzip" command.
- from __future__ import absolute_import
- import sys
- import re
- import fnmatch
- import logging
- import os
- import shutil
- import warnings
- import zipfile
- from pip.utils import display_path, backup_dir, rmtree
- from pip.utils.deprecation import RemovedInPip7Warning
- from pip.utils.logging import indent_log
- from pip.exceptions import InstallationError
- from pip.basecommand import Command
- logger = logging.getLogger(__name__)
class ZipCommand(Command):
    """Zip individual packages.

    Backs the deprecated ``pip zip`` command: it moves an installed package
    directory into a zip archive (registering the archive through a ``.pth``
    file) or, with ``--unzip``, extracts a package back out of an archive.
    A subclass whose ``name`` is not ``'zip'`` defaults to unzipping and is
    given a ``--zip`` switch instead of ``--unzip``.
    """
    name = 'zip'
    # NOTE(review): the continuation line below is part of the literal; its
    # leading whitespace is reproduced as found -- confirm against upstream.
    usage = """
%prog [options] <package> ..."""
    summary = 'DEPRECATED. Zip individual packages.'

    def __init__(self, *args, **kw):
        """Register the command-line options of the zip/unzip command.

        All positional and keyword arguments are forwarded unchanged to the
        base ``Command`` initialiser.
        """
        super(ZipCommand, self).__init__(*args, **kw)
        if self.name == 'zip':
            self.cmd_opts.add_option(
                '--unzip',
                action='store_true',
                dest='unzip',
                help='Unzip (rather than zip) a package.')
        else:
            # Any other command name means "unzip by default".
            self.cmd_opts.add_option(
                '--zip',
                action='store_false',
                dest='unzip',
                default=True,
                help='Zip (rather than unzip) a package.')
        self.cmd_opts.add_option(
            '--no-pyc',
            action='store_true',
            dest='no_pyc',
            help=(
                'Do not include .pyc files in zip files (useful on Google App '
                'Engine).'),
        )
        self.cmd_opts.add_option(
            '-l', '--list',
            action='store_true',
            dest='list',
            help='List the packages available, and their zip status.')
        self.cmd_opts.add_option(
            '--sort-files',
            action='store_true',
            dest='sort_files',
            help=('With --list, sort packages according to how many files they'
                  ' contain.'),
        )
        self.cmd_opts.add_option(
            '--path',
            action='append',
            dest='paths',
            help=('Restrict operations to the given paths (may include '
                  'wildcards).'),
        )
        self.cmd_opts.add_option(
            '-n', '--simulate',
            action='store_true',
            help='Do not actually perform the zip/unzip operation.')

        self.parser.insert_option_group(0, self.cmd_opts)

    def paths(self):
        """All the entries of sys.path, possibly restricted by --path.

        Without ``--path`` this returns ``sys.path`` itself.  Otherwise it
        returns the normalised ``sys.path`` entries that match one of the
        ``--path`` values (by prefix, or by glob when the value contains
        ``*``), followed by every non-wildcard ``--path`` value that matched
        nothing on ``sys.path``.
        """
        if not self.select_paths:
            return sys.path

        # Normalise every --path value once, keeping the raw spelling so the
        # "did this value match anything?" bookkeeping below compares like
        # with like.  (Recording the normalised form but testing the raw one
        # made relative values look unmatched even when they had matched.)
        patterns = [
            (raw, os.path.normcase(os.path.abspath(raw)))
            for raw in self.select_paths
        ]

        result = []
        matched = set()
        for path in sys.path:
            path = os.path.normcase(os.path.abspath(path))
            for raw, pattern in patterns:
                if '*' in pattern:
                    hit = re.search(fnmatch.translate(pattern + '*'), path)
                else:
                    hit = path.startswith(pattern)
                if hit:
                    result.append(path)
                    matched.add(raw)
                    break
            else:
                logger.debug(
                    "Skipping path %s because it doesn't match %s",
                    path,
                    ', '.join(self.select_paths),
                )
        for raw, _ in patterns:
            if raw not in matched and '*' not in raw:
                result.append(raw)
                logger.debug(
                    "Adding path %s because it doesn't match "
                    "anything already on sys.path",
                    raw,
                )
        return result

    def run(self, options, args):
        """Entry point of the command.

        Emits the deprecation warning, then either lists packages
        (``--list``) or zips/unzips every package named in ``args``.  All
        packages are located and validated before the first one is touched.

        Raises InstallationError when no package is given, when a package
        cannot be found, or when its on-disk form does not fit the requested
        operation.  Returns whatever the last zip/unzip call returned.
        """
        warnings.warn(
            "'pip zip' and 'pip unzip` are deprecated, and will be removed in "
            "a future release.",
            RemovedInPip7Warning,
        )

        self.select_paths = options.paths
        self.simulate = options.simulate
        if options.list:
            return self.list(options, args)
        if not args:
            raise InstallationError(
                'You must give at least one package to zip or unzip')

        packages = []
        for arg in args:
            module_name, filename = self.find_package(arg)
            is_dir = os.path.isdir(filename)
            if options.unzip and is_dir:
                raise InstallationError(
                    'The module %s (in %s) is not a zip file; cannot be '
                    'unzipped' % (module_name, filename)
                )
            elif not options.unzip and not is_dir:
                raise InstallationError(
                    'The module %s (in %s) is not a directory; cannot be '
                    'zipped' % (module_name, filename)
                )
            packages.append((module_name, filename))

        last_status = None
        for module_name, filename in packages:
            if options.unzip:
                last_status = self.unzip_package(module_name, filename)
            else:
                last_status = self.zip_package(
                    module_name, filename, options.no_pyc
                )
        return last_status

    def unzip_package(self, module_name, filename):
        """Extract ``module_name`` out of the zip archive that contains it.

        ``filename`` is the package path *inside* the archive, so its parent
        is taken to be the archive itself and the archive's parent is the
        extraction target.  Entries under ``module_name/`` are written to
        disk; all other entries are kept, and the archive is rewritten with
        only those (or deleted, and dropped from ``.pth`` files, when nothing
        is left).

        Raises InstallationError when the parent of ``filename`` is not a
        zip file.  With ``--simulate`` nothing is modified.
        """
        zip_filename = os.path.dirname(filename)
        # The archive must be an existing file AND a valid zip.
        if not (os.path.isfile(zip_filename) and
                zipfile.is_zipfile(zip_filename)):
            raise InstallationError(
                'Module %s (in %s) isn\'t located in a zip file in %s'
                % (module_name, filename, zip_filename))
        package_path = os.path.dirname(zip_filename)
        if package_path not in self.paths():
            logger.warning(
                'Unpacking %s into %s, but %s is not on sys.path',
                display_path(zip_filename),
                display_path(package_path),
                display_path(package_path),
            )
        logger.info(
            'Unzipping %s (in %s)', module_name, display_path(zip_filename),
        )
        if self.simulate:
            logger.info(
                'Skipping remaining operations because of --simulate'
            )
            return

        with indent_log():
            # FIXME: this should be undoable:
            # Zip member names always use '/' as separator, whatever the
            # platform, so match and split on '/' rather than os.path.sep.
            member_prefix = module_name + '/'
            to_save = []
            zf = zipfile.ZipFile(zip_filename)
            try:
                for info in zf.infolist():
                    name = info.filename
                    content = zf.read(name)
                    if not name.startswith(member_prefix):
                        to_save.append((name, content))
                        continue
                    # A trailing '/' (directory entry) yields a trailing
                    # empty component, i.e. a trailing os.path.sep on dest.
                    dest = os.path.join(package_path, *name.split('/'))
                    parent = os.path.dirname(dest)
                    if not os.path.exists(parent):
                        os.makedirs(parent)
                    if not content and dest.endswith(os.path.sep):
                        if not os.path.exists(dest):
                            os.makedirs(dest)
                    else:
                        with open(dest, 'wb') as f:
                            f.write(content)
            finally:
                zf.close()

            if not to_save:
                logger.debug(
                    'Removing now-empty zip file %s',
                    display_path(zip_filename)
                )
                os.unlink(zip_filename)
                self.remove_filename_from_pth(zip_filename)
            else:
                logger.debug(
                    'Removing entries in %s/ from zip file %s',
                    module_name,
                    display_path(zip_filename),
                )
                zf = zipfile.ZipFile(zip_filename, 'w')
                try:
                    for name, content in to_save:
                        zf.writestr(name, content)
                finally:
                    zf.close()

    def zip_package(self, module_name, filename, no_pyc):
        """Move the package directory ``filename`` into a zip archive.

        The archive is ``filename + '.zip'``, or ``filename`` itself for a
        ``.egg`` directory (which is first moved aside to a backup name).
        Every file and sub-directory is stored under ``module_name/``; with
        ``no_pyc`` true, ``.pyc`` files are left out.  Afterwards the source
        directory is removed and the archive is registered in a ``.pth``
        file.  With ``--simulate`` the steps are only logged.
        """
        logger.info('Zip %s (in %s)', module_name, display_path(filename))

        orig_filename = filename
        if filename.endswith('.egg'):
            dest_filename = filename
        else:
            dest_filename = filename + '.zip'

        with indent_log():
            # FIXME: I think this needs to be undoable:
            if filename == dest_filename:
                # The archive takes over the directory's name, so the
                # directory has to get out of the way first.
                filename = backup_dir(orig_filename)
                logger.info(
                    'Moving %s aside to %s', orig_filename, filename,
                )
                if not self.simulate:
                    shutil.move(orig_filename, filename)

            # FIXME: need to do an undo here if anything below fails
            logger.debug(
                'Creating zip file in %s', display_path(dest_filename),
            )
            if not self.simulate:
                zf = zipfile.ZipFile(dest_filename, 'w')
                try:
                    zf.writestr(module_name + '/', '')
                    for dirpath, dirnames, filenames in os.walk(filename):
                        if no_pyc:
                            filenames = [f for f in filenames
                                         if not f.lower().endswith('.pyc')]
                        # Directory of this walk step, relative to the
                        # package root.
                        rel_dir = dirpath[len(filename):].lstrip(os.path.sep)
                        for dirname in dirnames:
                            arcname = os.path.join(
                                module_name, rel_dir, dirname)
                            zf.writestr(arcname + '/', '')
                        for fn in filenames:
                            zf.write(
                                os.path.join(dirpath, fn),
                                os.path.join(module_name, rel_dir, fn),
                            )
                finally:
                    # Close the archive even when writing fails.
                    zf.close()
            logger.debug(
                'Removing old directory %s', display_path(filename),
            )
            if not self.simulate:
                rmtree(filename)
            # FIXME: should also be undone:
            self.add_filename_to_pth(dest_filename)

    def remove_filename_from_pth(self, filename):
        """Drop the line naming ``filename`` from the first .pth file that has it.

        Only the first ``.pth`` file containing such a line is changed.  If
        nothing but blank lines would remain, the ``.pth`` file is deleted
        instead of rewritten.  A warning is logged when no ``.pth`` file
        references ``filename``.  With ``--simulate`` nothing is modified.
        """
        for pth in self.pth_files():
            with open(pth, 'r') as f:
                lines = f.readlines()
            new_lines = [
                line for line in lines if line.strip() != filename]
            if lines == new_lines:
                continue
            logger.debug(
                'Removing reference to %s from .pth file %s',
                display_path(filename),
                display_path(pth),
            )
            # readlines() keeps the newline, so every entry is truthy;
            # strip before deciding whether anything meaningful is left.
            if not any(line.strip() for line in new_lines):
                logger.debug(
                    '%s file would be empty: deleting', display_path(pth)
                )
                if not self.simulate:
                    os.unlink(pth)
            elif not self.simulate:
                # Text mode to match how the lines were read; writing str
                # to a binary-mode file fails on Python 3.
                with open(pth, 'w') as f:
                    f.writelines(new_lines)
            return
        logger.warning(
            'Cannot find a reference to %s in any .pth file',
            display_path(filename),
        )

    def add_filename_to_pth(self, filename):
        """Append ``filename`` to the .pth file ``filename + '.pth'``.

        The ``.pth`` file is created when missing.  A warning is logged when
        the directory holding it is not among ``self.paths()``.  With
        ``--simulate`` nothing is written.
        """
        path = os.path.dirname(filename)
        dest = filename + '.pth'
        if path not in self.paths():
            logger.warning(
                'Adding .pth file %s, but it is not on sys.path',
                display_path(dest),
            )
        if self.simulate:
            return
        if os.path.exists(dest):
            with open(dest) as f:
                lines = f.readlines()
            # Make sure the new entry starts on its own line.
            if lines and not lines[-1].endswith('\n'):
                lines[-1] += '\n'
            lines.append(filename + '\n')
        else:
            lines = [filename + '\n']
        # Text mode: the lines are str, which a binary-mode file rejects
        # on Python 3.
        with open(dest, 'w') as f:
            f.writelines(lines)

    def pth_files(self):
        """Yield the full path of every ``*.pth`` file directly inside a directory of ``self.paths()``."""
        for path in self.paths():
            # isdir() is already False for paths that do not exist.
            if not os.path.isdir(path):
                continue
            for filename in os.listdir(path):
                if filename.endswith('.pth'):
                    yield os.path.join(path, filename)

    def find_package(self, package):
        """Locate ``package`` on the search paths.

        Returns ``(package, full_path)`` for the first path entry that either
        contains ``package`` on disk or is a zip file holding
        ``package/__init__.py``.  Raises InstallationError when no entry
        qualifies.
        """
        # Zip member names always use '/', independent of os.path.sep.
        init_member = package + '/__init__.py'
        for path in self.paths():
            full = os.path.join(path, package)
            if os.path.exists(full):
                return package, full
            if not os.path.isdir(path) and zipfile.is_zipfile(path):
                zf = zipfile.ZipFile(path, 'r')
                try:
                    # getinfo() raises KeyError for a missing member without
                    # having to read and decompress its contents.
                    zf.getinfo(init_member)
                except KeyError:
                    pass
                else:
                    return package, full
                finally:
                    zf.close()
        # FIXME: need special error for package.py case:
        raise InstallationError(
            'No package with the name %s found' % package)

    def list(self, options, args):
        """Log the zipped and unzipped packages found on the search paths.

        Only directories named ``site-packages`` or ``dist-packages``, or
        whose path ends in ``lib/python``, are inspected; zip files sitting
        on the path whose parent directory is not itself on the path are
        reported as zipped eggs.  With ``options.sort_files`` the unzipped
        packages are ordered by descending file count.

        Raises InstallationError when any positional argument is given.
        """
        if args:
            raise InstallationError(
                'You cannot give an argument with --list')
        # Compute once; the result is needed for every entry below.
        search_paths = self.paths()
        for path in sorted(search_paths):
            if not os.path.exists(path):
                continue
            basename = os.path.basename(path.rstrip(os.path.sep))
            if os.path.isfile(path) and zipfile.is_zipfile(path):
                if os.path.dirname(path) not in search_paths:
                    logger.info('Zipped egg: %s', display_path(path))
                continue
            is_package_dir = (
                basename in ('site-packages', 'dist-packages') or
                path.replace('\\', '/').endswith('lib/python'))
            if not is_package_dir:
                continue
            logger.info('In %s:', display_path(path))

            with indent_log():
                zipped = []
                unzipped = []

                for filename in sorted(os.listdir(path)):
                    ext = os.path.splitext(filename)[1].lower()
                    if ext in ('.pth', '.egg-info', '.egg-link'):
                        continue
                    if ext == '.py':
                        logger.debug(
                            'Not displaying %s: not a package',
                            display_path(filename)
                        )
                        continue
                    full = os.path.join(path, filename)
                    if os.path.isdir(full):
                        unzipped.append((filename, self.count_package(full)))
                    elif zipfile.is_zipfile(full):
                        zipped.append(filename)
                    else:
                        logger.debug(
                            'Unknown file: %s', display_path(filename),
                        )

                if zipped:
                    logger.info('Zipped packages:')
                    with indent_log():
                        for filename in zipped:
                            logger.info(filename)
                else:
                    logger.info('No zipped packages.')

                if unzipped:
                    if options.sort_files:
                        unzipped.sort(key=lambda x: -x[1])
                    logger.info('Unzipped packages:')
                    with indent_log():
                        for filename, count in unzipped:
                            logger.info('%s (%i files)', filename, count)
                else:
                    logger.info('No unzipped packages.')

    def count_package(self, path):
        """Return the number of files below ``path``, not counting ``.pyc`` files."""
        return sum(
            1
            for _dirpath, _dirnames, filenames in os.walk(path)
            for f in filenames
            if not f.lower().endswith('.pyc')
        )
# (end of file)