"""
Ditz command handler.
"""
from __future__ import print_function
import os
import re
import six
from cmd import Cmd
from textwrap import wrap
from .command import CmdBuiltin, CmdProject, CmdIssue, CmdInfo
from .database import DitzDB
from .settings import Settings
from .config import config as userconfig
from .config import ConfigSection
from .colorize import colorize
from .objects import Project, Config
from .plugin import Plugin, get_plugins
from .util import (print_columns, default_name, default_email,
matching_issue_names, DitzError)
from six.moves import input
class DitzCmd(Cmd, CmdBuiltin, CmdProject, CmdIssue, CmdInfo):
    """
    Command interpreter for a Ditz issue database.

    Builds on the standard library :class:`cmd.Cmd` loop and mixes in
    the command classes imported from ``.command``.  On top of that it
    provides command lookup (aliases, hyphenated names, unique-prefix
    abbreviation), tab completion, user-input helpers and paged output.
    """

    #: The :class:`DitzDB` database.
    db = None

    #: Whether running interactively.
    interactive = False

    #: Last issue name mentioned.
    last_issuename = None

    #: Last release name mentioned.
    last_releasename = None

    def __init__(self, path=".", settings=None, interactive=False, **kw):
        """
        Locate (or set up) the issue database and prepare the commands.

        Args:
            path (str): Directory to look for the database in.
            settings (Settings or None): Global settings.  If not
                given, they are built from the remaining keyword
                arguments.
            interactive (bool): Whether running interactively.
            **kw: Passed to :class:`Settings` when *settings* is not given.

        Raises:
            DitzError: If no database is found and setup was not requested.
        """
        Cmd.__init__(self)

        #: Command prompt.
        self.prompt = "Ditz: "

        #: Pager prompt.
        self.moreprompt = "-- More (Y/n/q) --"

        #: Global settings.
        self.settings = settings or Settings(**kw)

        #: Configuration variables.
        self.config = ConfigSection(None, 'command', userconfig)

        #: Pathname of the database.
        self.dbpath = os.path.realpath(path)

        #: Whether last command was successful.
        self.success = True

        self.interactive = interactive
        self.last_issuename = None
        self.last_releasename = None

        # In interactive mode, write status (and do interactive setup
        # if creating a new database).
        dirname, _ = Config.find(self.dbpath,
                                 search=self.settings.searchparents)

        if not dirname:
            # Use self.settings here: the 'settings' argument is None
            # whenever the caller configured things via keywords.
            if self.settings.setup:
                self.write("Setting up issue database in", path)
                self.write()
                self.setup()
            else:
                self.error("no issue database found in", self.dbpath)
        elif self.interactive:
            self.write("Reading issue database in", self.dbpath)
            self.write()

        # Read the database, with the same settings setup() uses.
        self.db = DitzDB.read(self.dbpath, settings=self.settings)

        # Register plugin commands.  The functions are bound to this
        # instance but attached to the class, which is where
        # get_names() is expected to look for them.
        for cls in get_plugins(CommandSet):
            for name in dir(cls):
                if name.startswith(("do_", "help_")):
                    func = six.get_unbound_function(getattr(cls, name))
                    meth = six.create_bound_method(func, self)
                    setattr(DitzCmd, name, meth)

        # Set the list of known commands (every do_* except EOF).
        self.commands = set()
        for name in self.get_names():
            if name.startswith("do_") and name != 'do_EOF':
                self.commands.add(name[3:])

        # Highlight the prompts if required.
        if self.interactive and self.settings.highlight:
            self.prompt = colorize(self.prompt.strip()) + " "
            self.moreprompt = colorize(self.moreprompt)

    def setup(self, oldconfig=None):
        """
        Set up a database interactively.

        Prompts for the user configuration.  If a project file already
        exists under the chosen issue directory, only the configuration
        is written; otherwise a project name is requested and a new
        database is created.

        Args:
            oldconfig (Config or None): Existing configuration whose
                values are offered as defaults.

        Returns:
            True on success, False if the user supplied no configuration
            or no project name.
        """
        config = self.getconfig(oldconfig)
        if not config:
            return False

        name = config.name
        email = config.email
        issuedir = config.issue_dir

        path = os.path.join(self.dbpath, issuedir, Project.filename)
        if os.path.exists(path):
            if config.write():
                self.write()
                self.write("Configuration written")
        else:
            default = os.path.basename(self.dbpath)
            project = self.getline("Project name", default)
            if not project:
                return False

            db = DitzDB(project, name, email, issuedir, self.dbpath,
                        settings=self.settings)
            db.write()

            self.write()
            self.write("Ditz database created in", self.dbpath)

        return True

    # Completion methods.

    def match_issue(self, text, line, beg, end):
        """Return the issue names that start with *text*."""
        return startswith(self.db.issue_names, text)

    complete_add_reference = match_issue
    complete_assign = match_issue
    complete_claim = match_issue
    complete_close = match_issue
    complete_comment = match_issue
    complete_drop = match_issue
    complete_edit = match_issue
    complete_set_component = match_issue
    complete_show = match_issue
    complete_start = match_issue
    complete_stop = match_issue
    complete_unassign = match_issue
    complete_unclaim = match_issue

    def match_release(self, text, line, beg, end):
        """Return the releases that start with *text*."""
        return startswith(self.db.releases, text)

    complete_archive = match_release
    complete_changelog = match_release
    complete_release = match_release
    complete_status = match_release
    complete_todo = match_release

    def match_command(self, text):
        """Return the known command names that start with *text*."""
        return startswith(self.commands, text)

    # Command hook methods.

    def onecmd(self, cmdline):
        """
        Run a single command line.

        The command word is resolved with :meth:`findcmd` before being
        handed to :meth:`cmd.Cmd.onecmd`.  :class:`DitzError` is
        reported as an error message, and keyboard interrupt or end of
        input as "Aborted"; in both cases :attr:`success` stays False.

        Args:
            cmdline (str): Command line to run.

        Returns:
            The stop flag returned by the command (False on failure).
        """
        # Get command and arguments.
        bits = cmdline.strip().split(None, 1)
        cmd = args = ""
        if len(bits) == 1:
            cmd = bits[0]
        elif len(bits) > 1:
            cmd, args = bits

        # Run it.
        self.success = False
        stop = False

        try:
            cmd = self.config.name = self.findcmd(cmd)
            stop = Cmd.onecmd(self, cmd + " " + args)
            self.success = True
        except DitzError as msg:
            self.write("Error:", msg)
        except (KeyboardInterrupt, EOFError):
            self.write()
            self.write("Aborted")

        return stop

    def findcmd(self, cmd):
        """
        Resolve a command word to a known command name where possible.

        Args:
            cmd (str): Command word as typed.

        Returns:
            The resolved command name, or the (alias- and
            hyphen-substituted) word itself if nothing matches.

        Raises:
            DitzError: If the word is an ambiguous abbreviation.
        """
        if cmd:
            # Substitute aliases, if any.
            if userconfig.has_option('alias', cmd):
                cmd = userconfig.get('alias', cmd)

            # Replace hyphens in command, to support original Ditz-style
            # commands.
            cmd = cmd.replace('-', '_')

            # Do completion on it.
            if cmd not in self.commands:
                matches = self.match_command(cmd)
                if len(matches) == 1:
                    cmd = matches[0]
                elif len(matches) > 1:
                    # Sorted so the message does not depend on set order.
                    self.error("ambiguous command: %s (%s)"
                               % (cmd, ", ".join(sorted(matches))))

        return cmd

    def emptyline(self):
        """Run the ``todo`` command when an empty line is entered."""
        return self.onecmd("todo")

    def default(self, arg):
        """Raise an 'unknown command' error for an unrecognised line."""
        msg = "unknown command: " + arg
        if self.interactive:
            msg += " (type 'help' for a list)"

        self.error(msg)

    # User input methods.

    def getissue(self, name):
        """
        Get an issue by name or ID.  Return it and its assigned name.

        An empty *name* falls back to the last issue name mentioned.
        The name is first matched against the known issue names, then
        looked up directly, and finally treated as an issue ID prefix.

        Args:
            name (str): Issue name or ID.

        Returns:
            Tuple(Issue, str).

        Raises:
            DitzError: If no name is available, or it matches no issue
                or more than one.
        """
        if not name and self.last_issuename:
            name = self.last_issuename

        if not name:
            self.error("no issue specified")

        matched = matching_issue_names(name, self.db.issue_names)
        count = len(matched)

        if count == 1:
            name = matched.pop()
        elif count > 1:
            # Show at most three of the candidates.
            if len(matched) <= 3:
                matches = ", ".join(matched)
            else:
                matches = ", ".join(matched[:3]) + "..."

            self.error("%d issue names match '%s' (%s)" %
                       (count, name, matches))

        self.last_issuename = name
        issue = self.db.get_issue(name)

        if not issue:
            # Not a known name: try it as an ID prefix.
            issues = [i for i in self.db.issues if i.id.startswith(name)]
            count = len(issues)
            if count == 1:
                issue = issues.pop()
            elif count > 1:
                self.error("%d issue IDs match '%s'" % (count, name))

        if not issue:
            self.error("no issue with name or ID matching '%s'" % name)

        return issue, self.db.issue_name(issue)

    def getrelease(self, name):
        """
        Get a release and return it.

        A non-empty name is checked against the database releases and
        remembered as the last release mentioned.

        Args:
            name (str or None): Release name.

        Returns:
            The name, unchanged.

        Raises:
            DitzError: If the name is given but is not a known release.
        """
        if name:
            if name not in self.db.releases:
                self.error("no release with name '%s'" % name)

            self.last_releasename = name

        return name

    def getregexp(self, string):
        """
        Compile a regular expression.

        Args:
            string (str): Pattern; an empty value matches everything.

        Returns:
            Compiled pattern object.

        Raises:
            DitzError: If the pattern is invalid.
        """
        try:
            return re.compile(string or ".*")
        except re.error as msg:
            self.error("invalid regexp: %s" % six.text_type(msg))

    def getvalue(self, value, vtype, default=None):
        """
        Convert a value to the given type.

        Args:
            value: Value to convert, or None.
            vtype (callable): Conversion function, e.g. ``int``.
            default: Returned when *value* is None.

        Returns:
            The converted value, or *default*.

        Raises:
            DitzError: If the conversion raises ValueError.
        """
        if value is None:
            return default

        try:
            return vtype(value)
        except ValueError:
            self.error("expected %s, not %r" % (vtype, value))

    def getline(self, prompt="> ", default="", allowempty=True):
        """
        Get a single line of input.

        Args:
            prompt (str): Prompt string.
            default (str): Default value.
            allowempty (bool): Whether value is optional.

        Returns:
            Reply (str).

        Raises:
            EOFError: If the input is exhausted.
        """
        if default:
            prompt = "%s (default '%s'): " % (prompt, default)

        while True:
            if self.use_rawinput:
                reply = input(prompt)
            else:
                self.stdout.write(prompt)
                self.stdout.flush()
                reply = self.stdin.readline()

                # readline() returns '' only at end of input (a blank
                # line is '\n').  Signal it the way input() does, or
                # callers that loop on replies would never terminate.
                if not reply:
                    raise EOFError

            if not reply.strip():
                reply = default

            reply = reply.rstrip()
            if reply or allowempty:
                return reply

    def gettext(self, title=None, prompt="> ", endchar='.'):
        """
        Get multiline text.

        Lines are read until one consists solely of *endchar*.

        Args:
            title (str): Initial title text.
            prompt (str): Prompt for each line of input.
            endchar (str): Character terminating input.

        Returns:
            Text (str).
        """
        if title:
            self.write(title + " (ctrl-c to abort, %s to finish)" % endchar)

        lines = []
        while True:
            line = self.getline(prompt)
            if line == endchar:
                return "\n".join(lines)

            lines.append(line)

    def getchoice(self, thing, choices):
        """
        Get a choice of several things.

        Args:
            thing (str): Description of thing being chosen.
            choices (list): List of available options.

        Returns:
            Choice made (str).

        Raises:
            DitzError: If there are no choices to pick from.
        """
        if not choices:
            # No reply could ever be valid; don't prompt forever.
            self.error("no %s to choose from" % thing)

        items = ["%3d) %s" % (num, entry)
                 for num, entry in enumerate(choices, 1)]
        print_columns(items)

        prompt = "Choose a %s (1--%d): " % (thing, len(choices))
        while True:
            reply = self.getline(prompt)

            try:
                number = int(reply)
            except ValueError:
                continue

            # Check the range explicitly: plain indexing would accept 0
            # and negative numbers as choices counted from the end.
            if 1 <= number <= len(choices):
                return choices[number - 1]

    def getyesno(self, question, default=False):
        """
        Get the answer to a yes/no question.

        Only the first character of the reply is examined; anything
        other than y/Y/n/N repeats the question.

        Args:
            question (str): The question.
            default (bool): Default if no reply given.

        Returns:
            Reply (bool).
        """
        prompt = question + " [%s] " % ("yes" if default else "no")

        while True:
            reply = self.getline(prompt)

            if not reply:
                return default

            if reply[0] in "yY":
                return True
            elif reply[0] in "nN":
                return False

    def getconfig(self, oldconfig=None):
        """
        Prompt for and return database configuration info.

        Args:
            oldconfig (Config or None): Existing configuration whose
                values are offered as defaults.

        Returns:
            Config built from the replies.
        """
        default = oldconfig.name if oldconfig else default_name()
        name = self.getline("Your name", default)

        default = oldconfig.email if oldconfig else default_email()
        email = self.getline("Your email", default)

        # First configured issue directory is the suggested default.
        issuedirs = userconfig.get('config', 'issuedirs').split()
        issuedir = issuedirs[0] if issuedirs else ".ditz-issues"
        default = oldconfig.issue_dir if oldconfig else issuedir
        issuedir = self.getline("Issue directory", default)

        return Config(name, email, issuedir)

    # Output methods.

    def write(self, *args):
        """
        Write the given list of arguments to stdout.

        Arguments are converted to text and joined with spaces; with
        no arguments a blank line is written.  Lines wider than the
        terminal are truncated (if a truncation marker is configured)
        or wrapped.  Also performs syntax highlighting and paging.
        """
        def output(args):
            args = [six.text_type(s) for s in args]
            items = " ".join(args).split("\n")

            cols = self.settings.termcols
            trunc = userconfig.get('ui', 'linetrunc')

            for line in items:
                if not cols or len(line) <= cols:
                    yield line
                elif trunc:
                    # Never slice with a negative width, even if the
                    # marker is wider than the terminal.
                    yield line[:max(cols - len(trunc), 0)] + trunc
                else:
                    for text in wrap(line, width=cols):
                        yield text

        if args:
            lines = self.settings.termlines

            for num, line in enumerate(output(args), 1):
                if not six.PY3:
                    line = line.encode('utf-8')

                if self.settings.highlight:
                    line = colorize(line)

                print(line)

                # Page every (lines - 1) lines.  A one-line terminal
                # leaves no room to page (and would divide by zero).
                if lines and lines > 1 and num % (lines - 1) == 0:
                    reply = input(self.moreprompt)
                    if reply and reply[0] in "nqNQ":
                        break
        else:
            print()

    def error(self, *args):
        """
        Signal an error.

        Args:
            *args: Message parts, joined with spaces.

        Raises:
            DitzError: Always.
        """
        # Accept the same kinds of argument as write(): convert
        # anything that is not already a string.
        parts = [arg if isinstance(arg, six.string_types)
                 else six.text_type(arg) for arg in args]
        raise DitzError(" ".join(parts))

    # Miscellaneous other stuff.

    def unimplemented(self):
        """
        Write a 'not implemented yet' message.

        The command name is taken from the last command line run.

        Raises:
            DitzError: Always.
        """
        cmd = self.lastcmd.split()[0]
        self.error("'%s' is not implemented yet" % cmd)
class CommandSet(Plugin):
    """
    Plugin base class for sets of extra commands.

    The ``do_*`` and ``help_*`` attributes of every plugin class found
    via ``get_plugins(CommandSet)`` are registered as commands when a
    :class:`DitzCmd` is created.
    """

    #: One-line description (currently unused).
    description = "undocumented"

    #: Name of the command set.
    name = None

    # Plugin category of this kind of plugin.
    category = "command"
def startswith(items, text):
    """
    Return the items that start with the given text.

    The result is sorted so that it does not depend on the iteration
    order of *items*, which is arbitrary when a set is passed in.

    Args:
        items (iterable of str): Candidate strings.
        text (str): Prefix to match.

    Returns:
        Sorted list of the matching strings.
    """
    return sorted(item for item in items if item.startswith(text))