update: pivot to http protocol
This commit is contained in:
@@ -4,16 +4,12 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "common"
|
name = "common"
|
||||||
description = "Reusable code stubs"
|
description = "Commonly reusable code"
|
||||||
version = "0.1.14"
|
version = "0.1.15"
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.13"
|
||||||
authors = [
|
authors = [
|
||||||
{ name="BreakerBear", email="breakerbear@autistic.men" },
|
{ name="BreakerBear", email="breakerbear@autistic.men" },
|
||||||
]
|
]
|
||||||
dependencies = [
|
|
||||||
"trio",
|
|
||||||
"trio-websocket",
|
|
||||||
]
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
selenium = ["selenium"]
|
selenium = ["selenium"]
|
||||||
|
|||||||
@@ -31,10 +31,11 @@ class ActionFlow:
|
|||||||
self.on[index] = bool(self.actions[index].prepare())
|
self.on[index] = bool(self.actions[index].prepare())
|
||||||
|
|
||||||
def react(self):
|
def react(self):
|
||||||
for index in self.indices.values():
|
while any(self.on):
|
||||||
if (self.on[index]):
|
for index in self.indices.values():
|
||||||
self.on[index] = None
|
if (self.on[index]):
|
||||||
self.on[index] = bool(self.actions[index].perform())
|
self.on[index] = None
|
||||||
|
self.on[index] = bool(self.actions[index].perform())
|
||||||
|
|
||||||
def index(self, action: type[Action]) -> int:
|
def index(self, action: type[Action]) -> int:
|
||||||
name = action.__name__
|
name = action.__name__
|
||||||
|
|||||||
@@ -1,30 +1,11 @@
|
|||||||
from common.jsonrpc2.server import (
|
from common.jsonrpc2.server import (
|
||||||
Request,
|
|
||||||
Response,
|
|
||||||
Error,
|
|
||||||
History,
|
History,
|
||||||
Options,
|
RequestHandler,
|
||||||
define,
|
ServiceProvider,
|
||||||
remove,
|
|
||||||
run,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def prelude() -> str:
|
|
||||||
with open(__file__.replace('__init__.py', 'client.js'), 'r') as file:
|
|
||||||
from common.jsonrpc2.server import server
|
|
||||||
assert server is not None
|
|
||||||
endpoint = server.listeners[0]
|
|
||||||
url = f'ws://{endpoint.address}:{endpoint.port}'
|
|
||||||
return file.read() % url
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'Request',
|
|
||||||
'Response',
|
|
||||||
'Error',
|
|
||||||
'History',
|
'History',
|
||||||
'Options',
|
'RequestHandler',
|
||||||
'define',
|
'ServiceProvider',
|
||||||
'remove',
|
|
||||||
'run',
|
|
||||||
'prelude',
|
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,55 +1,45 @@
|
|||||||
const CONNECTION = new WebSocket("%s");
|
export default new class extends Function {
|
||||||
const QUEUE = new Array();
|
constructor() {
|
||||||
|
super('...args', 'return this.get(...args)');
|
||||||
|
return this.bind(this);
|
||||||
|
}
|
||||||
|
|
||||||
CONNECTION.addEventListener('message', (e) => {
|
all(selectors) { return document.querySelectorAll(selectors) }
|
||||||
let message = JSON.parse(e.data);
|
get(selectors) { return document.querySelector(selectors) }
|
||||||
let index = QUEUE.findIndex(item => item.id === message.id);
|
set(selectors, event, listener) { return this.get(selectors).addEventListener(event, listener) }
|
||||||
|
};
|
||||||
|
|
||||||
if (index === -1) throw Error('Invalid message ID: ' + message.id);
|
export const LogRecord = {
|
||||||
let promise = QUEUE.splice(index, 1)[0];
|
format: (log) => {
|
||||||
|
let ms = (log.created - new Date().getTimezoneOffset() * 60) * 1000;
|
||||||
if (message.error) promise.reject(message.error);
|
|
||||||
else promise.resolve(message.result);
|
|
||||||
});
|
|
||||||
|
|
||||||
while (CONNECTION.readyState !== CONNECTION.OPEN) {
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 200));
|
|
||||||
}
|
|
||||||
|
|
||||||
const LogRecord = {
|
|
||||||
format: (record) => {
|
|
||||||
let ms = record.created * 1000 - new Date().getTimezoneOffset() * 60000;
|
|
||||||
let date = new Date(ms).toISOString().slice(0, 16).replace("T", " ");
|
let date = new Date(ms).toISOString().slice(0, 16).replace("T", " ");
|
||||||
let line = `[${date}] [${record.levelname}] [${record.name}] ${record.msg}`;
|
let line = `[${date}] [${log.levelname}] [${log.name}] ${log.msg}`;
|
||||||
let message = [line, record.exc_text, record.stack_info].filter(Boolean).join('\n');
|
let message = [line, log.exc_text, log.stack_info].filter(Boolean).join('\n');
|
||||||
return message;
|
return message;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
const RemoteProceduralCall = {
|
export const Rpc2 = {
|
||||||
notify: async (method, ...args) => {
|
notify: async (method, ...args) => {
|
||||||
let request = { method };
|
let request = { method };
|
||||||
if (args.length > 0) request.params = args;
|
if (args.length > 0) request.params = args;
|
||||||
let data = JSON.stringify(request);
|
let body = JSON.stringify(request);
|
||||||
CONNECTION.send(data);
|
await fetch('/', { method: 'POST', body });
|
||||||
},
|
},
|
||||||
invoke: async (method, ...args) => {
|
invoke: async (method, ...args) => {
|
||||||
let id = Math.floor(Math.random() * 1000000000);
|
let id = Math.floor(Math.random() * 1000000000);
|
||||||
let request = { method, id };
|
let request = { method, id };
|
||||||
if (args.length > 0) request.params = args;
|
if (args.length > 0) request.params = args;
|
||||||
|
|
||||||
let data = JSON.stringify(request);
|
let body = JSON.stringify(request);
|
||||||
CONNECTION.send(data);
|
let response = await fetch('/', { method: 'POST', body });
|
||||||
|
let json = await response.json();
|
||||||
|
|
||||||
let [resolve, reject] = [null, null];
|
if (json.error) {
|
||||||
let promise = new Promise((a, b) => { resolve = a; reject = b; });
|
let message = `(${json.error.code}) ${json.error.message}`;
|
||||||
QUEUE.push({ resolve, reject, id });
|
if (json.error.data) message += `: ${json.error.data}`;
|
||||||
|
throw new Error(message);
|
||||||
let result = await promise;
|
}
|
||||||
return result;
|
return json.result;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
window.LogRecord = LogRecord;
|
|
||||||
window.Rpc2 = RemoteProceduralCall;
|
|
||||||
main(...arguments); // user-defined code
|
|
||||||
@@ -1,53 +1,50 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import trio
|
import dataclasses
|
||||||
import trio_websocket as ws
|
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
||||||
from typing import Self, Callable, Any
|
from typing import Self, Callable, Any
|
||||||
from itertools import repeat
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
handlers: dict[str, Callable] = dict()
|
|
||||||
connection, server = None, None
|
|
||||||
|
|
||||||
class Request[T]:
|
class Request[T]:
|
||||||
def __init__(self, method: str, params = None, id = None):
|
def __init__(self, method: str, params = None, id = None):
|
||||||
if not isinstance(method, str): raise TypeError()
|
if not isinstance(method, str): raise TypeError('method')
|
||||||
if not isinstance(id, (str, int)) and id is not None: raise TypeError()
|
if not isinstance(id, (str, int)) and id is not None: raise TypeError('id')
|
||||||
self.id = id
|
self.id = id
|
||||||
self.method = method
|
self.method = method
|
||||||
self.params = params
|
self.params = params
|
||||||
|
|
||||||
class ParamsError(Exception):
|
class ParamsError(Exception): pass
|
||||||
pass
|
class NotFound(Exception): pass
|
||||||
|
class Invalid(Exception): pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, raw: bytes) -> Self:
|
def load(cls, data: str) -> Self:
|
||||||
data = json.loads(raw)
|
result = json.loads(data)
|
||||||
return cls(**data)
|
return cls(**result)
|
||||||
|
|
||||||
def fulfill(self) -> T:
|
def handle(self, handlers: dict[str, Any]) -> T:
|
||||||
args = self.params
|
args = self.params
|
||||||
handler: Callable[..., T] = handlers[self.method]
|
handler: Callable[..., T] = handlers.get(self.method)
|
||||||
|
|
||||||
|
if handler is None: raise self.NotFound(self.method)
|
||||||
argcount = handler.__code__.co_argcount
|
argcount = handler.__code__.co_argcount
|
||||||
varnames = handler.__code__.co_varnames
|
argnames = handler.__code__.co_varnames[:argcount]
|
||||||
|
|
||||||
if self.params is None:
|
if self.params is None:
|
||||||
if argcount > 0: raise self.ParamsError(None)
|
if argcount > 0: raise self.ParamsError('Too less')
|
||||||
return handler()
|
return handler()
|
||||||
|
|
||||||
if isinstance(args, list):
|
if isinstance(args, list):
|
||||||
if len(args) != argcount: raise self.ParamsError(args)
|
if len(args) != argcount: raise self.ParamsError(args)
|
||||||
return handler(*args)
|
return handler(*args)
|
||||||
|
|
||||||
if isinstance(args, dict):
|
if isinstance(args, dict):
|
||||||
if args.keys() != set(varnames): raise self.ParamsError(args)
|
if args.keys() != set(argnames): raise self.ParamsError(args)
|
||||||
return handler(**args)
|
return handler(**args)
|
||||||
|
|
||||||
raise TypeError(repr(args))
|
raise self.Invalid('Arguments unacceptable')
|
||||||
|
|
||||||
class Response[T]:
|
class Response[T]:
|
||||||
def __init__(self, id: str|int, inner: T):
|
def __init__(self, id: str|int|None, inner: T):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.inner = inner
|
self.inner = inner
|
||||||
|
|
||||||
@@ -80,7 +77,7 @@ class Error:
|
|||||||
result['code'] = self.code.value
|
result['code'] = self.code.value
|
||||||
result['message'] = self.message()
|
result['message'] = self.message()
|
||||||
if self.data is not None:
|
if self.data is not None:
|
||||||
result['data'] = self.data
|
result['data'] = str(self.data)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
class History(logging.Handler):
|
class History(logging.Handler):
|
||||||
@@ -100,65 +97,98 @@ class History(logging.Handler):
|
|||||||
self.records.clear()
|
self.records.clear()
|
||||||
return copy
|
return copy
|
||||||
|
|
||||||
class Options:
|
class Server(ThreadingHTTPServer):
|
||||||
def __init__(self):
|
def __init__(self, *args, **kwargs):
|
||||||
self.listen = '127.0.0.1'
|
super().__init__(*args, **kwargs)
|
||||||
self.port = 0
|
self.encoding: str
|
||||||
self.max_message_size = 125_000_000
|
self.handlers = dict()
|
||||||
|
|
||||||
async def handler(request: ws.WebSocketRequest):
|
class RequestHandler(BaseHTTPRequestHandler):
|
||||||
global connection
|
server: Server
|
||||||
logger = logging.getLogger('jsonrpc2')
|
protocol_version = 'HTTP/1.1'
|
||||||
websocket = await request.accept()
|
|
||||||
|
|
||||||
if connection is None:
|
def log_message(self, format, *args):
|
||||||
connection = websocket
|
pass
|
||||||
else:
|
|
||||||
await websocket.aclose(code=1000, reason="Engaged")
|
|
||||||
return
|
|
||||||
|
|
||||||
for mid, res, err, exc in repeat((None, None, None, None)):
|
def do_GET(self):
|
||||||
try:
|
try:
|
||||||
message = await connection.get_message()
|
match self.headers.get('Sec-Fetch-Dest'):
|
||||||
inbound = Request.load(message)
|
case 'script':
|
||||||
mid = inbound.id
|
file, mime = __file__[:__file__.rfind('server.py')] + 'client.js', 'text/javascript'
|
||||||
res = inbound.fulfill()
|
case _:
|
||||||
except json.decoder.JSONDecodeError as e:
|
file, mime = './index.html', 'text/html'
|
||||||
err = Error(Error.Code.PARSE_ERROR)
|
response = open(file, mode='rb').read()
|
||||||
exc = e
|
self.send_response(200)
|
||||||
except Request.ParamsError as e:
|
self.send_header('Content-Type', '%s; charset=%s' % (mime, self.server.encoding))
|
||||||
err = Error(Error.Code.INVALID_PARAMS)
|
except FileNotFoundError:
|
||||||
exc = e
|
response = b'File not found'
|
||||||
except TypeError as e:
|
self.send_response(404)
|
||||||
err = Error(Error.Code.INVALID_REQUEST)
|
|
||||||
exc = e
|
|
||||||
except KeyError as e:
|
|
||||||
err = Error(Error.Code.METHOD_NOT_FOUND)
|
|
||||||
err.data = str(e).strip("'")
|
|
||||||
exc = e
|
|
||||||
except ws.ConnectionClosed as e:
|
|
||||||
logger.critical('Going away', exc_info=e)
|
|
||||||
return
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err = Error(Error.Code.INTERNAL_ERROR)
|
response = b'Internal error: %b' % str(e).encode(self.server.encoding)
|
||||||
exc = e
|
self.send_response(500)
|
||||||
|
|
||||||
if err is not None: logger.error(err.message(), exc_info=exc)
|
|
||||||
if mid is not None: await connection.send_message(str(Response(mid, err or res)))
|
|
||||||
|
|
||||||
async def backend(options: Options):
|
self.send_header('Content-Length', str(len(response)))
|
||||||
global server
|
self.end_headers()
|
||||||
listeners = await trio.open_tcp_listeners(port=options.port, host=options.listen)
|
self.wfile.write(response)
|
||||||
server = ws.WebSocketServer(handler, listeners, max_message_size=options.max_message_size)
|
|
||||||
await server.run()
|
|
||||||
|
|
||||||
def define(method: str, handler: Callable[..., Any]):
|
def do_POST(self, request=None):
|
||||||
if method in handlers: raise KeyError(method)
|
try:
|
||||||
handlers[method] = handler
|
size = int(self.headers.get('Content-Length', '0'))
|
||||||
|
data = self.rfile.read(size)
|
||||||
|
request = Request.load(data)
|
||||||
|
message = request.handle(self.server.handlers)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
message = Error(Error.Code.PARSE_ERROR, e)
|
||||||
|
except Request.ParamsError as e:
|
||||||
|
message = Error(Error.Code.INVALID_PARAMS, e)
|
||||||
|
except Request.Invalid as e:
|
||||||
|
message = Error(Error.Code.INVALID_REQUEST, e)
|
||||||
|
except Request.NotFound as e:
|
||||||
|
message = Error(Error.Code.METHOD_NOT_FOUND, e)
|
||||||
|
except Exception as e:
|
||||||
|
message = Error(Error.Code.INTERNAL_ERROR, e)
|
||||||
|
|
||||||
def remove(method: str) -> Callable[..., Any]:
|
response = Response(request.id if request is not None else None, message)
|
||||||
return handlers.pop(method)
|
buffer = bytes(str(response), encoding=self.server.encoding)
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-Length', str(len(buffer)))
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(buffer)
|
||||||
|
|
||||||
def run(options: Options):
|
class ServiceProvider:
|
||||||
thread = Thread(target=lambda: trio.run(backend, options), daemon=True)
|
def __init__(self, options):
|
||||||
thread.start()
|
self.options = dataclasses.asdict(options)
|
||||||
|
self.server = Server((self.options['host'], self.options['port']), self.options['handler'])
|
||||||
|
self.server.encoding = self.options['encoding']
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def default(cls):
|
||||||
|
import sys, _thread as t
|
||||||
|
logger = logging.getLogger()
|
||||||
|
history = History()
|
||||||
|
logger.addHandler(history)
|
||||||
|
opts = cls.Options()
|
||||||
|
self = cls(opts)
|
||||||
|
self.set('history', lambda: history.truncate())
|
||||||
|
self.set('exit', lambda: t.interrupt_main() or sys.exit(0))
|
||||||
|
return self
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Options:
|
||||||
|
host : str = '127.0.0.1'
|
||||||
|
port : int = 0
|
||||||
|
handler : type = RequestHandler
|
||||||
|
encoding : str = 'UTF-8'
|
||||||
|
interval : float = 0.2
|
||||||
|
|
||||||
|
def set(self, method: str, handler: Callable[..., Any]):
|
||||||
|
if method in self.server.handlers: raise KeyError(method)
|
||||||
|
self.server.handlers[method] = handler
|
||||||
|
|
||||||
|
def pop(self, method: str) -> Callable[..., Any]:
|
||||||
|
return self.server.handlers.pop(method)
|
||||||
|
|
||||||
|
def run(self) -> str:
|
||||||
|
thread = Thread(target=lambda: self.server.serve_forever(self.options['interval']), daemon=True)
|
||||||
|
thread.start()
|
||||||
|
return 'http://%s:%d' % self.server.server_address
|
||||||
|
|||||||
@@ -1,8 +1,12 @@
|
|||||||
from common.utils.timer import Timer
|
from common.utils.selenium import (
|
||||||
from common.utils.selenium import until, sleep, locate, click, setup
|
until,
|
||||||
|
sleep,
|
||||||
|
locate,
|
||||||
|
click,
|
||||||
|
setup,
|
||||||
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'Timer',
|
|
||||||
'until',
|
'until',
|
||||||
'sleep',
|
'sleep',
|
||||||
'locate',
|
'locate',
|
||||||
|
|||||||
@@ -7,24 +7,25 @@ from selenium.webdriver.remote.webelement import WebElement
|
|||||||
from selenium.webdriver.common.by import By
|
from selenium.webdriver.common.by import By
|
||||||
from selenium.webdriver.support import expected_conditions as EC
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
from selenium.webdriver.support.wait import WebDriverWait
|
from selenium.webdriver.support.wait import WebDriverWait
|
||||||
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
|
from selenium.common.exceptions import StaleElementReferenceException, NoAlertPresentException, TimeoutException
|
||||||
|
|
||||||
driver: WebDriver|None = None
|
driver: WebDriver|None = None
|
||||||
attempts, timeout, interval, identity = int(), float(), float(), None
|
identity, parameters = None, dict()
|
||||||
|
|
||||||
def until(condition: typing.Callable[[WebDriver], bool], watch=True):
|
def until(condition: typing.Callable[[WebDriver], bool], watch=True):
|
||||||
try:
|
try:
|
||||||
WebDriverWait(driver, timeout).until(condition)
|
WebDriverWait(driver, parameters['timeout']).until(condition)
|
||||||
except (TimeoutException, StaleElementReferenceException):
|
except (TimeoutException, StaleElementReferenceException):
|
||||||
pass
|
pass
|
||||||
if watch: WebDriverWait(driver, timeout).until_not(condition)
|
if watch: WebDriverWait(driver, parameters['timeout']).until_not(condition)
|
||||||
|
|
||||||
def sleep(seconds: float):
|
def sleep(seconds: float):
|
||||||
from common.jsonrpc2.server import connection
|
try: driver.switch_to.alert
|
||||||
assert connection is not None and connection.closed is None
|
except NoAlertPresentException: pass
|
||||||
|
except: raise KeyboardInterrupt()
|
||||||
|
|
||||||
try: WebDriverWait(driver, seconds, seconds).until(lambda _: False)
|
try: WebDriverWait(driver, seconds, seconds).until(lambda _: False)
|
||||||
except: pass
|
except TimeoutException: pass
|
||||||
|
|
||||||
def locate(selector: str, wait=True, condition=True) -> WebElement:
|
def locate(selector: str, wait=True, condition=True) -> WebElement:
|
||||||
while True:
|
while True:
|
||||||
@@ -33,12 +34,12 @@ def locate(selector: str, wait=True, condition=True) -> WebElement:
|
|||||||
if not wait: return driver.find_element(*locator)
|
if not wait: return driver.find_element(*locator)
|
||||||
|
|
||||||
presence = EC.presence_of_element_located(locator)
|
presence = EC.presence_of_element_located(locator)
|
||||||
element = WebDriverWait(driver, timeout).until(presence, 'Timeout')
|
element = WebDriverWait(driver, parameters['timeout']).until(presence, 'Timeout')
|
||||||
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
|
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
|
||||||
|
|
||||||
if condition is not None and condition != False:
|
if condition is not None and condition != False:
|
||||||
predicate = condition if callable(condition) else EC.visibility_of_element_located
|
predicate = condition if callable(condition) else EC.visibility_of_element_located
|
||||||
element = WebDriverWait(driver, timeout).until(predicate(locator), 'Timeout')
|
element = WebDriverWait(driver, parameters['timeout']).until(predicate(locator), 'Timeout')
|
||||||
|
|
||||||
return element
|
return element
|
||||||
except StaleElementReferenceException:
|
except StaleElementReferenceException:
|
||||||
@@ -49,11 +50,12 @@ def click(selector: str|WebElement, wait=True, condition=False):
|
|||||||
error = False
|
error = False
|
||||||
|
|
||||||
element = locate(selector, wait, predicate) if isinstance(selector, str) else selector
|
element = locate(selector, wait, predicate) if isinstance(selector, str) else selector
|
||||||
counter = lambda: int(element.get_attribute(identity) or 0)
|
counter = lambda: int(element.get_attribute(identity) or '0')
|
||||||
value = counter()
|
value = counter()
|
||||||
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute(arguments[1], arguments[2] + 1));", element, identity, value)
|
driver.execute_script("window.__%s__ = () => { arguments[0].setAttribute('%s', arguments[1] + 1) }" % ((identity,) * 2), element, value)
|
||||||
|
driver.execute_script("arguments[0].addEventListener('click', __%s__);" % identity, element)
|
||||||
|
|
||||||
for _ in range(attempts):
|
for _ in range(parameters['attempts']):
|
||||||
try:
|
try:
|
||||||
if not error: element.click()
|
if not error: element.click()
|
||||||
else: driver.execute_script("arguments[0].click();", element)
|
else: driver.execute_script("arguments[0].click();", element)
|
||||||
@@ -63,15 +65,16 @@ def click(selector: str|WebElement, wait=True, condition=False):
|
|||||||
error = True
|
error = True
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
WebDriverWait(driver, interval).until(lambda _: counter() > value)
|
WebDriverWait(driver, parameters['interval']).until(lambda _: counter() > value)
|
||||||
break
|
break
|
||||||
except TimeoutException: continue
|
except TimeoutException: continue
|
||||||
except: break
|
except: break
|
||||||
|
|
||||||
def setup(a: WebDriver, b: int, c: float, d: float):
|
try: driver.execute_script("arguments[0].removeEventListener('click', __%s__);" % identity, element)
|
||||||
global identity, driver, attempts, timeout, interval
|
except: pass
|
||||||
|
|
||||||
|
def setup(a: WebDriver, b: dict):
|
||||||
|
global identity, driver
|
||||||
identity = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
identity = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
||||||
driver = a
|
driver = a
|
||||||
attempts = b
|
parameters.update(b)
|
||||||
timeout = c
|
|
||||||
interval = d
|
|
||||||
|
|||||||
Reference in New Issue
Block a user