summaryrefslogtreecommitdiff
path: root/test/monkeyfarmer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/monkeyfarmer.py')
-rw-r--r--test/monkeyfarmer.py149
1 files changed, 99 insertions, 50 deletions
diff --git a/test/monkeyfarmer.py b/test/monkeyfarmer.py
index 1851888a5..c5ede027d 100644
--- a/test/monkeyfarmer.py
+++ b/test/monkeyfarmer.py
@@ -24,16 +24,22 @@ Python code.
"""
+# pylint: disable=locally-disabled, missing-docstring
+
import asyncore
import os
import socket
import subprocess
import time
+
class MonkeyFarmer(asyncore.dispatcher):
+
+ # pylint: disable=locally-disabled, too-many-instance-attributes
+
def __init__(self, monkey_cmd, online, quiet=False, *, wrapper=None):
(mine, monkeys) = socket.socketpair()
-
+
asyncore.dispatcher.__init__(self, sock=mine)
if wrapper is not None:
@@ -65,7 +71,6 @@ class MonkeyFarmer(asyncore.dispatcher):
def handle_close(self):
# the pipe to the monkey process has closed
self.close()
-
def handle_read(self):
got = self.recv(8192)
@@ -75,7 +80,8 @@ class MonkeyFarmer(asyncore.dispatcher):
if self.monkey.poll() is None:
self.monkey.terminate()
self.monkey.wait()
- self.lines.insert(0, "GENERIC EXIT {}".format(self.monkey.returncode).encode('utf-8'))
+ self.lines.insert(0, "GENERIC EXIT {}".format(
+ self.monkey.returncode).encode('utf-8'))
return
self.incoming += got
if b"\n" in self.incoming:
@@ -84,7 +90,7 @@ class MonkeyFarmer(asyncore.dispatcher):
self.lines = lines
def writable(self):
- return (len(self.buffer) > 0)
+ return len(self.buffer) > 0
def handle_write(self):
sent = self.send(self.buffer)
@@ -94,7 +100,7 @@ class MonkeyFarmer(asyncore.dispatcher):
cmd = (" ".join(args))
if not self.quiet:
print(">>> {}".format(cmd))
- self.discussion.append((">",cmd))
+ self.discussion.append((">", cmd))
cmd = cmd + "\n"
self.buffer += cmd.encode('utf-8')
@@ -106,15 +112,15 @@ class MonkeyFarmer(asyncore.dispatcher):
self.online(line)
def schedule_event(self, event, secs=None, when=None):
- assert(secs is not None or when is not None)
+ assert secs is not None or when is not None
if when is None:
when = time.time() + secs
self.scheduled.append((when, event))
- self.scheduled.sort(lambda a,b: cmp(a[0],b[0]))
+ self.scheduled.sort()
def unschedule_event(self, event):
self.scheduled = [x for x in self.scheduled if x[1] != event]
-
+
def loop(self, once=False):
if len(self.lines) > 0:
self.monkey_says(self.lines.pop(0))
@@ -128,8 +134,8 @@ class MonkeyFarmer(asyncore.dispatcher):
func(self)
now = time.time()
if len(self.scheduled) > 0:
- next = self.scheduled[0][0]
- asyncore.loop(timeout=next-now, count=1)
+ next_event = self.scheduled[0][0]
+ asyncore.loop(timeout=next_event - now, count=1)
else:
asyncore.loop(count=1)
if len(self.lines) > 0:
@@ -137,9 +143,17 @@ class MonkeyFarmer(asyncore.dispatcher):
if once:
break
+
class Browser:
+
+ # pylint: disable=locally-disabled, too-many-instance-attributes, dangerous-default-value, invalid-name
+
def __init__(self, monkey_cmd=["./nsmonkey"], quiet=False, *, wrapper=None):
- self.farmer = MonkeyFarmer(monkey_cmd=monkey_cmd, online=self.on_monkey_line, quiet=quiet, wrapper=wrapper)
+ self.farmer = MonkeyFarmer(
+ monkey_cmd=monkey_cmd,
+ online=self.on_monkey_line,
+ quiet=quiet,
+ wrapper=wrapper)
self.windows = {}
self.logins = {}
self.sslcerts = {}
@@ -161,7 +175,7 @@ class Browser:
def pass_options(self, *opts):
if len(opts) > 0:
self.farmer.tell_monkey("OPTIONS " + (" ".join(['--' + opt for opt in opts])))
-
+
def on_monkey_line(self, line):
parts = line.split(" ")
handler = getattr(self, "handle_" + parts[0], None)
@@ -175,7 +189,7 @@ class Browser:
self.quit()
self.farmer.loop()
return self.stopped
-
+
def handle_GENERIC(self, what, *args):
if what == 'STARTED':
self.started = True
@@ -186,9 +200,8 @@ class Browser:
elif what == 'EXIT':
if not self.stopped:
print("Unexpected exit of monkey process with code {}".format(args[0]))
- assert(self.stopped)
+ assert self.stopped
else:
- # TODO: Nothing for now?
pass
def handle_WINDOW(self, action, _win, winid, *args):
@@ -230,7 +243,7 @@ class Browser:
def handle_PLOT(self, *args):
if self.current_draw_target is not None:
self.current_draw_target.handle_plot(*args)
-
+
def new_window(self, url=None):
if url is None:
self.farmer.tell_monkey("WINDOW NEW")
@@ -243,18 +256,31 @@ class Browser:
return self.windows[poss_wins.pop()]
def handle_ready_login(self, lwin):
+
+ # pylint: disable=locally-disabled, no-self-use
+
# Override this method to do useful stuff
lwin.destroy()
+ def handle_ready_sslcert(self, cwin):
+
+ # pylint: disable=locally-disabled, no-self-use
+
+ # Override this method to do useful stuff
+ cwin.destroy()
+
+
class SSLCertWindow:
+
+ # pylint: disable=locally-disabled, invalid-name
+
def __init__(self, browser, winid, _url, *url):
self.alive = True
self.browser = browser
self.winid = winid
self.url = " ".join(url)
- def handle(self, action, _str="STR", *rest):
- content = " ".join(rest)
+ def handle(self, action, _str="STR"):
if action == "DESTROY":
self.alive = False
else:
@@ -265,16 +291,20 @@ class SSLCertWindow:
self.browser.farmer.loop(once=True)
def go(self):
- assert(self.alive)
+ assert self.alive
self.browser.farmer.tell_monkey("SSLCERT GO {}".format(self.winid))
self._wait_dead()
def destroy(self):
- assert(self.alive)
+ assert self.alive
self.browser.farmer.tell_monkey("SSLCERT DESTROY {}".format(self.winid))
self._wait_dead()
+
class LoginWindow:
+
+ # pylint: disable=locally-disabled, too-many-instance-attributes, invalid-name
+
def __init__(self, browser, winid, _url, *url):
self.alive = True
self.ready = False
@@ -301,13 +331,13 @@ class LoginWindow:
self.ready = True
def send_username(self, username=None):
- assert(self.alive)
+ assert self.alive
if username is None:
username = self.username
self.browser.farmer.tell_monkey("LOGIN USERNAME {} {}".format(self.winid, username))
def send_password(self, password=None):
- assert(self.alive)
+ assert self.alive
if password is None:
password = self.password
self.browser.farmer.tell_monkey("LOGIN PASSWORD {} {}".format(self.winid, password))
@@ -315,19 +345,35 @@ class LoginWindow:
def _wait_dead(self):
while self.alive:
self.browser.farmer.loop(once=True)
-
+
def go(self):
- assert(self.alive)
+ assert self.alive
self.browser.farmer.tell_monkey("LOGIN GO {}".format(self.winid))
self._wait_dead()
def destroy(self):
- assert(self.alive)
+ assert self.alive
self.browser.farmer.tell_monkey("LOGIN DESTROY {}".format(self.winid))
self._wait_dead()
-
+
+
class BrowserWindow:
- def __init__(self, browser, winid, _for, coreid, _existing, otherid, _newtab, newtab, _clone, clone):
+
+ # pylint: disable=locally-disabled, too-many-instance-attributes, too-many-public-methods, invalid-name
+
+ def __init__(
+ self,
+ browser,
+ winid,
+ _for,
+ coreid,
+ _existing,
+ otherid,
+ _newtab,
+ newtab,
+ _clone,
+ clone):
+ # pylint: disable=locally-disabled, too-many-arguments
self.alive = True
self.browser = browser
self.winid = winid
@@ -361,7 +407,7 @@ class BrowserWindow:
if (time.time() - now) > timeout:
break
- def go(self, url, referer = None):
+ def go(self, url, referer=None):
if referer is None:
self.browser.farmer.tell_monkey("WINDOW GO %s %s" % (
self.winid, url))
@@ -387,15 +433,12 @@ class BrowserWindow:
def handle_window_SIZE(self, _width, width, _height, height):
self.width = int(width)
self.height = int(height)
-
+
def handle_window_DESTROY(self):
self.alive = False
def handle_window_TITLE(self, _str, *title):
self.title = " ".join(title)
-
- def handle_window_REDRAW(self):
- pass
def handle_window_GET_DIMENSIONS(self, _width, width, _height, height):
self.width = width
@@ -418,11 +461,12 @@ class BrowserWindow:
self.scrolly = int(y)
def handle_window_UPDATE_BOX(self, _x, x, _y, y, _width, width, _height, height):
+ # pylint: disable=locally-disabled, no-self-use
+
x = int(x)
y = int(y)
width = int(width)
height = int(height)
- pass
def handle_window_UPDATE_EXTENT(self, _width, width, _height, height):
self.content_width = int(width)
@@ -516,9 +560,11 @@ class BrowserWindow:
self.browser.farmer.loop(once=True)
-if __name__ == '__main__':
- # Simple test is as follows...
-
+def farmer_test():
+ '''
+ Simple farmer test
+ '''
+
browser = Browser(quiet=True)
win = browser.new_window()
@@ -534,11 +580,10 @@ if __name__ == '__main__':
print("Received {} plot commands".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
- x = cmd[2]
- y = cmd[4]
+ text_x = cmd[2]
+ text_y = cmd[4]
rest = " ".join(cmd[6:])
- print("{} {} -> {}".format(x,y,rest))
-
+ print("{} {} -> {}".format(text_x, text_y, rest))
browser.pass_options("--enable_javascript=1")
win.load_page("file://" + full_fname)
@@ -549,10 +594,10 @@ if __name__ == '__main__':
print("Received {} plot commands".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
- x = cmd[2]
- y = cmd[4]
+ text_x = cmd[2]
+ text_y = cmd[4]
rest = " ".join(cmd[6:])
- print("{} {} -> {}".format(x,y,rest))
+ print("{} {} -> {}".format(text_x, text_y, rest))
browser.quit_and_wait()
@@ -565,17 +610,17 @@ if __name__ == '__main__':
def handle_ready_sslcert(self, cwin):
cwin.destroy()
- browser = FooBarLogin(quiet=True)
- win = browser.new_window()
+ fbbrowser = FooBarLogin(quiet=True)
+ win = fbbrowser.new_window()
win.load_page("https://httpbin.org/basic-auth/foo/bar")
cmds = win.redraw()
print("Received {} plot commands for auth test".format(len(cmds)))
for cmd in cmds:
if cmd[0] == "TEXT":
- x = cmd[2]
- y = cmd[4]
+ text_x = cmd[2]
+ text_y = cmd[4]
rest = " ".join(cmd[6:])
- print("{} {} -> {}".format(x,y,rest))
+ print("{} {} -> {}".format(text_x, text_y, rest))
fname = "test/js/inserted-script.html"
full_fname = os.path.join(os.getcwd(), fname)
@@ -588,6 +633,10 @@ if __name__ == '__main__':
win.wait_for_log(substr="deferred")
- #print("Discussion was:")
- #for line in browser.farmer.discussion:
+ # print("Discussion was:")
+ # for line in browser.farmer.discussion:
# print("{} {}".format(line[0], line[1]))
+
+
+if __name__ == '__main__':
+ farmer_test()