From 423213759dad9f761ef4287b3c3ff73797010618 Mon Sep 17 00:00:00 2001 From: break27 Date: Wed, 29 Apr 2026 11:41:58 +0800 Subject: [PATCH] update: disabled BiDi --- main.py | 142 ++++++++++++++++------------------------------- requirements.txt | Bin 1018 -> 1018 bytes 2 files changed, 49 insertions(+), 93 deletions(-) diff --git a/main.py b/main.py index f097c46..6ab38e5 100644 --- a/main.py +++ b/main.py @@ -4,25 +4,25 @@ import logging import base64 import csv -from selenium.common.exceptions import StaleElementReferenceException, TimeoutException +from selenium.common.exceptions import TimeoutException, StaleElementReferenceException from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.wait import WebDriverWait -from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.common.bidi.network import Request as NetworkRequest + +from common import jsonrpc2 +from common.utils import * from io import BytesIO -from common import jsonrpc2 -from typing import Callable from pathlib import Path from urllib3 import PoolManager +from itertools import count from subprocess import Popen parser = argparse.ArgumentParser(description="Opportunity Export") parser.add_argument('account', type=str) parser.add_argument('password', type=str) +parser.add_argument('-p', '--open', action='store_true') +parser.add_argument('-d', '--directory', type=str, default=str(Path.home().joinpath('Downloads'))) parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig") parser.add_argument('-t', '--timeout', type=int, default=60) parser.add_argument('-r', '--attempts', type=int, default=3) @@ -35,7 +35,6 @@ APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export" def main(driver: WebDriver, logger = logging.getLogger('main')): try: http = PoolManager() - context = jsonrpc2.ConnectionContext() parameters = vars(args) driver.get(str(Path('index.html').resolve())) driver.execute_script(jsonrpc2.prelude(), parameters) @@ -51,60 +50,8 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): logger.warning('Timeout') driver.execute_script("window.stop();") - def until(condition: Callable[[WebDriver], bool], watch=True): - try: - WebDriverWait(driver, parameters.get('timeout')).until(condition) - except (TimeoutException, StaleElementReferenceException): - pass - if watch: WebDriverWait(driver, parameters.get('timeout')).until_not(condition) - return True - - def sleep(seconds: float): - with context: - try: WebDriverWait(driver, seconds, seconds).until(lambda _: False) - except: pass - return True - - def locate(selector, wait=True, condition=EC.visibility_of_element_located) -> WebElement: - while True: - try: - locator = (By.CSS_SELECTOR, selector) - if not wait: return driver.find_element(*locator) - - presence = EC.presence_of_element_located(locator) - element = WebDriverWait(driver, parameters.get('timeout')).until(presence, 'Timeout') - driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) - - if condition is not None: - element = WebDriverWait(driver, parameters.get('timeout')).until(condition(locator), 'Timeout') - return element - except StaleElementReferenceException: - continue - - def click(selector: str|WebElement, wait=True, condition=EC.element_to_be_clickable): - element = locate(selector, wait, condition) if isinstance(selector, str) else selector - counter = lambda: int(element.get_attribute('taximeter') or 0) - error = False - - value = counter() - driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) - - for _ in range(parameters.get('attempts')): - try: - if not error: element.click() - else: driver.execute_script("arguments[0].click();", element) - except StaleElementReferenceException: - break - except: - error = True - continue - try: - WebDriverWait(driver, parameters.get('interval')).until(lambda _: counter() > value) - break - except TimeoutException: continue - except: break - try: + setup(driver, parameters.get('attempts'), parameters.get('timeout'), 0) until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False) account = str(parameters.get('account')) password = str(parameters.get('password')) @@ -122,26 +69,34 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): while True: try: locate("#container", wait=False) + sidebar = locate(".new-layout-left") + driver.execute_script("arguments[0].remove();", sidebar) logger.info('Done') break except: sleep(3) - ready = False - sidebar = locate(".new-layout-left") - driver.execute_script("arguments[0].remove();", sidebar) - - def handle(request: NetworkRequest): - nonlocal ready - if request.url == APIURL: ready = True - try: request.continue_request() - except Exception as e: logger.debug('Network request error', exc_info=e) - - event = 'before_request' - driver.network.add_request_handler(event, handle) + while True: + try: + if buttons := driver.find_elements(By.CSS_SELECTOR, ".export-actions .okki-btn-primary"): + identity = 'EXPORT' + for button in buttons: driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].parentElement.setAttribute(arguments[1], '1'));", button, identity) + break + finally: + sleep(1) while True: - if not ready: + try: + for index in count(0): + if (driver.execute_script("return arguments[0].parentElement.getAttribute(arguments[1]);", buttons[index], identity)): + driver.execute_script("arguments[0].parentElement.removeAttribute(arguments[1]);", buttons[index], identity) + raise ValueError() + except StaleElementReferenceException: + logger.critical('Error while listening to button events') + return 4 + except ValueError: + pass + except: sleep(1) continue @@ -151,17 +106,14 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): except: pass - try: - sleep(1) - click(".business-export-wrap section:nth-child(3) h2 svg", wait=False) - cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False) - filename = cell.text - except: - continue - - if filename == '--': - sleep(1) - continue + while True: + try: + sleep(1) + click(".business-export-wrap section:nth-child(3) h2 svg", wait=False) + cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False) + if (filename := cell.text) != '--': break + except: + pass try: logger.info('New task: %s', filename) @@ -174,14 +126,14 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): logger.info('Read %d line(s) total', len(text)) except Exception as e: logger.critical('Unable to load input data', exc_info=e) - return 4 + return 5 try: file = Path('template.xlsx').resolve() template = openpyxl.load_workbook(file) except Exception as e: logger.critical('Unable to load template excel', exc_info=e) - return 5 + return 6 header = next(data, None) source = template.active @@ -206,11 +158,10 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): source[coord] = value sheet = template.copy_worksheet(source) - logger.info('New sheet: %s', sheet.title) + logger.debug('%s', sheet.title) if not len(template.worksheets) > 1: logger.error('Invalid input') - ready = False continue template.remove(source) @@ -234,13 +185,17 @@ def main(driver: WebDriver, logger = logging.getLogger('main')): href ) - try: Popen(['start', (Path.home() / 'Downloads' / filename).as_posix()], shell=True) - except Exception as e: logger.warning('Unable to open exported excel file at %s' % file, exc_info=e) + if parameters.get('open'): + try: + path = Path.home() / 'Downloads' / filename + until(lambda _: path.exists(), watch=False) + Popen(['start', str(path)], shell=True) + except Exception as e: + logger.warning('Unable to open exported excel file at %s', str(path), exc_info=e) driver.close() driver.switch_to.window(driver.window_handles[1]) logger.info('Done') - ready = False if __name__ == '__main__': try: @@ -258,8 +213,9 @@ if __name__ == '__main__': logger.info('Creating automation instance') opts = Options() - opts.enable_bidi = True opts.enable_downloads = True + opts.add_argument('--deny-permission-prompts') + opts.add_experimental_option('prefs', { 'download.default_directory': args.directory }) driver = WebDriver(options=opts) status = main(driver) except Exception as e: diff --git a/requirements.txt b/requirements.txt index 64b278bf7eb02bf4098612e915a0bf22355de872..d6d4cd76c0aa68281ab53254428328f7aa9f4182 100644 GIT binary patch delta 89 zcmeyx{)>IWH(z50Lm*6HFaSd{hBO9a1`7rwphy}+GJ`pgOa)>Ch9n?NW-tMg7GU)t Y@l+r?38>x@sKSWBjKO%bGvjt905nw)&Hw-a delta 89 zcmeyx{)>IWH(yJJ6ow=Q3kCy*L?AW+VnZO$jKPw@6eyAiWSIiRKsbdV8K^Q5sKy)! aO&C%cjDh0jKr#i$P6mn?ZFXkd&IAAvdk@+G