#!/usr/bin/python # coding=utf-8 """Handle stocks on the web.""" import cgi import cgitb cgitb.enable () from Cheetah.Template import Template import fcntl DEFAULT_DATABASE = 'stocks.txt' class StocksError (Exception): pass class StocksParseError (StocksError): def __init__ (self, message, filename, line_index): StocksError.__init__ (self, message, filename, line_index) self.filename = filename self.line = line_index + 1 self.message = message def __str__ (self): return '%s:%d: %s' % (self.filename, self.line, self.message) class StocksItemQty (dict): """Quantity dict of StocksItem.""" def __setitem__ (self, key, value): if value: dict.__setitem__ (self, key, value) else: del self[key] def itersorted (self, withmain = False): for k in sorted (self.keys ()): if withmain or k != 'main': yield k, self[k] class StocksItem: """A specific stock item.""" def __init__ (self, code, cmd, qty, desc = None): self.code = code self.cmd = cmd self.qty = StocksItemQty (main = qty) self.desc = desc def __str__ (self): s = [ ] s.append ('%s\t%s\t%d' % (self.code, self.cmd, self.qty['main'])) if self.desc: s.extend (('\t', self.desc)) for place, qty in self.qty.itersorted (): s.append ('\n\t%s\t%d' % (place, qty)) return ''.join (s) class Stocks: """Stocks, i.e. stock items list.""" def __init__ (self): self.database = None def __delete__ (self): self.close () def load (self, filename): """Open an lock database.""" # Open and lock. self.close () self.database = open (filename, 'r+') fcntl.flock (self.database, fcntl.LOCK_EX) self.database.seek (0) # Load. self.head = [ ] self.tail = [ ] self.items = { } headtail = self.head last_item = None for n, line in enumerate (self.database): if '\t' not in line: headtail.append (line) elif line.startswith ('Code\t'): headtail.append (line) headtail = self.tail elif not line.startswith ('\t'): cols = line.strip ().split ('\t') if len (cols) not in (3, 4): raise StocksParseError ("bad input line", filename, n) code, cmd, qty = cols[0:3] desc = cols[3] if len (cols) == 4 else None try: qty = int (qty) except ValueError: raise StocksParseError ("bad quantity", filename, n) if code in self.items: raise StocksParseError ("duplicated code", filename, n) item = StocksItem (code, cmd, qty, desc) self.items[item.code] = item last_item = item else: cols = line.strip ().split ('\t') if len (cols) != 2 or last_item is None: raise StocksParseError ("bad input line", filename, n) place, qty = cols try: qty = int (qty) except ValueError: raise StocksParseError ("bad quantity", filename, n) if place in last_item.qty: raise StocksParseError ("duplicated place", filename, n) last_item.qty[place] = qty def dump (self): """Dump stocks to database.""" self.database.seek (0) self.database.truncate () self.database.write (''.join (self.head)) for ik in sorted (self.items.keys ()): self.database.write (str (self.items[ik]) + '\n') self.database.write (''.join (self.tail)) def close (self): """Close and unlock database.""" if self.database is not None: self.database.close () def itersorted (self): """Return sorted iterable list of items.""" for ik in sorted (self.items.keys ()): yield self.items[ik] class AppCGI: def __init__ (self): pass def run (self): template = AppCGI.template self.error = None dirty = False # Get form content. form = cgi.FieldStorage () self.mode = form.getfirst ('mode', None) self.place = form.getfirst ('place', None) if self.place == '--- new': self.place = form.getfirst ('place_new', None) self.action = form.getfirst ('action', None) # Read database. self.stocks = Stocks () self.stocks.load (DEFAULT_DATABASE) # Make operations. try: if (self.mode == 'update' or self.action == 'export') and not self.place: self.mode = None raise StocksError ("update mode without a place") if self.mode == 'update' and self.action == 'update': for code, qty in self.iter_update (form): if qty: item = self.stocks.items[code] old_qty = item.qty.get (self.place, 0) item.qty[self.place] = old_qty + qty dirty = True elif self.action == 'merge': from_ = form.getfirst ('from', None) to = form.getfirst ('to', None) factor = int (form.getfirst ('factor', None)) if not (from_ and to and factor): raise StocksError ("bad parameters") for item in self.stocks.items.itervalues (): if from_ in item.qty: old_qty = item.qty.get (to, 0) item.qty[to] = old_qty + item.qty[from_] * factor if from_ != to: del item.qty[from_] dirty = True elif self.action == 'export': template = AppCGI.export_template except StocksError, e: self.error = str (e) except ValueError: self.error = "bad input value" if dirty: self.stocks.dump () # Make a list of places. places = set () for item in self.stocks.items.itervalues (): places.update (item.qty) places -= set (('main', )) self.places = sorted (places) # Done. self.output (template) def output (self, template): t = Template (template, searchList = [ self ]) t.stylesheet = AppCGI.stylesheet print t def iter_update (self, form): """Iter over (code_x, qty_x) pairs from a form.""" line = 0 while True: code = form.getfirst ('code_%d' % line, None) if code is None: break qty = int (form.getfirst ('qty_%d' % line, 0)) yield code, qty line += 1 AppCGI.stylesheet = """\ table { border-collapse: collapse; width: 100%; } th { background: #d0d0d0; } td { background: #e0e0e0; border: 1px solid white; } tr.place td { background: #e8e8e8; } tr:hover td, tr.place:hover td { background: #f0f0f0; } td.qty, td.action { text-align: right; } form { margin: 0; } input.qty, input.number { width: 5em; } p { margin: 0.5ex 0; } p.error { background: #ff4444; border: 1px dashed black; padding: 0.5ex; } """ AppCGI.template = u"""\ Content-Type: text/html; charset=UTF-8 Stocks

Stocks

#def cmdlink($cmd) #if $cmd.isdigit() $cmd #elif $cmd.startswith('rs:') $cmd #else #filter WebSafe $cmd #end filter #end if #end def #def update_mode($line, $i) #filter WebSafe #if $mode == 'update' #if $place in $i.qty then $i.qty[$place] else 0 # ± #end if #end filter #end def #def select_places($name, $add = None) #set $places = $places #if $add #set $places = sorted ($places + $add) #end if #filter WebSafe #end filter #end def #filter WebSafe #if $error

$error

#end if #if not $mode

Update mode for #filter None $select_places("place", ['main', '--- new']) #end filter

#filter None Merge $select_places("from") #end filter x #filter None to $select_places("to", ['main']) #end filter

#filter None Export $select_places("place") #end filter

#else

#end if
#if $mode #end if #for $line, $i in enumerate($stocks.itersorted) #filter None #end filter #filter None $update_mode($line, $i) #end filter #for $iplace, $qty in $i.qty.itersorted #if $mode #end if #end for #end for
Code Command Description Qty$place
$i.code$cmdlink($i.cmd)$i.desc $i.qty.main
$iplace $qty
#end filter """ AppCGI.export_template = u"""\ Content-Type: text/plain; charset=UTF-8 #filter WebSafe #for $i in $stocks.itersorted #if $place in $i.qty $i.cmd,$i.qty[$place],$i.code #end if #end for #end filter """ if __name__ == '__main__': a = AppCGI () a.run ()