Files
imporders/销售订单自动导入.py
2025-08-01 14:47:13 +08:00

666 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import requests
import pandas
import os
import re
from urllib import parse
from datetime import datetime, timedelta
from xml.etree import ElementTree as ET
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
# Command-line interface. Every entry below is (flags, add_argument options);
# entries are registered in this order, which is also the order shown by --help.
parser = argparse.ArgumentParser(description="销售订单自动导入")
_ARGUMENT_SPECS = (
    (('invoices',), {'nargs': '?', 'default': ''}),
    (('--xm-username',), {'type': str, 'nargs': '?', 'default': ''}),
    (('--xm-password',), {'type': str, 'nargs': '?', 'default': ''}),
    (('--vf-token',), {'type': str, 'nargs': '?', 'default': ''}),
    (('--xm-web-url',), {'type': str, 'nargs': '?', 'default': 'https://login.xiaoman.cn/login/'}),
    (('--vf-api-url',), {'type': str, 'nargs': '?', 'default': ''}),
    (('-C', '--currency'), {'type': str, 'nargs': '?', 'default': 'USD'}),
    (('-D', '--department'), {'type': str, 'nargs': '?', 'default': ''}),
    (('-T', '--duration'), {'type': str, 'nargs': '*', 'default': []}),
    (('-P', '--prefix'), {'type': str, 'nargs': '?', 'default': ''}),
    (('-a', '--automation'), {'type': str, 'choices': ['none', 'draft', 'final', 'override'], 'nargs': '?', 'default': 'none'}),
    (('-e', '--encoding'), {'type': str, 'nargs': '?', 'default': 'utf-8'}),
    (('-d', '--outdir'), {'type': str, 'nargs': '?', 'default': '.'}),
    (('-o', '--output'), {'type': str, 'nargs': '?', 'default': ''}),
    (('-t', '--timeout'), {'type': int, 'nargs': '?', 'default': 60}),
    (('-i', '--interval'), {'type': int, 'nargs': '?', 'default': 5}),
    (('-k', '--kinds'), {'type': str, 'nargs': '*', 'default': ['vat', 'correction']}),
    (('-r', '--retry'), {'type': int, 'nargs': '?', 'default': 3}),
)
for _flags, _options in _ARGUMENT_SPECS:
    parser.add_argument(*_flags, **_options)

# Parsed once at import time; the rest of the script reads this module-level object.
args = parser.parse_args()
# Start timestamp: used for the default output file name and the elapsed-time report.
date = datetime.now()
# Counters reported at the end of the run (orders modified / orders with warnings).
success = 0
warning = 0
def main(workbook=None):
if args.invoices.endswith('.xml'):
# 读取发票数据
print(f'[信息] 正在读取文件:{args.invoices}')
try:
with open(args.invoices, 'r', encoding=args.encoding) as file:
document = ET.parse(file)
root = document.getroot()
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
elif args.invoices.endswith('.xlsx'):
# 读取订单数据
print(f'[信息] 正在读取文件:{args.invoices}')
try:
workbook = pandas.read_excel(args.invoices)
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
elif bool(args.invoices):
print(f'[!!!!] 无效参数:{args.invoices}')
return 1
else:
root = None
index = 1
format = "%Y-%m-%d"
match args.duration:
case it if len(it) == 0:
yesterday = (datetime.today() - timedelta(days=1))
datefrom = yesterday
dateto = yesterday
print(f'[信息] 正在尝试获取 {yesterday.strftime(format)} 的发票数据')
case it if len(it) == 1 and (duration := str(it[0])) and duration.isnumeric():
datefrom = (datetime.today() - timedelta(days=int(duration)))
dateto = datetime.today()
print(f'[信息] 正在尝试获取自 {datefrom.strftime(format)}{dateto.strftime(format)} 的发票数据')
case it if len(it) == 2:
datefrom = datetime.strptime(it[0], format)
dateto = datetime.strptime(it[1], format)
print(f'[信息] 正在尝试获取自 {datefrom.strftime(format)}{dateto.strftime(format)} 的发票数据')
case _:
print(f'[!!!!] 无效参数:{args.duration}')
return 11
format = "%d/%m/%Y"
datefrom = parse.quote(datefrom.strftime(format))
dateto = parse.quote(dateto.strftime(format))
kinds = '&'.join([f'kinds%5B%5D={k}' for k in args.kinds])
if not args.vf_token or not args.vf_api_url:
print('[!!!!] 缺少参数vf_token 或 vf_api_url')
return 12
while True:
try:
response = requests.get(f'{args.vf_api_url}/invoices.xml?{kinds}&include_positions=true&per_page=25&page={index}&period=more&date_from={datefrom}&date_to={dateto}&search_date_type=issue_date&api_token={args.vf_token}', timeout=args.timeout)
string = response.content.decode(args.encoding)
data = ET.fromstring(string)
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
if data.tag in ['nil-classes', 'hash']: break
else: index += 1
if root is None: root = data
else: root.extend(data.findall('invoice'))
if root is None:
print('[!!!!] 服务器返回了无效数据')
return 21
def lookup(value, fieldname, df: pandas.DataFrame) -> Result:
    """Collect the ``fieldname`` cells of every row that contains ``value``.

    A row matches when ``value`` appears in any of its columns. Missing cells
    are converted to ``None`` first. The returned ``Result`` wraps the list of
    ``fieldname`` values of the matching rows, or ``None`` when the frame has
    no such column.
    """
    normalized = df.astype(object).where(df.notna(), None)
    matching = normalized.isin([value]).any(axis=1)
    columns = normalized[matching].to_dict(orient='list')
    return Result(columns.get(fieldname))
