Source code for vncdotool.client

"""
Twisted based VNC client protocol and factory

(c) 2010 Marc Sibson

MIT License
"""

import rfb
from twisted.internet.defer import Deferred
from twisted.internet import reactor

import getpass
import math
import operator
import time
import logging

log = logging.getLogger('vncdotool.client')


# Friendly key names understood by the key commands, mapped to the key
# constants exported by the rfb module.  Several names are aliases for the
# same constant (e.g. 'return'/'enter', 'delete'/'del').
KEYMAP = {
    # editing and navigation
    'bsp': rfb.KEY_BackSpace,
    'tab': rfb.KEY_Tab,
    'return': rfb.KEY_Return,
    'enter': rfb.KEY_Return,
    'esc': rfb.KEY_Escape,
    'ins': rfb.KEY_Insert,
    'delete': rfb.KEY_Delete,
    'del': rfb.KEY_Delete,
    'home': rfb.KEY_Home,
    'end': rfb.KEY_End,
    'pgup': rfb.KEY_PageUp,
    'pgdn': rfb.KEY_PageDown,
    'left': rfb.KEY_Left,
    'up': rfb.KEY_Up,
    'right': rfb.KEY_Right,
    'down': rfb.KEY_Down,

    # punctuation that is awkward to type on a command line
    # NOTE: 'slash' is an alias for the *back*slash; use 'fslash' for '/'.
    'slash': rfb.KEY_BackSlash,
    'bslash': rfb.KEY_BackSlash,
    'fslash': rfb.KEY_ForwardSlash,
    'spacebar': rfb.KEY_SpaceBar,
    'space': rfb.KEY_SpaceBar,
    'sb': rfb.KEY_SpaceBar,

    # modifiers; the unprefixed name selects the left-hand key
    'lshift': rfb.KEY_ShiftLeft,
    'shift': rfb.KEY_ShiftLeft,
    'rshift': rfb.KEY_ShiftRight,
    'lctrl': rfb.KEY_ControlLeft,
    'ctrl': rfb.KEY_ControlLeft,
    'rctrl': rfb.KEY_ControlRight,
    'lmeta': rfb.KEY_MetaLeft,
    'meta': rfb.KEY_MetaLeft,
    'rmeta': rfb.KEY_MetaRight,
    'lalt': rfb.KEY_AltLeft,
    'alt': rfb.KEY_AltLeft,
    'ralt': rfb.KEY_AltRight,
    'lsuper': rfb.KEY_Super_L,
    'super': rfb.KEY_Super_L,
    'rsuper': rfb.KEY_Super_R,
    'lhyper': rfb.KEY_Hyper_L,
    'hyper': rfb.KEY_Hyper_L,
    'rhyper': rfb.KEY_Hyper_R,

    # locks and system keys
    'scrlk': rfb.KEY_Scroll_Lock,
    'sysrq': rfb.KEY_Sys_Req,
    'numlk': rfb.KEY_Num_Lock,
    'caplk': rfb.KEY_Caps_Lock,
    'pause': rfb.KEY_Pause,

    'kpenter': rfb.KEY_KP_Enter,
}

# Function keys 'f1'..'f20' map to rfb.KEY_F1..rfb.KEY_F20.
KEYMAP.update(
    ('f%d' % number, getattr(rfb, 'KEY_F%d' % number))
    for number in range(1, 21)
)

# Keypad digits 'kp0'..'kp9' map to rfb.KEY_KP_0..rfb.KEY_KP_9.
KEYMAP.update(
    ('kp%d' % digit, getattr(rfb, 'KEY_KP_%d' % digit))
    for digit in range(10)
)


def ImageFactory():
    """ Return the PIL Image module, importing it only when called

        Deferring the import lets vncdotool be used without PIL being
        installed: capture and expect won't work, but key, type, press
        and move are still available.
    """
    import PIL.Image
    return PIL.Image
