#! /usr/bin/env python """Tk GUI providing simple I/O for KPs. This must be exec'd from simpleio.py since ILU Servers don't survive across the fork. Usage: tkstdio.py [-f fd] [-h] Where: -f fd -- the file descriptor to write the SBH to. If this is omitted, then the GUI is started up in test mode. -h -- print help -H int -- widget height -W int -- widget width """ import sys import os import string import getopt from Tkinter import * from koe.common import tktools import ilu_tk from koe.interfaces.stubs import FileAPI, FileAPI__skel # Useful Text widget location ENDMINUS1C = 'end - 1 c' # Milliseconds between interrupt checks KEEPALIVE_TIMER = 500 class TkStdio(FileAPI__skel.File): """Implement a Unix talk(1)-like interface for interactive KPs. Uses the File.isl API. """ DEFAULTHEIGHT = 24 DEFAULTWIDTH = 60 def __init__(self, testing=None, height=None, width=None): self._master = None self._stayinalive = None self._height = height or self.DEFAULTHEIGHT self._width = width or self.DEFAULTWIDTH self._create_widgets(testing=testing) def go(self): if not self._stayinalive: self._keep_alive() self._stayinalive = 1 self._inp.focus_set() ilu_tk.RunMainLoop() def stop(self): self._master.quit() def _quit(self): self.stop() def _create_widgets(self, testing=None): self._master = Tk(className='KOEStdio') self._master.title('Tk Stdio') self._master.iconname('Tk Stdio') self._master.protocol('WM_DELETE_WINDOW', self._quit) # button bar btnframe = Frame(self._master) btnframe.pack(fill=X, expand=0, side=BOTTOM) inpclearbtn = Button(btnframe, text='Clear Input Buffer', command=self._clear_input_buffer) inpclearbtn.pack(side=LEFT) outclearbtn = Button(btnframe, text='Clear Output Log', command=self._clear_output_log) outclearbtn.pack(side=LEFT) # output window and message self._outmsg = Label(self._master, text='Output from KP', anchor=W) self._outmsg.pack(fill=X, expand=0) self._out, outframe = tktools.make_text_box( self._master, width=self._width, height=self._height) self._out.config(state=DISABLED) self._out.pack(fill=BOTH, expand=1) # input window and message self._inpmsg = Label(self._master, text = 'Input to KP', anchor=W) self._inpmsg.pack(fill=X, expand=0) self._inp, inpframe = tktools.make_text_box( self._master, width=self._width, height=self._height) self._inp.pack(fill=BOTH, expand=1) self._inp.bind_all('', self._read_text) self._inp.bind_all('', self._emulate_eof) self._inp.bind_all('', self._emulate_eof) self._reading = 0 self._wait_for_eof = 0 self._prompt() self._inpbuffer = [] self._inp.focus_set() # testing if testing: self._readbtn = Button(btnframe, text='Read', command=self._test_read) self._readbtn.pack(side=LEFT) self._readlnbtn = Button(btnframe, text='Read Line', command=self._test_readln) self._readlnbtn.pack(side=LEFT) self._readlnsbtn = Button(btnframe, text='Read Lines', command=self._test_readlns) self._readlnsbtn.pack(side=LEFT) quitbtn = Button(btnframe, text='Quit', command=self._quit) quitbtn.pack(side=RIGHT) def _keep_alive(self): # Exercise the Python interpreter regularly so keyboard # interrupts get through try: self._master.tk.createtimerhandler(KEEPALIVE_TIMER, self._keep_alive) except KeyboardInterrupt: sys.exit(0) def _prompt(self): if not self._wait_for_eof: self._inp.insert(END, '==> ') self._inp.delete(ENDMINUS1C, END) self._inp.mark_set('insert', ENDMINUS1C) self._inp.mark_set('start', ENDMINUS1C) self._inp.mark_gravity('start', LEFT) def _clear_input_buffer(self): self._inpbuffer = [] self._update_readmsg() def _update_readmsg(self): msg = 'Input to KP' if self._reading: msg = msg + ': KP is reading...' if self._wait_for_eof: msg = msg + ' (waiting for Control-d)' if self._inpbuffer: msg = msg + ' (lines in buffer: %d)' % len(self._inpbuffer) self._inpmsg.config(text=msg) def _emulate_eof(self, event=None): self._wait_for_eof = 0 self._inp.insert(END, '\n') self._read_text() def _read_text(self, event=None): textline = self._inp.get('start', ENDMINUS1C)[:-1] self._inpbuffer.append(textline) self._prompt() if self._reading and not self._wait_for_eof: self.stop() self._reading = 0 self._update_readmsg() def _unfreeze(self): self._out.config(state=NORMAL) def _freeze(self): self._out.config(state=DISABLED) self._out.see(END) def _clear_output_log(self): self._unfreeze() self._out.delete(0.0, END) self._freeze() # # testing # def _update_testbtns(self, state=NORMAL): self._readbtn.config(state=state) self._readlnbtn.config(state=state) self._readlnsbtn.config(state=state) def _test_read(self): self.write('tell me something interesting') self._update_testbtns(DISABLED) text = self.read(1024) print 'read: "%s"' % text self.write('you told me: %s' % text) self._update_testbtns(NORMAL) def _test_readln(self): self.write('tell me more about yourself') self._update_testbtns(DISABLED) text = self.readline() print 'read a line: "%s"' % text self.write('you told me: %s' % text) self._update_testbtns(NORMAL) def _test_readlns(self): self.write('go ahead, pour your heart out!') self._update_testbtns(DISABLED) lines = self.readlines() text = string.joinfields(lines, '') print 'read these lines: "%s"' % text self.write('you told me: %s' % text) self._update_testbtns(NORMAL) # # ISL interface # def open(self, mode): """mode is ignored.""" pass def close(self): self._master.destroy() ilu_tk.ExitMainLoop() def flush(self): pass def read(self, size): if not self._inpbuffer: self._reading = 1 self._update_readmsg() self.go() text = string.joinfields(self._inpbuffer, '\n') + '\n' if size < len(text): front = text[:size] back = text[size:-1] self._inpbuffer = string.splitfields(back, '\n') text = front else: self._inpbuffer = [] self._update_readmsg() return text def readline(self): if not self._inpbuffer: self._reading = 1 self._update_readmsg() self.go() if len(self._inpbuffer) == 0: # user hit quit from the menu without entering input. the # recursive loop was exited, and now we need to exit the # top-level mainloop. self.stop() return '' text = self._inpbuffer[0] + '\n' del self._inpbuffer[0] self._update_readmsg() return text def readlines(self): if not self._inpbuffer: self._reading = 1 self._wait_for_eof = 1 self._update_readmsg() self.go() lines = [] for line in self._inpbuffer: lines.append(line + '\n') if lines: if lines[-1] == '\n': del lines[-1] elif lines[-1][-1:] == '\n': lines[-1] = lines[-1][:-1] self._inpbuffer = [] self._update_readmsg() return lines def seek(self, offset, whence): raise FileAPI.FileIOError def tell(self): raise FileAPI.FileIOError def write(self, text): self._unfreeze() self._out.insert(END, text) self._freeze() def writelines(self, lines): self._unfreeze() for line in lines: self._out.insert(END, line) self._freeze() if __name__ == '__main__': import os fd = None testing = 1 error = 0 usage = 0 try: optlist, args = getopt.getopt(sys.argv[1:], 'f:H:W:h') except getopt.error, msg: print msg error = 1 usage = 1 optlist = [] if args: error = 1 usage = 1 optlist = [] height = width = None for opt, val in optlist: if opt == '-f': fd = string.atoi(val) testing = None elif opt == '-h': usage = 1 elif opt == '-H': height = val elif opt == '-W': width = val if usage: print sys.argv print 'Usage:', sys.argv[0], '[-f fd] [-?] [-H height] [-W width]' sys.exit(error) tkstdio = TkStdio(testing=testing, height=height, width=width) sbh = tkstdio.IluSBH() if not fd: print sbh else: fp = os.fdopen(fd, 'w') fp.write(sbh) fp.close() tkstdio.go()