import argparse import openpyxl import logging import base64 import json import csv import trio import trio_websocket as ws from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.common.bidi.network import Request as NetworkRequest from io import BytesIO from enum import Enum from typing import Self, Callable from pathlib import Path from urllib3 import PoolManager from threading import Thread parser = argparse.ArgumentParser(description="Opportunity Exporter") parser.add_argument('account', type=str) parser.add_argument('password', type=str) parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig") parser.add_argument('-t', '--timeout', type=int, default=60) parser.add_argument('-r', '--attempts', type=int, default=3) parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) WEBURL = "https://crm.xiaoman.cn/business/export" APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export" args = parser.parse_args() handlers = {} connection, server = None, None class Request[T]: def __init__(self, method: str, params: T): if not isinstance(method, str): raise TypeError() self.method = method self.params = params @classmethod def load(cls, raw: bytes) -> Self: data = json.loads(raw) return cls(**data) class Response[T]: def __init__(self, result: T): self.result = result def __str__(self): data = { 'result': self.result } return json.dumps(data) class Error(Enum): PARSE_ERROR = -200 INVALID_REQUEST = -300 METHOD_NOT_FOUND = -400 INTERNAL_ERROR = -500 def __str__(self): data = { 'error': self.value } return json.dumps(data) class History(logging.Handler): def __init__(self): super().__init__() self.records = [] def emit(self, record): self.records.append(record) def truncate(self) -> list: copy = self.records.copy() self.records.clear() return copy class ConnectionContext: def __enter__(self): self.healthcheck() return self def __exit__(self, exc_type, exc, tb): self.healthcheck() return True class Error(Exception): pass @classmethod def healthcheck(cls): if connection.closed is not None: raise cls.Error() def main(driver: WebDriver, logger = logging.getLogger('main')): try: http = PoolManager() context = ConnectionContext() driver.get(str(Path('index.html').resolve())) endpoint = server.listeners[0] parameters = vars(args) driver.execute_script(f"main(...arguments);", f'ws://{endpoint.address}:{endpoint.port}', parameters) except Exception as e: logger.critical('Unable to load starup page', exc_info=e) return 2 try: driver.switch_to.new_window('tab') driver.set_page_load_timeout(parameters.get('timeout')) driver.get(WEBURL) except TimeoutException: logger.warning('Timeout') driver.execute_script("window.stop();") def until(condition: Callable[[WebDriver], bool], watch=True): try: WebDriverWait(driver, parameters.get('timeout')).until(condition) except (TimeoutException, StaleElementReferenceException): pass if watch: WebDriverWait(driver, parameters.get('timeout')).until_not(condition) return True def sleep(seconds: float): with context: try: WebDriverWait(driver, seconds, seconds).until(lambda _: False) except: pass return True def locate(selector, wait=True, condition=EC.visibility_of_element_located) -> WebElement: while True: try: locator = (By.CSS_SELECTOR, selector) if not wait: return driver.find_element(*locator) presence = EC.presence_of_element_located(locator) element = WebDriverWait(driver, parameters.get('timeout')).until(presence, 'Timeout') driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) if condition is not None: element = WebDriverWait(driver, parameters.get('timeout')).until(condition(locator), 'Timeout') return element except StaleElementReferenceException: continue def click(selector: str|WebElement, wait=True, condition=EC.element_to_be_clickable): element = locate(selector, wait, condition) if isinstance(selector, str) else selector counter = lambda: int(element.get_attribute('taximeter') or 0) error = False value = counter() driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) for _ in range(parameters.get('attempts')): try: if not error: element.click() else: driver.execute_script("arguments[0].click();", element) except StaleElementReferenceException: break except: error = True continue try: WebDriverWait(driver, parameters.get('interval')).until(lambda _: counter() > value) break except TimeoutException: continue except: break try: until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False) account = str(parameters.get('account')) password = str(parameters.get('password')) logger.info('Logging in as %s (%s)', account.split('@', 1).pop(0).capitalize(), account) locate("input.account").send_keys(account) locate("input#password").send_keys(password) click("input.agree-checkbox") click("button.login-btn") except Exception as e: logger.critical('Unable to login to %s', parameters.get('url'), exc_info=e) return 3 logger.info('Waiting for authentication to complete...') while True: try: locate("#container", wait=False) logger.info('Done') break except: sleep(3) ready = False sidebar = locate(".new-layout-left") driver.execute_script("arguments[0].remove();", sidebar) def handle(request: NetworkRequest): nonlocal ready if request.url == APIURL: ready = True try: request.continue_request() except Exception as e: logger.debug('Network request error', exc_info=e) event = 'before_request' driver.network.add_request_handler(event, handle) while True: if not ready: sleep(1) continue try: locate(".mm-modal-content .mm-modal-body input", wait=False).send_keys(parameters.get('password')) click(".mm-modal-footer button.okki-btn-primary", wait=False) except: pass try: sleep(1) click(".business-export-wrap section:nth-child(3) h2 svg", wait=False) cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False) filename = cell.text except: continue if filename == '--': sleep(1) continue try: logger.info('New task: %s', filename) download = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:last-child a") href = download.get_attribute('href') response = http.request("GET", href) text = response.data.decode(parameters.get('encoding')).splitlines() data = csv.reader(text) logger.info('Read %d line(s) total', len(text)) except Exception as e: logger.critical('Unable to load input data', exc_info=e) return 4 try: file = Path('template.xlsx').resolve() template = openpyxl.load_workbook(file) except Exception as e: logger.critical('Unable to load template excel', exc_info=e) return 5 header = next(data, None) source = template.active output = BytesIO() def preprocess(data: str): try: return float(data) except ValueError: pass try: return int(data) except ValueError: pass return data.lstrip("'") for row in data: for index, name in enumerate(header): names = template.defined_names if name not in names: continue for _, coord in names[name].destinations: value = preprocess(row[index]) source[coord] = value sheet = template.copy_worksheet(source) logger.info('New sheet: %s', sheet.title) if not len(template.worksheets) > 1: logger.error('Invalid input') ready = False continue template.remove(source) template.save(output) data = base64.b64encode(output.getbuffer()) text = data.decode('ascii') mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' href = f"data:{mime};base64,{text}" driver.switch_to.new_window('tab') driver.execute_script( """ var link = document.createElement('a'); link.download = arguments[0]; link.href = arguments[1]; link.click(); """, filename.replace('.csv', '.xlsx'), href ) driver.close() driver.switch_to.window(driver.window_handles[1]) logger.info('Done') ready = False async def handler(request: ws.WebSocketRequest, logger = logging.getLogger('websocket')): global connection websocket = await request.accept() if connection is None: connection = websocket else: await websocket.aclose(code=1000, reason="Non-singular connection prohibited") return while True: try: message = await connection.get_message() inbound = Request.load(message) handler = handlers[inbound.method] results = handler(inbound.params) response = Response(results) except json.decoder.JSONDecodeError: logger.error('Parse error') response = Error.PARSE_ERROR except TypeError: logger.error('Invalid request') response = Error.INVALID_REQUEST except KeyError: logger.error('Method not found: `%s`', inbound.method) response = Error.METHOD_NOT_FOUND except Exception as e: logger.error('Internal error', exc_info=e) response = Error.INTERNAL_ERROR await connection.send_message(str(response)) async def backend(listen='127.0.0.1', port=0): import _thread as t global server listeners = await trio.open_tcp_listeners(port, host=listen) server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000) await server.run() t.interrupt_main() if __name__ == '__main__': try: logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M") logger = logging.getLogger() formatter = logger.handlers[0].formatter history = History() logger.addHandler(history) level = logging.getLevelNamesMapping().get(args.log_level, 'INFO') logger.setLevel(level) logger.info('Initializing...') handlers.setdefault('sync', lambda _: list(map(lambda x: formatter.format(x), history.truncate()))) thread = Thread(target=lambda: trio.run(backend), daemon=True) thread.start() logger.info('Creating automation instance') opts = Options() opts.enable_bidi = True opts.enable_downloads = True driver = WebDriver(options=opts) status = main(driver) except Exception as e: logger.critical('Fatal error', exc_info=e) status = 1 finally: driver.quit() exit(status)