def fetch(url, error=0) -> Result[requests.Response, Exception]:
    """GET ``url`` and wrap the outcome in a ``Result``.

    ``error`` is the number of failures already counted. A failed request is
    retried until that count reaches ``args.retry``; the last exception is then
    returned as the ``Result`` error. HTTP status codes are not inspected here.
    """
    failures = error
    while True:
        try:
            return Result(requests.get(url, timeout=args.timeout))
        except Exception as exc:
            if failures >= args.retry:
                return Result(None, exc)
            failures += 1
def text(element, fieldname) -> str:
result = Result(element).map(lambda x: x.find(fieldname).text.strip())
text = result.ornone()
if bool(text): return text
else: return None
# Export stage: runs only when no workbook was loaded from an .xlsx file, so the
# invoices held in `root` are flattened into one spreadsheet row per position.
if workbook is None:
# Export the invoice data
invoices = root.findall('invoice')
limit = len(invoices)
print(f'[信息] 已读取发票数据 {limit}')
# Order import fields
# See <https://crm.xiaoman.cn/order/importOrder> for details
data = FieldArray(
'订单号',
'商机号',
'订单日期',
'当前处理人',
'业绩归属部门',
'客户编号',
'币种',
'产品名称',
'产品编号',
'产品型号',
'原价',
'折扣率',
'单价',
'数量',
'产品描述',
'AVOIR'
)
for index, invoice in enumerate(invoices):
# Progress fraction, only used in the log line below.
rate = index / limit
kind = text(invoice, 'kind')
number = text(invoice, 'number')
issue_date = text(invoice, 'issue-date')
total = float(text(invoice, 'price-net') or '0')
print(f'[信息] 正在载入 {number} ... {str(round(rate * 100)).rjust(3)} %')
# Three API lookups per invoice. `relation` is the number of the invoice named by
# from-invoice-id; if that lookup fails it falls back to the first
# whitespace-separated token of the invoice title. Each value ends up None on failure.
# NOTE(review): the same-quote nesting inside these f-strings needs Python 3.12+ - confirm the target version.
relation = fetch(f'{args.vf_api_url}/invoices/{text(invoice, 'from-invoice-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['number']).orelse(Result(text(invoice, 'title')).map(lambda x: x.split(maxsplit=1)[0])).ornone()
category = fetch(f'{args.vf_api_url}/categories/{text(invoice, 'category-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['name']).ornone()
client = fetch(f'{args.vf_api_url}/clients/{text(invoice, 'client-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['external_id']).ornone()
# NOTE(review): presumably every invoice carries a <positions> element; if find()
# returned None this loop would raise - verify against the API output.
for position in invoice.find('positions'):
code = text(position, 'code')
product = text(position, 'name')
description = text(position, 'description')
price = float(text(position, 'price-net') or '0')
discount = float(text(position, 'discount-percent') or '0')
quantity = float(text(position, 'quantity') or '0')
# Columns shared by every row.
data.append('订单号', args.prefix + number)
data.append('商机号', relation)
data.append('订单日期', issue_date)
data.append('当前处理人', category)
data.append('业绩归属部门', args.department)
data.append('客户编号', client)
data.append('币种', args.currency)
# Kind-specific columns; a kind other than these two adds nothing more to the row.
match kind:
case 'vat':
data.append('产品名称', product)
data.append('产品型号', code)
data.append('原价', '%.2f' % price)
data.append('折扣率', '%g%%' % discount)
# Unit price is the net price after applying the percentage discount.
data.append('单价', '%.2f' % (price * (1 - discount / 100)))
data.append('数量', '%g' % quantity)
data.append('产品描述', description)
case 'correction':
# Correction invoices become a fixed placeholder product with the invoice net total in AVOIR.
data.append('产品名称', 'REMISE SPECIAL')
data.append('产品编号', '186')
data.append('单价', '0')
data.append('数量', '0')
data.append('AVOIR', '%.2f' % total)
# Close the row; FieldArray pads the columns that were not set.
data.newrow()
# Create the import spreadsheet
# When --output is empty the file name is derived from --outdir and the start timestamp.
if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
print(f'[信息] 正在写入文件:{args.output}')
try:
workbook = pandas.DataFrame(data.map)
workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[!!!!] 写入文件时发生了错误:{e}')
return 3
# Browser start-up: with --automation none the run ends after the export (exit 0).
if args.automation == 'none': return 0
print('[信息] 正在启动自动化程序')
try:
opts = webdriver.ChromeOptions()
opts.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome(opts)
driver.set_page_load_timeout(args.timeout)
except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}')
return 6
try:
driver.get(args.xm_web_url)
except TimeoutException:
# Stop loading
# A page-load timeout is only a warning: loading is aborted and the run continues.
print(f'[警告] 操作超时')
driver.execute_script("window.stop();")
except Exception as e:
print(f'[警告] 载入网页时发生了错误:{e}')
return 7
def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located) -> WebElement:
    """Find the element matching CSS ``selector`` under ``parent``.

    With ``wait`` false the element is looked up once, without waiting.
    Otherwise the function waits (up to ``args.timeout``) for the element to be
    present, scrolls it into view, and - unless ``condition`` is ``None`` -
    waits again for ``condition(locator)`` and returns that result.

    A stale element reference restarts the lookup. A wait timeout is re-raised
    as a plain ``Exception('操作超时')``; any other error propagates unchanged.
    """
    locator = (By.CSS_SELECTOR, selector)
    while True:
        try:
            if not wait:
                return parent.find_element(*locator)
            present = WebDriverWait(parent, timeout=args.timeout).until(
                EC.presence_of_element_located(locator)
            )
            # Bring the element into view
            driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", present)
            if condition is None:
                return present
            return WebDriverWait(parent, timeout=args.timeout).until(condition(locator))
        except StaleElementReferenceException:
            # The element went stale: look it up again
            continue
        except TimeoutException:
            # Timeout
            raise Exception('操作超时')
