update: added override mode

This commit is contained in:
2025-08-01 14:21:45 +08:00
parent 0289f722de
commit bedbc9ec42

View File

@@ -11,6 +11,8 @@ from xml.etree import ElementTree as ET
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
@@ -24,9 +26,9 @@ parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.
parser.add_argument('--vf-api-url', type=str, nargs='?', default='')
parser.add_argument('-C', '--currency', type=str, nargs='?', default='USD')
parser.add_argument('-D', '--department', type=str, nargs='?', default='')
parser.add_argument('-T', '--duration', type=int, nargs='?', default=None)
parser.add_argument('-T', '--duration', type=str, nargs='*', default=[])
parser.add_argument('-P', '--prefix', type=str, nargs='?', default='')
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none')
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final', 'override'], nargs='?', default='none')
parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.')
parser.add_argument('-o', '--output', type=str, nargs='?', default='')
@@ -66,25 +68,34 @@ def main(workbook=None):
else:
root = None
index = 1
format = "%Y-%m-%d"
match args.duration:
case it if len(it) == 0:
yesterday = (datetime.today() - timedelta(days=1))
datefrom = yesterday
dateto = yesterday
print(f'[信息] 正在尝试获取 {yesterday.strftime(format)} 的发票数据')
case it if len(it) == 1 and (duration := str(it[0])) and duration.isnumeric():
datefrom = (datetime.today() - timedelta(days=int(duration)))
dateto = datetime.today()
print(f'[信息] 正在尝试获取自 {datefrom.strftime(format)}{dateto.strftime(format)} 的发票数据')
case it if len(it) == 2:
datefrom = datetime.strptime(it[0], format)
dateto = datetime.strptime(it[1], format)
print(f'[信息] 正在尝试获取自 {datefrom.strftime(format)}{dateto.strftime(format)} 的发票数据')
case _:
print(f'[!!!!] 无效参数:{args.duration}')
return 11
format = "%d/%m/%Y"
if args.duration is not None:
datefrom = (datetime.today() - timedelta(days=args.duration))
dateto = datetime.today()
print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d}{dateto:%Y-%m-%d} 的发票数据')
else:
yesterday = (datetime.today() - timedelta(days=1))
datefrom = yesterday
dateto = yesterday
print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d} 的发票数据')
datefrom = parse.quote(datefrom.strftime(format))
dateto = parse.quote(dateto.strftime(format))
kinds = '&'.join([f'kinds%5B%5D={k}' for k in args.kinds])
if not args.vf_token or not args.vf_api_url:
print('[!!!!] 缺少参数vf_token 或 vf_api_url')
return 11
return 12
while True:
try:
@@ -105,8 +116,9 @@ def main(workbook=None):
print('[!!!!] 服务器返回了无效数据')
return 21
def lookup(value, fieldname, excel) -> Result:
rows = excel[excel.isin([value]).any(axis=1)]
def lookup(value, fieldname, df: pandas.DataFrame) -> Result:
df = df.astype(object).where(df.notna(), None)
rows = df[df.isin([value]).any(axis=1)]
data = rows.to_dict(orient='list')
return Result(data.get(fieldname))
@@ -318,67 +330,217 @@ def main(workbook=None):
click(".layout-sidebar ul li.list-none.cpq div div")
ready(driver)
click(".layout-sidebar .layout-second-menu li.order a")
click(".list-header-top button.okki-dropdown-trigger")
click(".okki-dropdown-content button")
driver.switch_to.window(driver.window_handles[1])
except Exception as e:
print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
print(f'[!!!!] 尝试检索订单时发生了错误:{e}')
return 82
# 状态菜单映射
status = { 'draft': 1, 'final': 6 }
try:
# 选择不导入无对应产品的订单
click(".product-import-img-box .import-img-radio:nth-child(2) .mm-radio-group > label:nth-child(2) .mm-radio-input", condition=None)
# 变更状态
click(".product-import-img-box .mm-selector-rendered")
click(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span")
path = os.path.abspath(args.output or args.invoices)
# 上传文件
locate(".big-file-upload input", wait=False).send_keys(path)
click(".product-import-img-footer button")
# 开始导入
click(".product-import-img-footer button.mm-button__primary")
except Exception as e:
print(f'[!!!!] 上传文件时发生了错误:{e}')
return 83
# 等待订单录入
while True:
if index := { 'draft': 1, 'final': 6 }.get(args.automation):
try:
innerText = driver.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText')
if innerText == '导入完成': break
click(".list-header-top button.okki-dropdown-trigger")
click(".okki-dropdown-content button")
driver.switch_to.window(driver.window_handles[1])
except Exception as e:
print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
return 83
try:
# 选择不导入无对应产品的订单
click(".product-import-img-box .import-img-radio:nth-child(2) .mm-radio-group > label:nth-child(2) .mm-radio-input", condition=None)
# 变更状态
click(".product-import-img-box .mm-selector-rendered")
click(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span")
path = os.path.abspath(args.output or args.invoices)
# 上传文件
locate(".big-file-upload input", wait=False).send_keys(path)
click(".product-import-img-footer button")
# 开始导入
click(".product-import-img-footer button.mm-button__primary")
except Exception as e:
print(f'[!!!!] 上传文件时发生了错误:{e}')
return 83
# 等待订单录入
while True:
try:
innerText = driver.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText')
if innerText == '导入完成': break
except:
ready(driver) #1
try:
# 查看导入结果
click(".mm-notification-container .mm-icon-close")
click(".product-import-img-footer button.mm-button__primary")
cell = locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3)", condition=None)
indicator = locate("div > div > span", parent=cell, condition=None)
error = int(indicator.text[3:])
if error != 0:
print(f'[警告] {error} 个订单导入失败')
print(f'[信息] 提示:可下载失败记录')
except Exception as e:
print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84
try:
click("a", wait=False, parent=cell)
driver.switch_to.window(driver.window_handles[2])
except:
ready(driver) #1
try:
# 查看导入结果
click(".mm-notification-container .mm-icon-close")
click(".product-import-img-footer button.mm-button__primary")
cell = locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3)", condition=None)
indicator = locate("div > div > span", parent=cell, condition=None)
error = int(indicator.text[3:])
if error != 0:
print(f'[警告] {error} 个订单导入失败')
print(f'[信息] 提示:可下载失败记录')
except Exception as e:
print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84
try:
click("a", wait=False, parent=cell)
driver.switch_to.window(driver.window_handles[2])
except:
print('[!!!!] 查看导入结果时发生了错误')
return 85
print('[!!!!] 查看导入结果时发生了错误')
return 85
ready(driver) #2
modified = []
def modify(link: WebElement, handle=3):
click(link)
driver.switch_to.window(driver.window_handles[handle])
click(".sticky .okki-space-item:nth-child(1) button")
warn = False
associative = False
relation = None
number = Result(None).map(lambda _: locate("h1.serial-input-title span").get_attribute('innerText')).ornone()
correction = lookup(number, 'AVOIR', workbook).map(lambda x: x[0]).ornone()
if number is not None and number not in modified and correction is None:
try:
# 选择商机
for attempt in range(args.retry):
try:
relation = lookup(number, '商机号', workbook).map(lambda x: x[0]).unwrap()
needle = str(relation).strip()
if match := Result(re.search(r'O\d+', needle)).map(lambda x: x[0]).ornone():
try:
href = locate(".layout-sidebar ul li.list-none.opportunity a").get_attribute('href')
driver.switch_to.new_window('tab')
driver.get(href)
# 检索商机
locate(".new-header input").send_keys(match)
click(".new-header .search-btn span")
ready(driver)
name = locate("#business-board-drag-wrapper .business-card-wrapper > div .name-wrapper a", wait=False)
needle = name.text.strip()
finally:
driver.close()
driver.switch_to.window(driver.window_handles[handle])
dropdown = locate("#rc_select_1")
dropdown.clear()
dropdown.send_keys(needle)
wait = WebDriverWait(driver, timeout=args.timeout)
menu = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-select-dropdown"))
for item in menu.find_elements(By.CSS_SELECTOR, ".rc-virtual-list-holder-inner > div"):
if item.get_attribute('label').strip().startswith(needle):
click(item)
associative = True
raise KeyboardInterrupt()
# 关闭菜单栏
click(dropdown)
except KeyboardInterrupt:
break
except Exception as e:
print(f"[警告] {number}: 关联商机时发生错误:{e}")
warn = True
break
if not associative:
print(f'[警告] {number}: 无法找到对应商机 "{relation}"')
warn = True
# 设定分页选项 10 条/页
try:
click(".okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner > div:nth-child(1)")
except:
print(f'[警告] {number}: 分页选项设置失败')
pass
# 编辑产品
try:
wrapper = locate(".paas-order-product-list .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap()
ids = []
while (index := 1):
for attempts in range(args.retry):
if index >= 10: break
tail = Result(ids).map(lambda x: x[-1:][0]).ornone()
driver.execute_script("arguments[0].scrollIntoView({ block: 'end' });", wrapper)
driver.execute_script("arguments[0].scrollTo(arguments[1], arguments[2]);", wrapper, 0, 0)
for item in wrapper.find_elements(By.CSS_SELECTOR, ".row-item"):
if len(ids) >= len(positions): raise KeyboardInterrupt()
serial = Result(item.find_elements(By.CSS_SELECTOR, ".cell[data-cci='2'] .cell-inner div")).map(lambda x: int(x[0].text)).ornone()
if serial is not None and serial not in ids:
base = (serial - tail) if tail is not None else index
height = int(item.get_attribute('clientHeight'))
offset = (base - 1) * height
driver.execute_script("arguments[0].scrollTo(arguments[1], arguments[2]);", wrapper, 0, offset)
click(".product-info-group-product-info a.jump-link", parent=item)
price = locate(".okki-drawer-body div[title='含税成本价'] + div div.mm-formily-preview-text", condition=None).text
click(".okki-drawer-body .space-header-after button")
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-drawer-body").is_displayed())
original = locate(".cell[data-cci='4'] input", parent=item, condition=None)
if price == '--': price = original.get_attribute('value')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", wrapper)
driver.execute_script("arguments[0].scroll(arguments[1], arguments[2]);", wrapper, 400, offset)
# 填入含税成本价
target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
length = len(target.get_attribute('value'))
target.send_keys(Keys.BACKSPACE * length)
target.send_keys(price)
ids.append(serial)
index += 1
# 下一页
button = locate(".text-right li.okki-pagination-next button", condition=None)
if bool(button.get_attribute('disabled')) and len(ids) < len(positions): raise Exception(f'产品信息不完整。缺少:{', '.join(positions[len(ids):])}')
click(button)
except KeyboardInterrupt:
pass
# 删除 Avoir 费用
try: click(".ow-box button.okki-btn-round", wait=False)
except: pass
# 保存订单
ready(driver)
click(".sticky.bottom-0 button.okki-btn-primary", condition=None)
ready(driver)
print(f"[信息] {number}: 修改完成")
modified.append(number)
global success
success += 1
except Exception as e:
print(f"[警告] {number}: 编辑订单时发生了错误:{e}")
warn = True
global warning
if warn: warning += 1
# 关闭标签页
driver.close()
driver.switch_to.window(driver.window_handles[handle-1])
# 设置分页选项 10 条/页
try:
click(".paas-invoice-list-frame .okki-pagination-options-size-changer")
@@ -388,160 +550,71 @@ def main(workbook=None):
print(f'[警告] 分页选项设置失败')
pass
while ready(driver):
for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
# 编辑订单
try:
click(link)
driver.switch_to.window(driver.window_handles[3])
click(".sticky .okki-space-item:nth-child(1) button")
match args.automation:
case 'draft', 'final':
while ready(driver):
for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
try:
modify(link)
except Exception as e:
print(f'[!!!!] 编辑订单时发生了错误:{e}')
return 85
warn = False
associative = False
number = locate("h1.serial-input-title span").get_attribute('innerText')
total = lookup(number, 'AVOIR', workbook).map(lambda x: x[0]).ornone()
try:
button = locate(".paas-invoice-list-frame .list-okki-footer-wrap li.okki-pagination-next button", wait=False)
if bool(button.get_attribute('disabled')): raise KeyboardInterrupt()
click(button)
except:
print('[信息] 已经是最后一页')
break
case 'override':
for number in workbook['订单号'].unique():
try:
box = locate(".list-header-wrap div[data-item-name='订单号']", condition=None)
search = locate("input", parent=box, condition=None)
length = len(search.get_attribute('value'))
click(box)
ActionChains(driver).send_keys(Keys.BACKSPACE * length).send_keys(number).send_keys(Keys.ENTER).perform()
except Exception as e:
print(f'[警告] {number}: 检索订单时发生了错误:{e}')
continue
if number not in modified and total is None:
# 选择商机
for attempt in range(args.retry):
try:
relation = lookup(number, '商机号', workbook).map(lambda x: x[0]).unwrap()
needle = str(relation).strip().lstrip('SAV-')
match = re.match(r'O\d+', needle)
entry = None
department: str = lookup(number, '业绩归属部门', workbook).map(lambda x: x[0]).ornone()
if match is not None:
try:
href = locate(".layout-sidebar ul li.list-none.opportunity a").get_attribute('href')
driver.switch_to.new_window('tab')
driver.get(href)
for attempt in range(args.retry):
try:
while ready(driver):
for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
name = link.text.strip()
cell = link.find_element(By.XPATH, '../../../../div[@data-cci="6"]/div/span')
title = cell.get_attribute('title').strip()
# 检索商机
locate(".new-header input").send_keys(needle)
click(".new-header .search-btn span")
ready(driver)
name = locate("#business-board-drag-wrapper .business-card-wrapper > div .name-wrapper a", wait=False)
needle = name.text.strip()
finally:
driver.close()
driver.switch_to.window(driver.window_handles[3])
dropdown = locate("#rc_select_1")
dropdown.clear()
dropdown.send_keys(needle)
wait = WebDriverWait(driver, timeout=args.timeout)
menu = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-select-dropdown"))
for item in menu.find_elements(By.CSS_SELECTOR, ".rc-virtual-list-holder-inner > div"):
if item.get_attribute('label').strip().startswith(needle):
click(item)
associative = True
if link.is_displayed() and name == number and title == department:
entry = link
raise KeyboardInterrupt()
# 关闭菜单栏
click(dropdown)
except KeyboardInterrupt:
break
except Exception as e:
print(f"[警告] {number}: 关联商机时发生错误:{e}")
warn = True
break
if not associative:
print(f'[警告] {number}: 无法找到对应商机 "{relation}"')
warn = True
# 设定分页选项 10 条/页
try:
click(".okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner > div:nth-child(1)")
except:
print(f'[警告] {number}: 分页选项设置失败')
pass
# 编辑产品
try:
wrapper = locate(".paas-order-product-list .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap()
ids = []
while True:
driver.execute_script("arguments[0].scrollTo(0, 0);", wrapper)
index = 0
while index < 10:
if len(ids) >= len(positions): raise KeyboardInterrupt()
if index >= 5: driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
for item in wrapper.find_elements(By.CSS_SELECTOR, ".row-item"):
# 订单序号
try: serial = item.find_element(By.CSS_SELECTOR, ".cell[data-cci='2'] .cell-inner div").text
except: continue
if not bool(serial): continue
# 转到元素
driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", item)
driver.execute_script("arguments[0].scrollIntoView({ block: 'start' });", wrapper)
if serial not in ids:
click(".product-info-group-product-info a.jump-link", parent=item)
price = locate(".okki-drawer-body div[title='含税成本价'] + div div.mm-formily-preview-text", condition=None).text
click(".okki-drawer-body .space-header-after button")
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-drawer-body").is_displayed())
original = locate(".cell[data-cci='5'] input", parent=item, condition=None)
if price == '--': price = original.get_attribute('value')
# 填入含税成本价
driver.execute_script("arguments[0].scroll(600, 0);", wrapper)
target = locate(".cell[data-cci='7'] input", parent=item, condition=None)
target.clear()
target.send_keys(price)
ids.append(serial)
index += 1
# 下一页
click(".text-right li.okki-pagination-next button", condition=None)
button = locate(".okki-pagination-next button", condition=None)
if bool(button.get_attribute('disabled')): break
click(button)
except KeyboardInterrupt:
pass
break
except StaleElementReferenceException:
continue
except Exception as e:
print(f"[警告] {number}: 编辑产品时发生错误:{e}")
warn = True
print(f'[警告] {number}: 发生错误:{e}')
continue
# 删除 Avoir 费用
try: click(".ow-box button.okki-btn-round")
except: pass
if entry is None:
print(f'[警告] {number}: 无法找到对应发票')
continue
# 保存订单
ready(driver)
click(".sticky.bottom-0 button.okki-btn-primary", condition=None)
ready(driver)
print(f"[信息] {number}: 修改完成")
modified.append(number)
global success
success += 1
global warning
if warn: warning += 1
# 关闭标签页
driver.close()
driver.switch_to.window(driver.window_handles[2])
except Exception as e:
print(f'[!!!!] 编辑订单时发生了错误:{e}')
return 85
try:
button = locate(".paas-invoice-list-frame .list-okki-footer-wrap li.okki-pagination-next button", wait=False)
if bool(button.get_attribute('disabled')): raise KeyboardInterrupt()
click(button)
except:
print('[信息] 已经是最后一页')
break
try:
modify(entry, handle=1)
except Exception as e:
print(f'[!!!!] {number}: 编辑订单时发生了错误:{e}')
continue
return 0