From 32cf8599d8ffd954cc413161cd4646d65a5ed277 Mon Sep 17 00:00:00 2001 From: break27 Date: Mon, 5 May 2025 08:57:19 +0200 Subject: [PATCH] add: ability to locate multiple ports & read from excel --- 销售订单自动导入.py | 400 +++++++++++++++++++++++++------------------- 1 file changed, 226 insertions(+), 174 deletions(-) diff --git a/销售订单自动导入.py b/销售订单自动导入.py index 5179753..8f95203 100644 --- a/销售订单自动导入.py +++ b/销售订单自动导入.py @@ -15,20 +15,21 @@ from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait parser = argparse.ArgumentParser(description="销售订单自动导入脚本") -parser.add_argument('invoices', nargs='?') -parser.add_argument('products') +parser.add_argument('invoices', nargs='?', default='') parser.add_argument('--xm-username', type=str, nargs='?', default='') parser.add_argument('--xm-password', type=str, nargs='?', default='') -parser.add_argument('--vf-token', type=str, required=True) +parser.add_argument('--vf-token', type=str, nargs='?', default='') parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/') parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/') -parser.add_argument('--outdir', type=str, nargs='?', default='.') -parser.add_argument('--per-page', type=int, nargs='?', default=5) -parser.add_argument('--currency', type=str, required=True) -parser.add_argument('--department', type=str, required=True) +parser.add_argument('-C', '--currency', type=str, nargs='?', default='') +parser.add_argument('-D', '--department', type=str, nargs='?', default='') +parser.add_argument('-T', '--days-delta', type=int, nargs='?', default=None) parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none') parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8') +parser.add_argument('-m', '--mappings', type=str, nargs='?', default='') +parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.') parser.add_argument('-o', '--output', type=str, nargs='?', default='') +parser.add_argument('-p', '--per-page', type=int, nargs='?', default=5) parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=5) @@ -38,18 +39,8 @@ date = datetime.now() success = 0 warning = 0 -def main(): - # 读取产品信息 - print(f'[信息] 正在读取文件:{args.products}') - try: - products = pandas.read_excel(args.products) - except Exception as e: - print(f'[!!!!] 读取文件时发生了错误:{e}') - return 1 - - print(f'[信息] 已读取产品数据 {len(products)} 条') - - if args.invoices: +def main(workbook=None): + if args.invoices.endswith('.xml'): # 读取发票数据 print(f'[信息] 正在读取文件:{args.invoices}') try: @@ -58,17 +49,36 @@ def main(): root = document.getroot() except Exception as e: print(f'[!!!!] 读取文件时发生了错误:{e}') - return 2 + return 1 + elif args.invoices.endswith('.xlsx'): + # 读取发票数据 + print(f'[信息] 正在读取文件:{args.invoices}') + try: + workbook = pandas.read_excel(args.invoices) + except Exception as e: + print(f'[!!!!] 读取文件时发生了错误:{e}') + return 1 else: root = None index = 1 - yesterday = (datetime.today() - timedelta(days=1)).strftime("%d/%m/%Y") - quote = parse.quote(yesterday) + format = "%d/%m/%Y" + + if args.days_delta is not None: + datefrom = (datetime.today() - timedelta(days=args.days_delta)) + dateto = datetime.today() + print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d} 到 {dateto:%Y-%m-%d} 的 Facture 数据') + else: + yesterday = (datetime.today() - timedelta(days=1)) + datefrom = yesterday + dateto = yesterday + print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d} 的 Facture 数据') + + datefrom = parse.quote(datefrom.strftime(format)) + dateto = parse.quote(dateto.strftime(format)) - print(f'[信息] 正在尝试获取 {yesterday} 的 Facture 数据') while True: try: - response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={quote}&date_to={quote}&search_date_type=issue_date&status=paid&api_token={args.vf_token}', timeout=args.timeout) + response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={datefrom}&date_to={dateto}&search_date_type=issue_date&api_token={args.vf_token}', timeout=args.timeout) string = response.content.decode(args.encoding) data = ET.fromstring(string) except Exception as e: @@ -87,151 +97,164 @@ def main(): print('[!!!!] 服务器返回了无效数据') return 22 - def lookup(value, fieldname) -> str: - rows = products[products.isin([value]).any(axis=1)] + def lookup(value, fieldname, excel) -> Result: + rows = excel[excel.isin([value]).any(axis=1)] data = rows.to_dict(orient='list') - try: return data.get(fieldname)[0] - except: return None + return Result(data.get(fieldname)) def text(element, fieldname) -> str: child = element.find(fieldname) if bool(child.text): return child.text.strip() else: return None - clients = RelationMap(lambda x: text(x, 'client-id')) - invoices = RelationMap(lambda x: text(x, 'number')) - proformas = RelationMap(lambda x: text(x, 'from-invoice-id')) - categories = RelationMap(lambda x: text(x, 'category-id')) - - for invoice in root.findall('invoice'): - number = text(invoice, 'number') - kind = invoice.find('kind').text - - if kind not in ['vat']: - print(f"[警告] {number}: 类型错误 ({kind})") - continue - - if invoice.find('positions') is None: - print(f"[警告] {number}: 缺少产品信息") - continue - + if workbook is None: + # 读取产品信息 + print(f'[信息] 正在读取文件:{args.mappings}') try: - clients.setValueOf(invoice, None) + products = pandas.read_excel(args.mappings) except Exception as e: - print(f"[警告] {number}: Client 数据错误 ({e})") - continue + print(f'[!!!!] 读取文件时发生了错误:{e}') + return 1 - try: - proformas.setValueOf(invoice, None) - except Exception as e: - print(f"[警告] {number}: Proforma 数据错误 ({e})") - continue + print(f'[信息] 已读取产品数据 {len(products)} 条') - try: - categories.setValueOf(invoice, None) - except Exception as e: - print(f"[警告] {number}: Category 数据错误 ({e})") - continue + clients = RelationMap(lambda x: text(x, 'client-id')) + invoices = RelationMap(lambda x: text(x, 'number')) + proformas = RelationMap(lambda x: text(x, 'from-invoice-id')) + categories = RelationMap(lambda x: text(x, 'category-id')) - # 有效发票(包含完整的客户、PI和归属数据) - invoices.setValueOf(invoice, invoice) - - print(f'[信息] 已读取有效发票数据 {len(invoices.map)} 条') - if len(invoices.map) == 0: return 0 - - print('[信息] 正在向服务器请求数据') - - for id in clients.map.keys(): - try: - response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout) - data = response.json() - clients.map[id] = data - except Exception as e: - print(f'[警告] 向服务器请求数据时发生了错误:{e}') - continue - - time.sleep(args.interval) #2 - - for id in proformas.map.keys(): - try: - response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout) - data = response.json() - proformas.map[id] = data - except Exception as e: - print(f'[警告] 向服务器请求数据时发生了错误:{e}') - continue - - time.sleep(args.interval) #3 - - for id in categories.map.keys(): - try: - response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout) - data = response.json() - categories.map[id] = data - except Exception as e: - print(f'[警告] 向服务器请求数据时发生了错误:{e}') - continue - - # 订单导入字段 - # 详情见 - data = FieldArray( - '订单号', - '订单日期', - '当前处理人', - '业绩归属部门', - '客户编号', - '币种', - '产品名称', - '产品编号', - '产品型号', - '原价', - '折扣率', - '单价', - '数量', - '产品描述', - ) - - for invoice in invoices.map.values(): - for position in invoice.find('positions'): + for invoice in root.findall('invoice'): number = text(invoice, 'number') - issue_date = text(invoice, 'issue-date') - category = categories.getValueOf(invoice)['name'] - client = clients.getValueOf(invoice)['external_id'] + kind = invoice.find('kind').text - code = text(position, 'code') - id = lookup(code, '产品编号') - product = lookup(code, '产品名称') - description = lookup(code, '产品描述') - price = float(text(position, 'price-net') or '0') - discount = float(text(position, 'discount-percent') or '0') - quantity = float(text(position, 'quantity') or '0') + if kind not in ['vat']: + print(f"[警告] {number}: 类型错误 ({kind})") + continue - data.append('订单日期', issue_date) - data.append('当前处理人', category) - data.append('业绩归属部门', args.department) - data.append('客户编号', client) - data.append('币种', args.currency) - data.append('订单号', number) - data.append('产品名称', product) - data.append('产品编号', id) - data.append('产品型号', code) - data.append('原价', '%.2f' % price) - data.append('折扣率', '%g%%' % discount) - data.append('产品描述', description) - data.append('单价', '%.2f' % (price * (1 - discount / 100))) - data.append('数量', '%g' % quantity) - data.newrow() + if invoice.find('positions') is None: + print(f"[警告] {number}: 缺少产品信息") + continue - # 新建导入数据表 - if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx' - print(f'[信息] 正在写入文件:{args.output}') + try: + clients.setValueOf(invoice, None) + except Exception as e: + print(f"[警告] {number}: Client 数据错误 ({e})") + continue - try: - workbook = pandas.DataFrame(data.map) - workbook.to_excel(args.output, index=False, sheet_name='Sheet1') - except Exception as e: - print(f'[!!!!] 写入文件时发生了错误:{e}') - return 3 + try: + proformas.setValueOf(invoice, None) + except Exception as e: + print(f"[警告] {number}: Proforma 数据错误 ({e})") + continue + + try: + categories.setValueOf(invoice, None) + except Exception as e: + print(f"[警告] {number}: Category 数据错误 ({e})") + continue + + # 有效发票(包含完整的客户、PI和归属数据) + invoices.setValueOf(invoice, invoice) + + print(f'[信息] 已读取有效发票数据 {len(invoices.map)} 条') + if len(invoices.map) == 0: return 0 + + print('[信息] 正在向服务器请求数据') + + for id in clients.map.keys(): + try: + response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout) + data = response.json() + clients.map[id] = data + except Exception as e: + print(f'[警告] 向服务器请求数据时发生了错误:{e}') + continue + + time.sleep(args.interval) #2 + + for id in proformas.map.keys(): + try: + response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout) + data = response.json() + proformas.map[id] = data + except Exception as e: + print(f'[警告] 向服务器请求数据时发生了错误:{e}') + continue + + time.sleep(args.interval) #3 + + for id in categories.map.keys(): + try: + response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout) + data = response.json() + categories.map[id] = data + except Exception as e: + print(f'[警告] 向服务器请求数据时发生了错误:{e}') + continue + + # 订单导入字段 + # 详情见 + data = FieldArray( + '订单号', + '形式发票', + '订单日期', + '当前处理人', + '业绩归属部门', + '客户编号', + '币种', + '产品名称', + '产品编号', + '产品型号', + '原价', + '折扣率', + '单价', + '数量', + '产品描述', + ) + + for invoice in invoices.map.values(): + for position in invoice.find('positions'): + number = text(invoice, 'number') + issue_date = text(invoice, 'issue-date') + category = categories.getValueOf(invoice)['name'] + proforma = proformas.getValueOf(invoice)['number'] + client = clients.getValueOf(invoice)['external_id'] + + code = text(position, 'code') + id = lookup(code, '产品编号', products).map(lambda x: x[0]).ornone() + product = lookup(code, '产品名称', products).map(lambda x: x[0]).ornone() + description = lookup(code, '产品描述', products).map(lambda x: x[0]).ornone() + price = float(text(position, 'price-net') or '0') + discount = float(text(position, 'discount-percent') or '0') + quantity = float(text(position, 'quantity') or '0') + + data.append('形式发票', proforma) + data.append('订单日期', issue_date) + data.append('当前处理人', category) + data.append('业绩归属部门', args.department or None) + data.append('客户编号', client) + data.append('币种', args.currency or None) + data.append('订单号', number) + data.append('产品名称', product) + data.append('产品编号', id) + data.append('产品型号', code) + data.append('原价', '%.2f' % price) + data.append('折扣率', '%g%%' % discount) + data.append('产品描述', description) + data.append('单价', '%.2f' % (price * (1 - discount / 100))) + data.append('数量', '%g' % quantity) + data.newrow() + + # 新建导入数据表 + if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx' + print(f'[信息] 正在写入文件:{args.output}') + + try: + workbook = pandas.DataFrame(data.map) + workbook.to_excel(args.output, index=False, sheet_name='Sheet1') + except Exception as e: + print(f'[!!!!] 写入文件时发生了错误:{e}') + return 3 if args.automation == 'none': return 0 print('[信息] 正在启动自动化程序') @@ -317,8 +340,9 @@ def main(): locate(".product-import-img-box .mm-selector-rendered").click() locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click() + path = os.path.abspath(args.output or args.invoices) # 上传文件 - locate(".big-file-upload input", wait=False).send_keys(os.path.abspath(args.output)) + locate(".big-file-upload input", wait=False).send_keys(path) locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click() # 开始导入 @@ -331,7 +355,7 @@ def main(): try: # 等待订单录入 - wait = WebDriverWait(driver, timeout=(args.timeout * 4)) + wait = WebDriverWait(driver, timeout=600) wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成') # 查看导入结果 locate(".mm-notification-container .mm-icon-close").click() @@ -345,15 +369,11 @@ def main(): # 设置每页显示记录数 time.sleep(args.interval) #6 url = driver.current_url - param = f'%22page_size%22%3A{args.per_page}%7D' + param = f'%22page_size%22%3A{args.per_page}' - print('url before: ' + url) - - if 'page_size' in url: url = url.replace('%22page_size%22%3A20%7D', param) + if 'page_size' in url: url = url.replace('%22page_size%22%3A20', param) else: url += param - print('url after: ' + url) - driver.get(url) modified = [] @@ -381,20 +401,18 @@ def main(): header = locate(".order-edit-header .edit-order-no span span:nth-child(1)") number = header.text - invoice = invoices.map.get(number) - positions = invoice.find('positions').findall('*') - if number not in modified: # 选择商机 try: + proforma = lookup(number, '形式发票', workbook).map(lambda x: x[0]).unwrap() index = 1 dropdown = locate(".component-business-select .mm-selector-rendered") dropdown.click() while True: - element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span") + element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span", condition=None) index += 1 - if element.text.startswith(proformas.getValueOf(invoice)['number']): + if element.text.startswith(proforma): driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) element.click() break @@ -405,22 +423,34 @@ def main(): # 编辑运费 try: wrapper = locate(".order-edit-product-wrapper .virtual-list-os-target .row-items", condition=None) + positions = lookup(number, '产品型号', workbook).unwrap() + index = 0 + count = positions.count('port') limit = len(positions) + ports = [] - for idx in range(limit): + while True: driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) items = wrapper.find_elements(By.CSS_SELECTOR, "div.vue-recycle-scroller__item-wrapper div.vue-recycle-scroller__item-view") - driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", items[idx]) - product = locate(f".product-info-group-product-name input", parent=items[idx], condition=None) - if product.get_attribute('value') == 'port': + for item in items: + driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", item) + product = locate(f".product-info-group-product-name input", parent=item, condition=None) # 定位元素 driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) - source = locate(".cell[data-cci='4'] input", parent=items[idx], condition=None) - target = locate(".cell[data-cci='6'] input", parent=items[idx], condition=None) - # 填入含税成本价 - price = source.get_attribute('value') - target.send_keys(price) + serial = locate(".cell[data-cci='2'] div div", parent=item, condition=None).text + source = locate(".cell[data-cci='4'] input", parent=item, condition=None) + target = locate(".cell[data-cci='6'] input", parent=item, condition=None) + + if product.get_attribute('value') == 'port' and serial not in ports: + # 填入含税成本价 + price = source.get_attribute('value') + target.send_keys(price) + index += 1 + ports.append(serial) + + if index >= count: break + if index >= limit: raise Exception('无法定位指定产品') except Exception as e: print(f"[警告] {number}: 编辑运费时发生错误:{e}") warn = True @@ -487,6 +517,28 @@ class FieldArray[K, V]: array.insert(self.index, padding) self.index += 1 +class Result: + def __init__(self, value, error=None): + self.value = value + self.error = error + + def orelse(self, other): + if self.error is not None: return other + else: return self + + def ornone(self): + if self.error is not None: return None + else: return self.value + + def unwrap(self): + if self.error is not None: raise self.error + else: return self.value + + def map(self, f): + if self.error is not None: return self + try: return Result(f(self.value)) + except Exception as e: return Result(None, e) + try: status = main() except KeyboardInterrupt: status = 145