update: disabled BiDi

This commit is contained in:
2026-04-29 11:41:58 +08:00
parent 7abaa1b059
commit 423213759d
2 changed files with 49 additions and 93 deletions

142
main.py
View File

@@ -4,25 +4,25 @@ import logging
import base64
import csv
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.bidi.network import Request as NetworkRequest
from common import jsonrpc2
from common.utils import *
from io import BytesIO
from common import jsonrpc2
from typing import Callable
from pathlib import Path
from urllib3 import PoolManager
from itertools import count
from subprocess import Popen
parser = argparse.ArgumentParser(description="Opportunity Export")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
parser.add_argument('-p', '--open', action='store_true')
parser.add_argument('-d', '--directory', type=str, default=str(Path.home().joinpath('Downloads')))
parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig")
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-r', '--attempts', type=int, default=3)
@@ -35,7 +35,6 @@ APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export"
def main(driver: WebDriver, logger = logging.getLogger('main')):
try:
http = PoolManager()
context = jsonrpc2.ConnectionContext()
parameters = vars(args)
driver.get(str(Path('index.html').resolve()))
driver.execute_script(jsonrpc2.prelude(), parameters)
@@ -51,60 +50,8 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
logger.warning('Timeout')
driver.execute_script("window.stop();")
def until(condition: Callable[[WebDriver], bool], watch=True):
try:
WebDriverWait(driver, parameters.get('timeout')).until(condition)
except (TimeoutException, StaleElementReferenceException):
pass
if watch: WebDriverWait(driver, parameters.get('timeout')).until_not(condition)
return True
def sleep(seconds: float):
with context:
try: WebDriverWait(driver, seconds, seconds).until(lambda _: False)
except: pass
return True
def locate(selector, wait=True, condition=EC.visibility_of_element_located) -> WebElement:
while True:
try:
locator = (By.CSS_SELECTOR, selector)
if not wait: return driver.find_element(*locator)
presence = EC.presence_of_element_located(locator)
element = WebDriverWait(driver, parameters.get('timeout')).until(presence, 'Timeout')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
if condition is not None:
element = WebDriverWait(driver, parameters.get('timeout')).until(condition(locator), 'Timeout')
return element
except StaleElementReferenceException:
continue
def click(selector: str|WebElement, wait=True, condition=EC.element_to_be_clickable):
element = locate(selector, wait, condition) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute('taximeter') or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
for _ in range(parameters.get('attempts')):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
try:
WebDriverWait(driver, parameters.get('interval')).until(lambda _: counter() > value)
break
except TimeoutException: continue
except: break
try:
setup(driver, parameters.get('attempts'), parameters.get('timeout'), 0)
until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False)
account = str(parameters.get('account'))
password = str(parameters.get('password'))
@@ -122,26 +69,34 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
while True:
try:
locate("#container", wait=False)
sidebar = locate(".new-layout-left")
driver.execute_script("arguments[0].remove();", sidebar)
logger.info('Done')
break
except:
sleep(3)
ready = False
sidebar = locate(".new-layout-left")
driver.execute_script("arguments[0].remove();", sidebar)
def handle(request: NetworkRequest):
nonlocal ready
if request.url == APIURL: ready = True
try: request.continue_request()
except Exception as e: logger.debug('Network request error', exc_info=e)
event = 'before_request'
driver.network.add_request_handler(event, handle)
while True:
try:
if buttons := driver.find_elements(By.CSS_SELECTOR, ".export-actions .okki-btn-primary"):
identity = 'EXPORT'
for button in buttons: driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].parentElement.setAttribute(arguments[1], '1'));", button, identity)
break
finally:
sleep(1)
while True:
if not ready:
try:
for index in count(0):
if (driver.execute_script("return arguments[0].parentElement.getAttribute(arguments[1]);", buttons[index], identity)):
driver.execute_script("arguments[0].parentElement.removeAttribute(arguments[1]);", buttons[index], identity)
raise ValueError()
except StaleElementReferenceException:
logger.critical('Error while listening to button events')
return 4
except ValueError:
pass
except:
sleep(1)
continue
@@ -151,17 +106,14 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
except:
pass
try:
sleep(1)
click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False)
filename = cell.text
except:
continue
if filename == '--':
sleep(1)
continue
while True:
try:
sleep(1)
click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False)
if (filename := cell.text) != '--': break
except:
pass
try:
logger.info('New task: %s', filename)
@@ -174,14 +126,14 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
logger.info('Read %d line(s) total', len(text))
except Exception as e:
logger.critical('Unable to load input data', exc_info=e)
return 4
return 5
try:
file = Path('template.xlsx').resolve()
template = openpyxl.load_workbook(file)
except Exception as e:
logger.critical('Unable to load template excel', exc_info=e)
return 5
return 6
header = next(data, None)
source = template.active
@@ -206,11 +158,10 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
source[coord] = value
sheet = template.copy_worksheet(source)
logger.info('New sheet: %s', sheet.title)
logger.debug('%s', sheet.title)
if not len(template.worksheets) > 1:
logger.error('Invalid input')
ready = False
continue
template.remove(source)
@@ -234,13 +185,17 @@ def main(driver: WebDriver, logger = logging.getLogger('main')):
href
)
try: Popen(['start', (Path.home() / 'Downloads' / filename).as_posix()], shell=True)
except Exception as e: logger.warning('Unable to open exported excel file at %s' % file, exc_info=e)
if parameters.get('open'):
try:
path = Path.home() / 'Downloads' / filename
until(lambda _: path.exists(), watch=False)
Popen(['start', str(path)], shell=True)
except Exception as e:
logger.warning('Unable to open exported excel file at %s', str(path), exc_info=e)
driver.close()
driver.switch_to.window(driver.window_handles[1])
logger.info('Done')
ready = False
if __name__ == '__main__':
try:
@@ -258,8 +213,9 @@ if __name__ == '__main__':
logger.info('Creating automation instance')
opts = Options()
opts.enable_bidi = True
opts.enable_downloads = True
opts.add_argument('--deny-permission-prompts')
opts.add_experimental_option('prefs', { 'download.default_directory': args.directory })
driver = WebDriver(options=opts)
status = main(driver)
except Exception as e:

Binary file not shown.