update: added 'utils' module

This commit is contained in:
2026-04-22 16:31:53 +08:00
parent 9db80a0401
commit 5c13c1f7f9
7 changed files with 120 additions and 28 deletions

View File

@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "common"
description = "Reusable code stubs"
version = "0.1.9"
version = "0.1.11"
requires-python = ">=3.13"
authors = [
{ name="BreakerBear", email="breakerbear@autistic.men" },
@@ -15,6 +15,9 @@ dependencies = [
"trio-websocket",
]
[project.optional-dependencies]
selenium = ["selenium"]
[project.urls]
Homepage = "https://git.autistic.men/Utilities/common"
Issues = "https://git.autistic.men/Utilities/common/issues"

View File

@@ -1,14 +1,12 @@
from common.jsonrpc2.server import (
Request,
Response,
Error,
History,
ConnectionContext,
Options,
define,
remove,
run
run,
)
def prelude() -> str:
@@ -18,3 +16,15 @@ def prelude() -> str:
endpoint = server.listeners[0]
url = f'ws://{endpoint.address}:{endpoint.port}'
return file.read() % url
__all__ = [
'Request',
'Response',
'Error',
'History',
'Options',
'define',
'remove',
'run',
'prelude',
]

View File

@@ -37,11 +37,9 @@ const RemoteProceduralCall = {
let data = JSON.stringify(request);
CONNECTION.send(data);
let reject = null;
let resolve = null;
let [resolve, reject] = [null, null];
let promise = new Promise((a, b) => { resolve = a; reject = b; });
let item = { resolve, reject, id };
QUEUE.push(item);
QUEUE.push({ resolve, reject, id });
let result = await promise;
return result;

View File

@@ -7,7 +7,7 @@ from typing import Self, Callable, Any
from itertools import repeat
from threading import Thread
handlers: dict[str, Callable] = {}
handlers: dict[str, Callable] = dict()
connection, server = None, None
class Request[T]:
@@ -99,24 +99,6 @@ class History(logging.Handler):
self.records.clear()
return copy
class ConnectionContext:
def __enter__(self):
self.healthcheck()
return self
def __exit__(self, exc_type, exc, tb):
self.healthcheck()
return True
class Error(Exception):
pass
@classmethod
def healthcheck(cls):
if connection is not None:
if connection.closed is not None:
raise cls.Error()
class Options:
def __init__(self):
self.listen = '127.0.0.1'
@@ -170,7 +152,7 @@ async def backend(options: Options):
await server.run()
def define(method: str, handler: Callable[..., Any]):
if method in handlers: raise KeyError()
if method in handlers: raise KeyError(method)
handlers[method] = handler
def remove(method: str) -> Callable[..., Any]:

View File

@@ -0,0 +1,10 @@
from common.utils.timer import Timer
from common.utils.selenium import until, sleep, locate, click
__all__ = [
'Timer',
'until',
'sleep',
'locate',
'click',
]

View File

@@ -0,0 +1,67 @@
import random
import string
import typing
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
def until(driver: WebDriver, condition: typing.Callable[[WebDriver], bool], timeout: float, watch=True):
try:
WebDriverWait(driver, timeout).until(condition)
except (TimeoutException, StaleElementReferenceException):
pass
if watch: WebDriverWait(driver, timeout).until_not(condition)
def sleep(driver: WebDriver, seconds: float):
from common.jsonrpc2.server import connection
assert connection is not None and connection.closed is not None
try: WebDriverWait(driver, seconds, seconds).until(lambda _: False)
except: pass
def locate(driver: WebDriver, selector: str, timeout: float, wait=True, condition=None) -> WebElement:
while True:
try:
locator = (By.CSS_SELECTOR, selector)
if not wait: return driver.find_element(*locator)
presence = EC.presence_of_element_located(locator)
element = WebDriverWait(driver, timeout).until(presence, 'Timeout')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
if condition is not None:
predicate = condition or EC.visibility_of_element_located
element = WebDriverWait(driver, timeout).until(predicate(locator), 'Timeout')
return element
except StaleElementReferenceException:
pass
def click(driver: WebDriver, selector: str|WebElement, attempts: int, interval: int, wait=True, condition=None):
predicate = condition if condition is not None else EC.element_to_be_clickable
identity = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
element = locate(selector, wait, predicate) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute(identity) or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute(arguments[1], arguments[2] + 1));", element, identity, value)
for _ in range(attempts):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
try:
WebDriverWait(driver, interval).until(lambda _: counter() > value)
break
except TimeoutException: continue
except: break

22
src/common/utils/timer.py Normal file
View File

@@ -0,0 +1,22 @@
from math import floor
from time import time
class Timer:
def __init__(self):
self.clear()
def clear(self):
self.checkpoint = None
self.accumulator = 0
def start(self):
seconds = floor(time())
self.checkpoint = seconds
def pause(self):
assert self.checkpoint is not None
self.accumulator = self.delta()
def delta(self):
assert self.checkpoint is not None
return floor(time()) - self.checkpoint + self.accumulator