update: click feedback detection

This commit is contained in:
2025-07-02 20:33:40 +08:00
parent db217ee457
commit 9e505f0a9a

View File

@@ -13,6 +13,7 @@ from selenium.common.exceptions import StaleElementReferenceException, TimeoutEx
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
parser = argparse.ArgumentParser(description="销售订单自动导入脚本")
parser.add_argument('invoices', nargs='?', default='')
@@ -23,7 +24,7 @@ parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.
parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/')
parser.add_argument('-C', '--currency', type=str, nargs='?', default='USD')
parser.add_argument('-D', '--department', type=str, nargs='?', default='ULT事业部')
parser.add_argument('-T', '--days-delta', type=int, nargs='?', default=None)
parser.add_argument('-T', '--duration', type=int, nargs='?', default=None)
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none')
parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.')
@@ -65,15 +66,15 @@ def main(workbook=None):
index = 1
format = "%d/%m/%Y"
if args.days_delta is not None:
datefrom = (datetime.today() - timedelta(days=args.days_delta))
if args.duration is not None:
datefrom = (datetime.today() - timedelta(days=args.duration))
dateto = datetime.today()
print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d}{dateto:%Y-%m-%d} Facture 数据')
print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d}{dateto:%Y-%m-%d}发票数据')
else:
yesterday = (datetime.today() - timedelta(days=1))
datefrom = yesterday
dateto = yesterday
print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d} Facture 数据')
print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d}发票数据')
datefrom = parse.quote(datefrom.strftime(format))
dateto = parse.quote(dateto.strftime(format))
@@ -214,7 +215,7 @@ def main(workbook=None):
print(f'[警告] 载入网页时发生了错误:{e}')
return 7
def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located):
def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located) -> WebElement:
while True:
try:
locator = (By.CSS_SELECTOR, selector)
@@ -240,18 +241,37 @@ def main(workbook=None):
raise e
def click(selector, parent=driver, condition=EC.element_to_be_clickable):
element = locate(selector, True, parent, condition)
try: element.click()
except: driver.execute_script("arguments[0].click();", element)
element = locate(selector, True, parent, condition) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute('chronometer') or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('chronometer', arguments[1] + 1));", element, value)
for attempt in range(args.retry):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
# 检测点击事件
try: WebDriverWait(driver, args.interval).until(lambda _: counter() > value)
except TimeoutException: continue
except: pass
break
def ready(driver):
try:
condition = lambda x: 'nprogress-busy' in x.find_element(By.TAG_NAME, 'html').get_attribute('class')
wait = WebDriverWait(driver, timeout=args.interval)
wait.until(lambda x: 'nprogress-busy' in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
wait.until(condition)
except TimeoutException:
pass
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
wait.until_not(condition)
return True
if bool(args.xm_username) and bool(args.xm_password):
@@ -296,7 +316,6 @@ def main(workbook=None):
# 上传文件
locate(".big-file-upload input", wait=False).send_keys(path)
click(".product-import-img-footer button")
# 开始导入
click(".product-import-img-footer button.mm-button__primary")
except Exception as e:
@@ -321,12 +340,12 @@ def main(workbook=None):
print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84
# 设置每页显示记录数
ready(driver) #2
url = driver.current_url
per_page = 5
param = f'%22page_size%22%3A{per_page}'
# 设置每页显示记录数
if 'page_size' in url: url = url.replace('%22page_size%22%3A20', param)
else: url += param
@@ -390,12 +409,16 @@ def main(workbook=None):
print(f"[警告] {number}: 关联商机时发生错误:{e}")
warn = True
# 编辑产品
# 设定分页选项 10 条/页
try:
# 设定分页选项 10 条/页
click(".okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner div:nth-child(1)")
except:
print(f'[警告] {number}: 分页选项设置失败')
pass
# 编辑产品
try:
wrapper = locate(".order-edit-product-wrapper .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap()
ids = []
@@ -446,6 +469,7 @@ def main(workbook=None):
warn = True
# 保存订单
ready(driver)
click(".order-edit-footer button.okki-btn-primary", condition=None)
ready(driver)