import argparse import requests import pandas import time import os from urllib import parse from datetime import datetime, timedelta from xml.etree import ElementTree as ET from selenium import webdriver from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait parser = argparse.ArgumentParser(description="销售订单自动导入脚本") parser.add_argument('invoices', nargs='?') parser.add_argument('products') parser.add_argument('--xm-username', type=str, nargs='?', default='') parser.add_argument('--xm-password', type=str, nargs='?', default='') parser.add_argument('--vf-token', type=str, required=True) parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/') parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/') parser.add_argument('--outdir', type=str, nargs='?', default='.') parser.add_argument('--per-page', type=int, nargs='?', default=5) parser.add_argument('--currency', type=str, required=True) parser.add_argument('--department', type=str, required=True) parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none') parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8') parser.add_argument('-o', '--output', type=str, nargs='?', default='') parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=5) args = parser.parse_args() date = datetime.now() success = 0 warning = 0 def main(): # 读取产品信息 print(f'[信息] 正在读取文件:{args.products}') try: products = pandas.read_excel(args.products) except Exception as e: print(f'[!!!!] 读取文件时发生了错误:{e}') return 1 print(f'[信息] 已读取产品数据 {len(products)} 条') if args.invoices: # 读取发票数据 print(f'[信息] 正在读取文件:{args.invoices}') try: with open(args.invoices, 'r', encoding=args.encoding) as file: document = ET.parse(file) root = document.getroot() except Exception as e: print(f'[!!!!] 读取文件时发生了错误:{e}') return 2 else: root = None index = 1 yesterday = (datetime.today() - timedelta(days=1)).strftime("%d/%m/%Y") quote = parse.quote(yesterday) print(f'[信息] 正在尝试获取 {yesterday} 的 Facture 数据') while True: try: response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={quote}&date_to={quote}&search_date_type=issue_date&status=paid&api_token={args.vf_token}', timeout=args.timeout) string = response.content.decode(args.encoding) data = ET.fromstring(string) except Exception as e: print(f'[警告] 向服务器请求数据时发生了错误:{e}') return 21 time.sleep(args.interval) #1 if data.tag == 'nil-classes': break else: index += 1 if root is None: root = data else: root.extend(data.findall('invoice')) if root is None: print('[!!!!] 服务器返回了无效数据') return 22 def lookup(value, fieldname) -> str: rows = products[products.isin([value]).any(axis=1)] data = rows.to_dict(orient='list') try: return data.get(fieldname)[0] except: return None def text(element, fieldname) -> str: child = element.find(fieldname) if bool(child.text): return child.text.strip() else: return None clients = RelationMap(lambda x: text(x, 'client-id')) invoices = RelationMap(lambda x: text(x, 'number')) proformas = RelationMap(lambda x: text(x, 'from-invoice-id')) categories = RelationMap(lambda x: text(x, 'category-id')) for invoice in root.findall('invoice'): number = text(invoice, 'number') kind = invoice.find('kind').text if kind not in ['vat']: print(f"[警告] {number}: 类型错误 ({kind})") continue if invoice.find('positions') is None: print(f"[警告] {number}: 缺少产品信息") continue try: clients.setValueOf(invoice, None) except Exception as e: print(f"[警告] {number}: Client 数据错误 ({e})") continue try: proformas.setValueOf(invoice, None) except Exception as e: print(f"[警告] {number}: Proforma 数据错误 ({e})") continue try: categories.setValueOf(invoice, None) except Exception as e: print(f"[警告] {number}: Category 数据错误 ({e})") continue # 有效发票(包含完整的客户、PI和归属数据) invoices.setValueOf(invoice, invoice) print(f'[信息] 已读取有效发票数据 {len(invoices.map)} 条') if len(invoices.map) == 0: return 0 print('[信息] 正在向服务器请求数据') for id in clients.map.keys(): try: response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout) data = response.json() clients.map[id] = data except Exception as e: print(f'[警告] 向服务器请求数据时发生了错误:{e}') continue time.sleep(args.interval) #2 for id in proformas.map.keys(): try: response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout) data = response.json() proformas.map[id] = data except Exception as e: print(f'[警告] 向服务器请求数据时发生了错误:{e}') continue time.sleep(args.interval) #3 for id in categories.map.keys(): try: response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout) data = response.json() categories.map[id] = data except Exception as e: print(f'[警告] 向服务器请求数据时发生了错误:{e}') continue # 订单导入字段 # 详情见 data = FieldArray( '订单号', '订单日期', '当前处理人', '业绩归属部门', '客户编号', '币种', '产品名称', '产品编号', '产品型号', '原价', '折扣率', '单价', '数量', '产品描述', ) for invoice in invoices.map.values(): for position in invoice.find('positions'): number = text(invoice, 'number') issue_date = text(invoice, 'issue-date') category = categories.getValueOf(invoice)['name'] client = clients.getValueOf(invoice)['external_id'] code = text(position, 'code') id = lookup(code, '产品编号') product = lookup(code, '产品名称') description = lookup(code, '产品描述') price = float(text(position, 'price-net') or '0') discount = float(text(position, 'discount-percent') or '0') quantity = float(text(position, 'quantity') or '0') data.append('订单日期', issue_date) data.append('当前处理人', category) data.append('业绩归属部门', args.department) data.append('客户编号', client) data.append('币种', args.currency) data.append('订单号', number) data.append('产品名称', product) data.append('产品编号', id) data.append('产品型号', code) data.append('原价', '%.2f' % price) data.append('折扣率', '%g%%' % discount) data.append('产品描述', description) data.append('单价', '%.2f' % (price * (1 - discount / 100))) data.append('数量', '%g' % quantity) data.newrow() # 新建导入数据表 if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx' print(f'[信息] 正在写入文件:{args.output}') try: workbook = pandas.DataFrame(data.map) workbook.to_excel(args.output, index=False, sheet_name='Sheet1') except Exception as e: print(f'[!!!!] 写入文件时发生了错误:{e}') return 3 if args.automation == 'none': return 0 print('[信息] 正在启动自动化程序') if not bool(args.xm_username): print(f"[!!!!] 错误:缺少参数 'xm-username'(小满账号邮箱)") return 4 if not bool(args.xm_password): print(f"[!!!!] 错误:缺少参数 'xm-password'(小满账号密码)") return 5 try: driver = webdriver.Chrome() driver.set_page_load_timeout(args.timeout) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') return 6 try: print(f'[信息] 正在载入 {args.xm_web_url}') driver.get(args.xm_web_url) except TimeoutException: # 停止加载 print(f'[警告] 操作超时') driver.execute_script("window.stop();") except Exception as e: print(f'[警告] 载入网页时发生了错误:{e}') return 7 def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located): while True: try: locator = (By.CSS_SELECTOR, selector) if not wait: return parent.find_element(*locator) wait = WebDriverWait(parent, timeout=args.timeout) element = wait.until(EC.presence_of_element_located(locator)) # 查看元素 driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) if condition is not None: wait = WebDriverWait(parent, timeout=args.timeout) element = wait.until(condition(locator)) return element except StaleElementReferenceException: # 如果遇到过期元素,重新尝试查找 continue except TimeoutException: # 超时错误 raise Exception('操作超时') except Exception as e: # 其他错误 raise e try: locate("input.account").send_keys(args.xm_username) locate("input#password").send_keys(args.xm_password) locate("input.agree-checkbox").click() locate("button.login-btn").click() except Exception as e: print(f'[!!!!] 登录网页时发生了错误:{e}') return 81 try: locate(".layout-sidebar ul li:nth-child(10) div div").click() locate(".layout-sidebar .layout-second-menu li:nth-child(3) a").click() locate(".list-header-top button.okki-dropdown-trigger").click() locate(".okki-dropdown-content button").click() driver.switch_to.window(driver.window_handles[1]) except Exception as e: print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}') return 82 # 状态菜单映射 status = { 'draft': 1, 'final': 6 } time.sleep(args.interval) #4 try: # 变更状态 locate(".product-import-img-box .mm-selector-rendered").click() locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click() # 上传文件 locate(".big-file-upload input", wait=False).send_keys(os.path.abspath(args.output)) locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click() # 开始导入 locate(".product-import-img-footer button.mm-button__primary", condition=EC.element_to_be_clickable).click() except Exception as e: print(f'[!!!!] 上传文件时发生了错误:{e}') return 83 time.sleep(args.interval) #5 try: # 等待订单录入 wait = WebDriverWait(driver, timeout=(args.timeout * 4)) wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成') # 查看导入结果 locate(".mm-notification-container .mm-icon-close").click() locate(".product-import-img-footer button.mm-button__primary").click() locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3) a").click() driver.switch_to.window(driver.window_handles[2]) except Exception as e: print(f'[!!!!] 等待订单录入时发生了错误:{e}') return 84 # 设置每页显示记录数 time.sleep(args.interval) #6 driver.get(driver.current_url.replace('page_size%22%3A20%2C%22', f'page_size%22%3A{args.per_page}%2C%22')) modified = [] while True: # 等待页面加载 wait = WebDriverWait(driver, timeout=args.timeout) wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class')) time.sleep(args.interval) #7 for idx in range(args.per_page): try: links = driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .virtual-list-os-target .row-items .cell[data-cci='1'] a") element = links[idx] except Exception as e: print(f'[警告] 未找到记录: {e}') break try: driver.execute_script("arguments[0].click();", element) driver.switch_to.window(driver.window_handles[3]) # 编辑订单 locate(".component-detail-frame-header .okki-space-item:nth-child(1) button").click() warn = False header = locate(".order-edit-header .edit-order-no span span:nth-child(1)") number = header.text invoice = invoices.map.get(number) positions = invoice.find('positions').findall('*') if number not in modified: # 选择商机 try: index = 1 dropdown = locate(".component-business-select .mm-selector-rendered") dropdown.click() while True: element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span") index += 1 if element.text.startswith(proformas.getValueOf(invoice)['number']): driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) element.click() break except Exception as e: print(f"[警告] {number}: 录入商机时发生错误:{e}") warn = True # 编辑运费 try: wrapper = locate(".order-edit-product-wrapper .virtual-list-os-target .row-items", condition=None) limit = len(positions) for idx in range(limit): driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) items = wrapper.find_elements(By.CSS_SELECTOR, "div.vue-recycle-scroller__item-wrapper div.vue-recycle-scroller__item-view") driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", items[idx]) product = locate(f".product-info-group-product-name input", parent=items[-1], condition=None) if product.get_attribute('value') == 'port': # 定位元素 driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) locate(".cell[data-cci='4'] input", parent=items[-1], condition=None) locate(".cell[data-cci='5'] input", parent=items[-1], condition=None) target = locate(".cell[data-cci='6'] input", parent=items[-1], condition=None) source = locate(".cell[data-cci='7'] input", parent=items[-1], condition=None) # 填入含税成本价 price = source.get_attribute('value') target.send_keys(price) break except Exception as e: print(f"[警告] {number}: 编辑运费时发生错误:{e}") warn = True # 保存订单 button = locate(".order-edit-footer button.okki-btn-primary", condition=None) driver.execute_script("arguments[0].click();", button) time.sleep(args.interval) #8 print(f"[信息] {number}: 修改完成") modified.append(number) global success success += 1 global warning if warn: warning += 1 # 关闭标签页 driver.close() driver.switch_to.window(driver.window_handles[2]) except Exception as e: print(f'[!!!!] 编辑订单时发生了错误:{e}') return 85 try: button = locate(".okki-pagination-next button", wait=False) if bool(button.get_attribute('disabled')): raise Exception() driver.execute_script("arguments[0].click()", button) except: print('[信息] 已经是最后一页') break return 0 class RelationMap: def __init__(self, predicate): self.map = {} self.predicate = predicate def getValueOf(self, object): id = self.predicate(object) if bool(id): return self.map.get(id) else: raise Exception(f"Invalid ID: '{id}'") def setValueOf(self, object, value): id = self.predicate(object) if bool(id): self.map[id] = value else: raise Exception(f"Invalid ID: '{id}'") class FieldArray[K, V]: def __init__(self, *args: K): self.map: dict[K, list[V]] = { name: [] for name in args } self.index = 0 pass def append(self, key: K, value: V): array = self.map.get(key) array.insert(self.index, value) def newrow(self, padding=None): for array in self.map.values(): if len(array) <= self.index: array.insert(self.index, padding) self.index += 1 try: status = main() except KeyboardInterrupt: status = 145 print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息') print(f'[信息] 总耗时 {datetime.now() - date}')