class VNCDoToolClient(rfb.RFBClient):
    """ RFB client protocol exposing scriptable keyboard, mouse and
        screen operations

        Keyboard and mouse methods return self so that calls can be
        chained.  pause, capture* and expect* return a Deferred that
        fires with this client once the operation has completed.
    """

    # last pointer position sent with mouseMove
    x = 0
    y = 0
    # bitmask of the buttons held down by mouseDown (bit n-1 is button n)
    buttons = 0
    # image of the framebuffer, assembled by updateRectangle
    screen = None
    # pending Deferred, fired with the screen image by commitUpdate
    deferred = None
    # cursor image and its mask, set by updateCursor
    cursor = None
    cmask = None

    def _decodeKey(self, key):
        """ Translate a key description into a list of key codes

            key: string: a single character, a name from KEYMAP, or
                 several of those joined with '-' (e.g. 'ctrl-alt-del')

            Each component becomes its KEYMAP value when it is a known
            name, otherwise ord() of the component.
        """
        if self.factory.force_caps and key.isupper():
            # send upper case characters as an explicit shift combination
            key = 'shift-%c' % key

        # a lone character is never split, so '-' itself can be sent
        if len(key) == 1:
            keys = [key]
        else:
            keys = key.split('-')

        return [KEYMAP.get(k) or ord(k) for k in keys]

    def pause(self, duration):
        """ Return a Deferred that fires with this client after duration

            duration: delay handed to reactor.callLater (seconds)
        """
        d = Deferred()
        reactor.callLater(duration, d.callback, self)
        return d

    def keyPress(self, key):
        """ Send a key press (down followed by up) to the server

            key: string: a single character or a name from KEYMAP,
                 optionally combined with '-'
        """
        log.debug('keyPress %s', key)
        self.keyDown(key)
        self.keyUp(key)

        return self

    def keyDown(self, key):
        """ Send a key-down event for every component of key
        """
        log.debug('keyDown %s', key)
        for k in self._decodeKey(key):
            self.keyEvent(k, down=1)

        return self

    def keyUp(self, key):
        """ Send a key-up event for every component of key
        """
        log.debug('keyUp %s', key)
        for k in self._decodeKey(key):
            self.keyEvent(k, down=0)

        return self

    def mousePress(self, button):
        """ Send a mouse click at the last set position

            button: int: [1-n]
        """
        log.debug('mousePress %s', button)
        # press on top of whatever is already held, then restore the
        # held state; self.buttons itself is left untouched
        buttons = self.buttons | (1 << (button - 1))
        self.pointerEvent(self.x, self.y, buttonmask=buttons)
        self.pointerEvent(self.x, self.y, buttonmask=self.buttons)

        return self

    def mouseDown(self, button):
        """ Send a mouse button down at the last set position

            button: int: [1-n]
        """
        log.debug('mouseDown %s', button)
        self.buttons |= 1 << (button - 1)
        self.pointerEvent(self.x, self.y, buttonmask=self.buttons)

        return self

    def mouseUp(self, button):
        """ Send mouse button released at the last set position

            button: int: [1-n]
        """
        log.debug('mouseUp %s', button)
        self.buttons &= ~(1 << (button - 1))
        self.pointerEvent(self.x, self.y, buttonmask=self.buttons)

        return self

    def captureScreen(self, filename):
        """ Save the current display to filename
        """
        log.debug('captureScreen %s', filename)
        return self._capture(filename)

    def captureRegion(self, filename, x, y, w, h):
        """ Save a region of the current display to filename

            The region is the box (x, y), (x + w, y + h).
        """
        log.debug('captureRegion %s', filename)
        return self._capture(filename, x, y, x+w, y+h)

    def _capture(self, filename, *args):
        """ Request a framebuffer update and save it once it arrives

            args: optional crop box (left, upper, right, lower)
        """
        self.framebufferUpdateRequest()
        self.deferred = Deferred()
        self.deferred.addCallback(self._captureSave, filename, *args)

        return self.deferred

    def _captureSave(self, data, filename, *args):
        """ Deferred callback: write the screen, or a crop of it, to filename

            data is the callback result and is not used; the image is
            read from self.screen.
        """
        log.debug('captureSave %s', filename)
        if args:
            capture = self.screen.crop(args)
        else:
            capture = self.screen
        capture.save(filename)

        return self

    def expectScreen(self, filename, maxrms=0):
        """ Wait until the display matches a target image

            filename: an image file to read and compare against
            maxrms: the maximum root mean square between histograms of the
                    screen and target image
        """
        log.debug('expectScreen %s', filename)
        return self._expectFramebuffer(filename, 0, 0, maxrms)

    def expectRegion(self, filename, x, y, maxrms=0):
        """ Wait until a portion of the screen matches the target image

            The region compared is defined by the box
            (x, y), (x + image.width, y + image.height)
        """
        log.debug('expectRegion %s (%s, %s)', filename, x, y)
        return self._expectFramebuffer(filename, x, y, maxrms)

    def _expectFramebuffer(self, filename, x, y, maxrms):
        """ Load the target image and start comparing updates against it
        """
        self.framebufferUpdateRequest()
        image = ImageFactory().open(filename)
        w, h = image.size
        self.expected = image.histogram()
        self.deferred = Deferred()
        self.deferred.addCallback(self._expectCompare, (x, y, x+w, y+h),
                                  maxrms)

        return self.deferred

    def _expectCompare(self, image, box, maxrms):
        """ Deferred callback: compare the screen region with the target

            Returns self when the histograms are within maxrms of each
            other; otherwise requests an incremental update and returns a
            new Deferred so the comparison runs again on the next update.
        """
        image = image.crop(box)

        hist = image.histogram()
        if len(hist) == len(self.expected):
            squares = sum((a - b) ** 2 for a, b in zip(hist, self.expected))
            # float division: integer division would truncate small
            # differences to 0 and report a false match at maxrms=0
            rms = math.sqrt(squares / float(len(hist)))

            log.debug('rms %s', int(rms))
            if rms <= maxrms:
                return self

        self.deferred = Deferred()
        self.deferred.addCallback(self._expectCompare, box, maxrms)
        self.framebufferUpdateRequest(incremental=1)

        return self.deferred

    def mouseMove(self, x, y):
        """ Move the mouse pointer to position (x, y)
        """
        log.debug('mouseMove %d,%d', x, y)
        self.x, self.y = x, y
        self.pointerEvent(x, y, self.buttons)

        return self

    def mouseDrag(self, x, y, step=1):
        """ Move the mouse point to position (x, y) in increments of step

            The pointer travels along the y axis first, then along the
            x axis, and always finishes exactly on (x, y).

            NOTE: every intermediate move is preceded by a blocking
            time.sleep(.2), so the caller is stalled for the whole drag.
        """
        log.debug('mouseDrag %d,%d', x, y)
        # both step lists are computed up front because mouseMove
        # updates self.x / self.y as the drag progresses
        if x < self.x:
            xsteps = [self.x - i for i in range(step, self.x - x + 1, step)]
        else:
            xsteps = range(self.x, x, step)

        if y < self.y:
            ysteps = [self.y - i for i in range(step, self.y - y + 1, step)]
        else:
            ysteps = range(self.y, y, step)

        for ypos in ysteps:
            time.sleep(.2)
            self.mouseMove(self.x, ypos)

        for xpos in xsteps:
            time.sleep(.2)
            self.mouseMove(xpos, self.y)

        self.mouseMove(x, y)

        return self

    #
    # base customizations
    #
    def vncConnectionMade(self):
        """ Configure pixel format and encodings, then notify the factory
        """
        self.setPixelFormat()
        encodings = [rfb.RAW_ENCODING]
        # the cursor encoding is also requested for nocursor;
        # updateCursor then discards the cursor updates
        if self.factory.pseudocusor or self.factory.nocursor:
            encodings.append(rfb.PSEUDO_CURSOR_ENCODING)
        self.setEncodings(encodings)
        self.factory.clientConnectionMade(self)

    def bell(self):
        """ Print 'ding' to stdout
        """
        print('ding')

    def copy_text(self, text):
        """ Print the repr of text to stdout
        """
        # single formatted argument keeps the output identical whether
        # print is a statement or a function
        print('clipboard copy %s' % repr(text))

    def paste(self, message):
        """ Send message to the server as client cut text
        """
        self.clientCutText(message)
        return self

    def updateRectangle(self, x, y, width, height, data):
        """ Merge a rectangle of pixel data into self.screen

            The canvas grows when the rectangle reaches beyond the
            current screen, keeping what was already drawn.
        """
        # ignore empty updates
        if not data:
            return

        pil_image = ImageFactory()
        # fromstring was renamed to frombytes in Pillow; support both
        frombytes = getattr(pil_image, 'frombytes', None) or \
            pil_image.fromstring
        update = frombytes('RGB', (width, height), data, 'raw', 'RGBX')

        if not self.screen:
            self.screen = update
        # track upward screen resizes, often occurs during os boot of VMs;
        # the rectangle's offset counts towards the size that is needed
        elif (self.screen.size[0] < x + width
                or self.screen.size[1] < y + height):
            new_size = (max(x + width, self.screen.size[0]),
                        max(y + height, self.screen.size[1]))
            canvas = pil_image.new('RGB', new_size, 'black')
            canvas.paste(self.screen, (0, 0))
            canvas.paste(update, (x, y))
            self.screen = canvas
        else:
            self.screen.paste(update, (x, y))

        self.drawCursor()

    def commitUpdate(self, rectangles):
        """ Fire the pending Deferred, if any, with the screen image
        """
        if self.deferred:
            # clear first: the callback may install a new Deferred
            d = self.deferred
            self.deferred = None
            d.callback(self.screen)

    def updateCursor(self, x, y, width, height, image, mask):
        """ Store the cursor image sent by the server and draw it

            x, y: stored as the cursor focus, which drawCursor subtracts
                  from the pointer position
            An empty cursor (zero width or height) clears the stored one.
        """
        if self.factory.nocursor:
            return

        if not width or not height:
            self.cursor = None
            # nothing to decode or draw for an empty cursor
            return

        pil_image = ImageFactory()
        # fromstring was renamed to frombytes in Pillow; support both
        frombytes = getattr(pil_image, 'frombytes', None) or \
            pil_image.fromstring
        self.cursor = frombytes('RGBX', (width, height), image)
        self.cmask = frombytes('1', (width, height), mask)
        self.cfocus = x, y
        self.drawCursor()

    def drawCursor(self):
        """ Paste the cursor onto the screen at the pointer position
        """
        if not self.cursor:
            return

        if not self.screen:
            return

        x = self.x - self.cfocus[0]
        y = self.y - self.cfocus[1]
        self.screen.paste(self.cursor, (x, y), self.cmask)

    def vncRequestPassword(self):
        """ Send the factory's password, prompting for it when unset

            A password read from the prompt is stored on the factory.
        """
        if self.factory.password is None:
            self.factory.password = getpass.getpass()

        self.sendPassword(self.factory.password)
class VNCDoToolFactory(rfb.RFBFactory):
    """ Factory for VNCDoToolClient connections

        self.deferred fires with the protocol instance when the client
        reports the connection as made, or errs back with the failure
        reason when the connection attempt fails.
    """

    protocol = VNCDoToolClient

    # connection settings; a password of None makes the client prompt
    password = None
    shared = True

    # switches read by the protocol (the 'pseudocusor' spelling is the
    # name the protocol looks up, so it must stay as is)
    force_caps = False
    nocursor = False
    pseudocusor = False

    def __init__(self):
        self.deferred = Deferred()

    def clientConnectionMade(self, protocol):
        """ Fire self.deferred with the connected protocol
        """
        self.deferred.callback(protocol)

    def clientConnectionFailed(self, connector, reason):
        """ Err back self.deferred with the failure reason
        """
        self.deferred.errback(reason)

This Page