#! /usr/bin/env python #-*- coding: utf-8 -*- import sys import os import optparse import subprocess class Messages: need_param_invalid = 'The arguments is insufficient.' param_not_supplied = '{0} option not supplied.' param_not_support = 'not support parameter.' class TransparentOptionParser(optparse.OptionParser): # over ride on arguments from 'rargs' 'values' consuming. def _process_args(self, largs, rargs, values): """_process_args(largs : [string], rargs : [string], values : Values) Process command-line arguments and populate 'values', consuming options and arguments from 'rargs'. If 'allow_interspersed_args' is false, stop at the first non-option argument. If true, accumulate any interspersed non-option arguments in 'largs'. """ def _through_option(func, *args, **kwds): """new function""" try: func(*args, **kwds) except optparse.BadOptionError, err: largs.append(err.opt_str) while rargs: arg = rargs[0] # We handle bare "--" explicitly, and bare "-" is handled by the # standard arg handler since the short arg case ensures that the # len of the opt string is greater than 1. if arg == "--": del rargs[0] return elif arg[0:2] == "--": # process a single long option (possibly with value(s)) self._process_long_opt(rargs, values) elif arg[:1] == "-" and len(arg) > 1: # process a cluster of short options (possibly with # value(s) for the last one only) _through_option(self._process_short_opts, rargs, values) # modified elif self.allow_interspersed_args: largs.append(arg) del rargs[0] else: return # stop now, leave this arg in rargs # Say this is the original argument list: # [arg0, arg1, ..., arg(i-1), arg(i), arg(i+1), ..., arg(N-1)] # ^ # (we are about to process arg(i)). # # Then rargs is [arg(i), ..., arg(N-1)] and largs is a *subset* of # [arg0, ..., arg(i-1)] (any options and their arguments will have # been removed from largs). # # The while loop will usually consume 1 or more arguments per pass. # If it consumes 1 (eg. arg is an option that takes no arguments), # then after _process_arg() is done the situation is: # # largs = subset of [arg0, ..., arg(i)] # rargs = [arg(i+1), ..., arg(N-1)] # # If allow_interspersed_args is false, largs will always be # *empty* -- still a subset of [arg0, ..., arg(i-1)], but # not a very interesting subset! def parse_args(self, args=None, values=None): if args is not None: args = list(args) return optparse.OptionParser.parse_args(self, args, values) class SubOptionParser(TransparentOptionParser): def parse_args(self, args, need=0, ignore=[]): args_org = args if ignore: args = filter(lambda arg: arg not in ignore, args) opts, args = TransparentOptionParser.parse_args(self, args_org) if ignore: args += filter(lambda arg: arg not in ignore, args_org) if need is None: return opts, args else: need = int(need) # raise TypeError try: if need == 1: need_arg = args.pop(0) # raise IndexError return opts, args, need_arg elif need > 1: need_arg = [args.pop(0) for ii in range(need)] # raise IndexError return opts, args, need_arg else: raise IndexError, Messages.need_param_invalid except IndexError, err: self.error(msg(Messages.need_param_invalid, need)) def requires(self, *opts): for opt in opts: self.check_required(opt) def check_required(self, opt): option = self.get_option(opt) # assumes the option's 'default' is set to None ! if getattr(self.values, option.dest): self.error(param_not_supplied.format(option)) class DummyChild(object): def wait(self): return 0 def call(*args, **kwds): args = list(args) args = map(str, args) try: env = os.environ['SUBCOMMAND'] except KeyError, err: env = '' env = env.lower().split(':') if 'noexec' in env: child = DummyChild() else: # execute child = subprocess.Popen(args) try: nowait = kwds['nowait'] except KeyError: nowait = False if nowait: return child else: return child.wait() class SubCommand(object): def _usage(self, *args, **kwds): funcs = dir(self) funcs = map(lambda x: x.lower(), funcs) funcs = filter(lambda x: x[0] != '_' or x == 'call', funcs) usage = '{0} subcommand [options]\n\nsubcommand list\n\n'.format(self.__class__.__name__.lower()) for name in funcs: usage += ' * {0}\n'.format(name) return usage def _call(self, *args, **kwds): parser = SubOptionParser(usage=self._usage()) opts, args, subcmd = parser.parse_args(args, need=1, ignore=['-h', '--help']) name = subcmd.lower() attrs = dir(self) subcmds = filter(lambda attr: attr.lower() == name, attrs) length = len(subcmds) if length < 1: parser.error(param_not_support, name) assert length == 1, str(length) name = subcmds[0] try: class_or_func = getattr(self, name) except AttributeError, err: parser.print_usage() assert False, str(err) if hasattr(class_or_func, '_call'): # class subcmd = class_or_func() return subcmd._call(*args, **kwds) else: return class_or_func(*argrs, **kwds) def msg(*args): args = map(str, args) return ': '.join(args) def dispatch(subcommands, *args, **kwds): subcommands = dict([(subcmd.__name__.lower(), subcmd) for subcmd in subcommands]) args = list(args) name = args.pop(0) # raise IndexError name = name.lower() cmd_class = subcommands[name] # raise KeyError cmd = cmd_class() return cmd._call(*args, **kwds) def test(): class TESTA(SubCommand): class TESTAA(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) class TESTAB(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) class TESTAC(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) class TESTB(SubCommand): class TESTBA(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) class TESTBB(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) class TESTBC(SubCommand): def _call(self, *args, **kwds): call('python', '-c', 'print "{0}"'.format(self.__class__.__name__)) dispatch([TESTA, TESTB], *sys.argv[1:]) if __name__ == '__main__': test()
2011年11月4日金曜日
サブコマンドをラッピングするためのクラス
登録:
コメントの投稿 (Atom)
0 件のコメント:
コメントを投稿