initial commit

This commit is contained in:
2026-04-13 14:26:09 +08:00
commit ec2008a879
4 changed files with 178 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/dist
/venv
*.egg-info

20
pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[build-system]
requires = ["setuptools >= 77.0.3"]
build-backend = "setuptools.build_meta"
[project]
name = "common"
description = "Reusable code stubs"
version = "0.1.1"
requires-python = ">=3.13"
authors = [
{ name="BreakerBear", email="breakerbear@autistic.men" },
]
dependencies = [
"trio",
"trio-websocket",
]
[project.urls]
Homepage = "https://git.autistic.men/Utilities/common"
Issues = "https://git.autistic.men/Utilities/common/issues"

0
src/common/__init__.py Normal file
View File

155
src/common/jsonrpc2.py Normal file
View File

@@ -0,0 +1,155 @@
import json
import logging
import trio
import trio_websocket as ws
from enum import Enum
from typing import Self, Callable
from ipaddress import IPv4Address
from itertools import repeat
from threading import Thread
handlers: dict[str, Callable] = {}
connection, server = None, None
class Request[T]:
def __init__(self, method: str, params = None, id = None):
if not isinstance(method, str): raise TypeError()
if not isinstance(id, (str, int)) and id is not None: raise TypeError()
self.id = id
self.method = method
self.params = params
class ParamsError(Exception):
pass
@classmethod
def load(cls, raw: bytes) -> Self:
data = json.loads(raw)
return cls(**data)
def fulfill(self) -> T:
args = self.params
handler: Callable[..., T] = handlers[self.method]
argcount = handler.__code__.co_argcount
varnames = handler.__code__.co_varnames
if self.params is None:
if argcount > 0: raise self.ParamsError()
return handler()
if isinstance(args, list):
if len(args) != argcount: raise self.ParamsError()
return handler(*args)
if isinstance(args, dict):
if args.keys() != set(varnames): raise self.ParamsError()
return handler(**args)
raise TypeError()
class Response[T]:
def __init__(self, id: str|int, inner: T):
self.id = id
self.inner = inner
def __str__(self):
data = dict()
data['id'] = self.id
if isinstance(self.inner, Error): data['error'] = self.inner.response()
else: data['result'] = self.inner
return json.dumps(data)
class Error(Enum):
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
def message(self) -> str:
return self.name.capitalize().replace('_', ' ')
def response(self):
data = dict()
data['code'] = self.value
data['message'] = self.message()
return data
class History(logging.Handler):
def __init__(self):
super().__init__()
self.records = []
def emit(self, record):
self.records.append(record)
def truncate(self) -> list:
copy = self.records.copy()
self.records.clear()
return copy
class ConnectionContext:
def __enter__(self):
self.healthcheck()
return self
def __exit__(self, exc_type, exc, tb):
self.healthcheck()
return True
class Error(Exception):
pass
@classmethod
def healthcheck(cls):
if connection is not None:
if connection.closed is not None:
raise cls.Error()
async def handler(request: ws.WebSocketRequest):
global connection
logger = logging.getLogger('jsonrpc2')
websocket = await request.accept()
if connection is None:
connection = websocket
else:
await websocket.aclose(code=1000, reason="Engaged")
return
for mid, res, err, exc in repeat((None, None, None, None)):
try:
message = await connection.get_message()
inbound = Request.load(message)
mid = inbound.id
res = inbound.fulfill()
except json.decoder.JSONDecodeError: err = Error.PARSE_ERROR
except TypeError: err = Error.INVALID_REQUEST
except KeyError: err = Error.METHOD_NOT_FOUND
except Exception as e:
err = Error.INTERNAL_ERROR
exc = e
if err is not None: logger.error(err.message(), exc_info=exc)
if mid is not None: await connection.send_message(str(Response(mid, err or res)))
async def backend(listen='127.0.0.1', port=0):
import _thread as t
global server
listeners = await trio.open_tcp_listeners(port, host=listen)
server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000)
await server.run()
t.interrupt_main()
def run(logger: logging.Logger) -> tuple[IPv4Address, int]:
history = History()
fmt = logger.handlers[0].formatter
logger.addHandler(history)
handlers.setdefault('sync', lambda: list(map(lambda x: fmt.format(x), history.truncate())))
thread = Thread(target=lambda: trio.run(backend), daemon=True)
thread.start()
endpoint = server.listeners[0]
return (endpoint.address, endpoint.port)