def click(selector, wait=True, parent=driver, condition=EC.element_to_be_clickable):
    """Click an element and confirm that the click event actually fired.

    ``selector`` is either a CSS selector (resolved through ``locate`` with
    ``wait``, ``parent`` and ``condition``) or an already located element.

    A click listener is attached that bumps the element's ``taximeter``
    attribute; after each attempt the function waits up to ``args.interval``
    seconds for that counter to exceed its starting value. Up to ``args.retry``
    attempts are made: the native click first, then a JavaScript click once a
    native click has raised. The function returns silently when the element
    goes stale or when all attempts are used up.
    """
    element = locate(selector, wait, parent, condition) if isinstance(selector, str) else selector

    def clicks() -> int:
        # Number of clicks recorded on the element so far (0 when never clicked).
        return int(element.get_attribute('taximeter') or 0)

    baseline = clicks()
    driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, baseline)
    use_script = False
    for _ in range(args.retry):
        try:
            if use_script:
                driver.execute_script("arguments[0].click();", element)
            else:
                element.click()
        except StaleElementReferenceException:
            break
        except Exception:
            # Was a bare ``except:``, which also swallowed Ctrl-C / SystemExit.
            use_script = True
            continue
        # Check that the click event was received
        try:
            WebDriverWait(driver, args.interval).until(lambda _: clicks() > baseline)
            break
        except TimeoutException:
            continue
        except Exception:
            break
def ready(driver):
    """Block until the page's progress indicator has gone away; always returns True.

    First waits up to ``args.interval`` seconds for the ``nprogress-busy``
    class to show up on ``<html>`` (a timeout or stale element here is
    ignored), then waits up to ``args.timeout`` seconds for it to disappear.
    """
    def busy(page):
        html = page.find_element(By.TAG_NAME, 'html')
        return 'nprogress-busy' in html.get_attribute('class')

    try:
        WebDriverWait(driver, timeout=args.interval).until(busy)
    except (TimeoutException, StaleElementReferenceException):
        pass
    WebDriverWait(driver, timeout=args.timeout).until_not(busy)
    return True
def keyin(element: WebElement, value):
    """Type ``value`` into ``element``, falling back to setting ``.value`` via JavaScript."""
    try:
        element.send_keys(value)
    except Exception:
        # Was a bare ``except:``, which also swallowed Ctrl-C / SystemExit.
        driver.execute_script("arguments[0].value = arguments[1]", element, value)
# Login and navigation. Credentials are typed in only when both were supplied;
# otherwise the loop below simply waits until the '#container' element exists.
if bool(args.xm_username) and bool(args.xm_password):
try:
locate("input.account").send_keys(args.xm_username)
locate("input#password").send_keys(args.xm_password)
click("input.agree-checkbox")
click("button.login-btn")
except Exception as e:
print(f'[!!!!] 登录网页时发生了错误:{e}')
return 81
print("[信息] 正在检测登录状态...")
# NOTE(review): this loop has no attempt limit - it ends only when '#container'
# is found or when ready() raises.
while True:
try:
driver.find_element(By.ID, 'container')
print("[信息] 已登录")
break
except:
ready(driver) #1
# Open the order list through the sidebar menus.
try:
click(".layout-sidebar ul li.list-none.cpq div div")
ready(driver)
click(".layout-sidebar .layout-second-menu li.order a")
except Exception as e:
print(f'[!!!!] 尝试检索订单时发生了错误:{e}')
return 82
# Status menu mapping
# Maps the automation mode to the position of the status entry in the dropdown.
# 'override' has no entry, so .get() returns None and the upload is skipped for it.
if index := { 'draft': 1, 'final': 6 }.get(args.automation):
try:
click(".list-header-top button.okki-dropdown-trigger")
click(".okki-dropdown-content button")
# The import page is expected in the second window handle.
driver.switch_to.window(driver.window_handles[1])
except Exception as e:
print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
return 83
try:
# Choose not to import orders without a matching product
click(".product-import-img-box .import-img-radio:nth-child(2) .mm-radio-group > label:nth-child(2) .mm-radio-input", condition=None)
# Change the status
click(".product-import-img-box .mm-selector-rendered")
click(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span")
# Upload the exported sheet, or the .xlsx given on the command line when nothing was exported.
path = os.path.abspath(args.output or args.invoices)
# Upload the file
locate(".big-file-upload input", wait=False).send_keys(path)
click(".product-import-img-footer button")
# Start the import
click(".product-import-img-footer button.mm-button__primary")
except Exception as e:
print(f'[!!!!] 上传文件时发生了错误:{e}')
return 83
# Wait for the orders to be entered
# NOTE(review): polls without an attempt limit until the result title reads '导入完成'.
while True:
try:
innerText = driver.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText')
if innerText == '导入完成': break
except:
ready(driver) #1
try:
# View the import result
click(".mm-notification-container .mm-icon-close")
click(".product-import-img-footer button.mm-button__primary")
cell = locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3)", condition=None)
indicator = locate("div > div > span", parent=cell, condition=None)
# The failure count is parsed from the indicator text after its first three characters.
# NOTE(review): assumes a fixed three-character label before the number - confirm against the page.
error = int(indicator.text[3:])
if error != 0:
print(f'[警告] {error} 个订单导入失败')
print(f'[信息] 提示:可下载失败记录')
except Exception as e:
print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84
try:
click("a", wait=False, parent=cell)
# The link is expected to open a third window handle.
driver.switch_to.window(driver.window_handles[2])
except:
print('[!!!!] 查看导入结果时发生了错误')
return 85
ready(driver) #2
# Order numbers that were already edited successfully; modify() skips them.
modified = []
# modify(link, handle): open the order behind `link`, switch to window index
# `handle`, press the first button in the sticky bar, link the matching
# opportunity, copy cost prices into the product rows, save, then close the tab
# and return to window index `handle - 1`.
# Orders whose sheet row carries an 'AVOIR' value are not edited.
# Updates the module counters `success` and `warning`.
def modify(link: WebElement, handle=3):
click(link)
driver.switch_to.window(driver.window_handles[handle])
click(".sticky .okki-space-item:nth-child(1) button")
warn = False
associative = False
relation = None
# Order number as shown in the page title; None when it cannot be located.
number = Result(None).map(lambda _: locate("h1.serial-input-title span").get_attribute('innerText')).ornone()
correction = lookup(number, 'AVOIR', workbook).map(lambda x: x[0]).ornone()
if number is not None and number not in modified and correction is None:
try:
# Select the opportunity
# NOTE(review): KeyboardInterrupt is raised and caught below purely as a loop-exit
# signal; a real Ctrl-C arriving inside this loop would be absorbed the same way.
for attempt in range(args.retry):
try:
relation = lookup(number, '商机号', workbook).map(lambda x: x[0]).unwrap()
needle = str(relation).strip()
# When the value contains an 'O<digits>' code, the opportunity board is searched in a
# temporary tab and the name found there replaces the search text.
if match := Result(re.search(r'O\d+', needle)).map(lambda x: x[0]).ornone():
try:
href = locate(".layout-sidebar ul li.list-none.opportunity a").get_attribute('href')
driver.switch_to.new_window('tab')
driver.get(href)
# Search for the opportunity
locate(".new-header input").send_keys(match)
click(".new-header .search-btn span")
ready(driver)
name = locate("#business-board-drag-wrapper .business-card-wrapper > div .name-wrapper a", wait=False)
needle = name.text.strip()
finally:
driver.close()
driver.switch_to.window(driver.window_handles[handle])
dropdown = locate("#rc_select_1")
dropdown.clear()
dropdown.send_keys(needle)
wait = WebDriverWait(driver, timeout=args.timeout)
menu = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-select-dropdown"))
for item in menu.find_elements(By.CSS_SELECTOR, ".rc-virtual-list-holder-inner > div"):
if item.get_attribute('label').strip().startswith(needle):
click(item)
associative = True
raise KeyboardInterrupt()
# Close the menu
click(dropdown)
except KeyboardInterrupt:
break
except Exception as e:
print(f"[警告] {number}: 关联商机时发生错误:{e}")
warn = True
break
if not associative:
print(f'[警告] {number}: 无法找到对应商机 "{relation}"')
warn = True
# Set the page size to 10 per page
try:
click(".okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner > div:nth-child(1)")
except:
print(f'[警告] {number}: 分页选项设置失败')
pass
# Edit the products
try:
wrapper = locate(".paas-order-product-list .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap()
# Serial numbers of the product rows handled so far.
ids = []
# NOTE(review): the walrus rebinds index to 1 on every pass, so this condition is
# always true; the loop is left through the KeyboardInterrupt below or an exception.
while (index := 1):
for attempts in range(args.retry):
if index >= 10: break
tail = Result(ids).map(lambda x: x[-1:][0]).ornone()
driver.execute_script("arguments[0].scrollIntoView({ block: 'end' });", wrapper)
driver.execute_script("arguments[0].scrollTo(arguments[1], arguments[2]);", wrapper, 0, 0)
for item in wrapper.find_elements(By.CSS_SELECTOR, ".row-item"):
# Every expected product row has been handled: stop editing.
if len(ids) >= len(positions): raise KeyboardInterrupt()
serial = Result(item.find_elements(By.CSS_SELECTOR, ".cell[data-cci='2'] .cell-inner div")).map(lambda x: int(x[0].text)).ornone()
if serial is not None and serial not in ids:
# Vertical scroll offset of this row, computed from the row height.
base = (serial - tail) if tail is not None else index
height = int(item.get_attribute('clientHeight'))
offset = (base - 1) * height
driver.execute_script("arguments[0].scrollTo(arguments[1], arguments[2]);", wrapper, 0, offset)
# Open the product drawer, read the '含税成本价' field, then close the drawer.
click(".product-info-group-product-info a.jump-link", parent=item)
price = locate(".okki-drawer-body div[title='含税成本价'] + div div.mm-formily-preview-text", condition=None).text
click(".okki-drawer-body .space-header-after button")
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-drawer-body").is_displayed())
original = locate(".cell[data-cci='4'] input", parent=item, condition=None)
# '--' means no value is shown; the value of the data-cci='4' input is used instead.
if price == '--': price = original.get_attribute('value')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", wrapper)
driver.execute_script("arguments[0].scroll(arguments[1], arguments[2]);", wrapper, 400, offset)
# Fill in the tax-inclusive cost price
target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
# Clear the current content with one backspace per character before typing.
length = len(target.get_attribute('value'))
target.send_keys(Keys.BACKSPACE * length)
target.send_keys(price)
ids.append(serial)
index += 1
# Next page
button = locate(".text-right li.okki-pagination-next button", condition=None)
if bool(button.get_attribute('disabled')) and len(ids) < len(positions): raise Exception(f'产品信息不完整。缺少:{', '.join(positions[len(ids):])}')
click(button)
except KeyboardInterrupt:
pass
# Remove the Avoir fee
try: click(".ow-box button.okki-btn-round", wait=False)
except: pass
# Save the order
ready(driver)
click(".sticky.bottom-0 button.okki-btn-primary", condition=None)
ready(driver)
print(f"[信息] {number}: 修改完成")
modified.append(number)
global success
success += 1
except Exception as e:
print(f"[警告] {number}: 编辑订单时发生了错误:{e}")
warn = True
global warning
if warn: warning += 1
# Close the tab
driver.close()
driver.switch_to.window(driver.window_handles[handle-1])
# Final stage: walk the order list and run modify() on the orders.
# Set the page size to 10 per page
try:
click(".paas-invoice-list-frame .okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner > div:nth-child(1)")
ready(driver)
except:
print(f'[警告] 分页选项设置失败')
pass
# Override mode: search each order number from the sheet and edit the matching entry.
if args.automation == 'override':
for number in workbook['订单号'].unique():
try:
# Replace the content of the order-number filter with this number and submit it.
box = locate(".list-header-wrap div[data-item-name='订单号']", condition=None)
search = locate("input", parent=box, condition=None)
length = len(search.get_attribute('value'))
click(box)
ActionChains(driver).send_keys(Keys.BACKSPACE * length).send_keys(number).send_keys(Keys.ENTER).perform()
except Exception as e:
print(f'[警告] {number}: 检索订单时发生了错误:{e}')
continue
entry = None
department: str = lookup(number, '业绩归属部门', workbook).map(lambda x: x[0]).ornone()
for attempt in range(args.retry):
try:
while ready(driver):
for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
name = link.text.strip()
cell = link.find_element(By.XPATH, '../../../../div[@data-cci="6"]/div/span')
title = cell.get_attribute('title').strip()
# A hit needs a visible link with the same order number and the same department title.
if link.is_displayed() and name == number and title == department:
entry = link
# KeyboardInterrupt is used here as the "found it" signal to leave both loops.
raise KeyboardInterrupt()
# Next page
button = locate(".okki-pagination-next button", condition=None)
if bool(button.get_attribute('disabled')): break
click(button)
except KeyboardInterrupt:
break
except StaleElementReferenceException:
continue
except Exception as e:
print(f'[警告] {number}: 发生错误:{e}')
continue
if entry is None:
print(f'[警告] {number}: 无法找到对应发票')
continue
try:
modify(entry, handle=1)
except Exception as e:
print(f'[!!!!] {number}: 编辑订单时发生了错误:{e}')
continue
return 0
# Default mode: edit every order link on every page of the list.
while ready(driver):
for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
try:
modify(link)
except Exception as e:
print(f'[!!!!] 编辑订单时发生了错误:{e}')
return 85
try:
button = locate(".paas-invoice-list-frame .list-okki-footer-wrap li.okki-pagination-next button", wait=False)
# A disabled "next" button is routed into the bare except below to end the loop.
if bool(button.get_attribute('disabled')): raise KeyboardInterrupt()
click(button)
except:
print('[信息] 已经是最后一页')
break
return 0
class FieldArray[K, V]:
def __init__(self, *args: K):
self.map: dict[K, list[V]] = { name: [] for name in args }
self.index = 0
pass
def append(self, key: K, value: V):
array = self.map.get(key, [])
array.insert(self.index, value)
def newrow(self, padding=None):
for array in self.map.values():
limit = len(array) - 1
delta = self.index - limit
array.extend([ padding for _ in range(delta) ])
self.index += 1
class Result[T, E]:
def __init__(self, value: T, error: E | None = None):
self.value = value
self.error = error
def orelse(self, other):
if self.error is not None: return other
else: return self
def ornone(self):
if self.error is not None: return None
else: return self.value
def unwrap(self):
if self.error is not None: raise self.error
else: return self.value
def map(self, f):
if self.error is not None: return self
try: return Result(f(self.value))
except Exception as e: return Result(None, e)
# Script entry: run the pipeline, report the counters and elapsed time, and
# exit with the status returned by main() (145 when interrupted by Ctrl-C).
try:
    status = main()
except KeyboardInterrupt:
    status = 145
print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息')
print(f'[信息] 总耗时 {datetime.now() - date}')
# exit() is an interactive helper injected by the site module and may be
# missing (e.g. python -S); SystemExit is always available.
raise SystemExit(status)