Files
imporders/销售订单自动导入.py
2025-04-30 11:00:39 +02:00

495 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import requests
import pandas
import time
import os
from urllib import parse
from datetime import datetime, timedelta
from xml.etree import ElementTree as ET
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
# ---------------------------------------------------------------------------
# Command-line interface.
# Arguments are registered in a fixed order; argparse uses that order for the
# generated help text and for matching the two positional arguments.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="销售订单自动导入脚本")

# Positional inputs: `invoices` may be omitted, `products` is mandatory.
parser.add_argument('invoices', nargs='?')
parser.add_argument('products')

# Credentials and tokens.
parser.add_argument(
    '--xm-username',
    type=str,
    nargs='?',
    default='',
)
parser.add_argument(
    '--xm-password',
    type=str,
    nargs='?',
    default='',
)
parser.add_argument(
    '--vf-token',
    type=str,
    required=True,
)

# Service endpoints.
parser.add_argument(
    '--xm-web-url',
    type=str,
    nargs='?',
    default='https://login.xiaoman.cn/login/',
)
parser.add_argument(
    '--vf-api-url',
    type=str,
    nargs='?',
    default='https://ultimatron-france.vosfactures.fr/',
)

# Output location and paging.
parser.add_argument(
    '--outdir',
    type=str,
    nargs='?',
    default='.',
)
parser.add_argument(
    '--per-page',
    type=int,
    nargs='?',
    default=5,
)

# Values copied into every generated spreadsheet row.
parser.add_argument(
    '--currency',
    type=str,
    required=True,
)
parser.add_argument(
    '--department',
    type=str,
    required=True,
)

# Short-form options.
parser.add_argument(
    '-a', '--automation',
    type=str,
    choices=['none', 'draft', 'final'],
    nargs='?',
    default='none',
)
parser.add_argument(
    '-e', '--encoding',
    type=str,
    nargs='?',
    default='utf-8',
)
parser.add_argument(
    '-o', '--output',
    type=str,
    nargs='?',
    default='',
)
parser.add_argument(
    '-t', '--timeout',
    type=int,
    nargs='?',
    default=60,
)
parser.add_argument(
    '-i', '--interval',
    type=int,
    nargs='?',
    default=5,
)

# Parsed once at import time; the rest of the script reads this namespace.
args = parser.parse_args()

# Start timestamp: used for the default output file name and the elapsed-time
# report printed at the end of the script.
date = datetime.now()

# Run counters, updated by main() through `global` statements.
success = 0
warning = 0
def main() -> int:
    """Build the order-import workbook and optionally upload it through a browser.

    Reads the module-level ``args`` namespace and performs, in order:

    1. Read the product workbook ``args.products`` with pandas.
    2. Obtain invoices: parse the XML file ``args.invoices`` when it is given,
       otherwise page through ``invoices.xml`` on ``args.vf_api_url`` filtered
       to kind ``vat``, status ``paid`` and yesterday's issue date.
    3. Keep invoices of kind ``vat`` that have a ``positions`` element and
       non-empty client, proforma (``from-invoice-id``) and category IDs, then
       fetch the referenced client / proforma / category JSON records.
    4. Write one spreadsheet row per invoice position to ``args.output``
       (a timestamped name under ``args.outdir`` when no output is given).
    5. Unless ``args.automation`` is ``'none'``, drive Chrome via Selenium:
       log in, upload the workbook, then open each imported order to select
       its business opportunity, copy the price on ``port`` lines and save.

    Side effects: prints progress messages, writes the workbook, and
    increments the module-level ``success`` / ``warning`` counters.

    Returns:
        0   finished, nothing to import, automation disabled, or user cancelled
        1   product workbook could not be read
        2   invoice XML file could not be read
        21  request/parse error while fetching invoices
        22  no invoice document was obtained from the server
        3   workbook could not be written
        6   WebDriver initialisation failed
        7   non-timeout error while loading the login page
        81  login failed
        82  navigation to the order-import page failed
        83  file upload failed
        84  waiting for the import to finish failed
        85  editing an order failed

    NOTE(review): the indentation of this function was reconstructed from a
    flattened listing. Points where more than one nesting is syntactically
    possible are marked below — confirm against the repository copy.
    NOTE(review): the Chrome driver is never quit on any return path.
    """
    # Read product information
    print(f'[信息] 正在读取文件:{args.products}')
    try:
        products = pandas.read_excel(args.products)
    except Exception as e:
        print(f'[!!!!] 读取文件时发生了错误:{e}')
        return 1
    print(f'[信息] 已读取产品数据 {len(products)}')
    if args.invoices:
        # Read invoice data from the local XML file
        print(f'[信息] 正在读取文件:{args.invoices}')
        try:
            with open(args.invoices, 'r', encoding=args.encoding) as file:
                document = ET.parse(file)
                root = document.getroot()
        except Exception as e:
            print(f'[!!!!] 读取文件时发生了错误:{e}')
            return 2
    else:
        # No file given: download yesterday's invoices page by page.
        root = None
        index = 1
        yesterday = (datetime.today() - timedelta(days=1)).strftime("%d/%m/%Y")
        quote = parse.quote(yesterday)
        print(f'[信息] 正在尝试获取 {yesterday} 的 Facture 数据')
        while True:
            try:
                response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={quote}&date_to={quote}&search_date_type=issue_date&status=paid&api_token={args.vf_token}', timeout=args.timeout)
                string = response.content.decode(args.encoding)
                data = ET.fromstring(string)
            except Exception as e:
                print(f'[警告] 向服务器请求数据时发生了错误:{e}')
                return 21
            time.sleep(args.interval) # 1: pause between page requests
            # A root tag of 'nil-classes' is treated as "no more pages".
            if data.tag == 'nil-classes': break
            else: index += 1
            # The first page becomes the document root; later pages are merged
            # into it by appending their <invoice> children.
            if root is None: root = data
            else: root.extend(data.findall('invoice'))
        # NOTE(review): this check may originally sit at function level rather
        # than inside the else branch; behavior is the same either way because
        # a successfully parsed file never yields a None root.
        if root is None:
            print('[!!!!] 服务器返回了无效数据')
            return 22
    def lookup(value, fieldname) -> str:
        """Return the first `fieldname` cell among product rows containing `value`.

        A row matches when any of its cells equals `value`. Returns None when
        nothing matches or the column does not exist (the bare except absorbs
        the resulting IndexError/TypeError — and anything else).
        NOTE(review): despite the `-> str` annotation this can return None.
        """
        rows = products[products.isin([value]).any(axis=1)]
        data = rows.to_dict(orient='list')
        try: return data.get(fieldname)[0]
        except: return None
    def text(element, fieldname) -> str:
        """Return the stripped text of child `fieldname`, or None if it is empty.

        NOTE(review): when the child element does not exist, `find` returns
        None and `child.text` raises AttributeError; callers inside a
        try/except see that as an error, the others will crash.
        """
        child = element.find(fieldname)
        if bool(child.text): return child.text.strip()
        else: return None
    # Lookup tables keyed by an ID read from each <invoice> element.
    clients = RelationMap(lambda x: text(x, 'client-id'))
    invoices = RelationMap(lambda x: text(x, 'number'))
    proformas = RelationMap(lambda x: text(x, 'from-invoice-id'))
    categories = RelationMap(lambda x: text(x, 'category-id'))
    for invoice in root.findall('invoice'):
        # NOTE(review): these two reads are outside any try block, so an
        # invoice without <number> or <kind> raises AttributeError here.
        number = text(invoice, 'number')
        kind = invoice.find('kind').text
        if kind not in ['vat']:
            print(f"[警告] {number}: 类型错误 ({kind})")
            continue
        if invoice.find('positions') is None:
            print(f"[警告] {number}: 缺少产品信息")
            continue
        # Register each related ID with a None placeholder; setValueOf raises
        # when the ID is empty, which disqualifies the invoice.
        try:
            clients.setValueOf(invoice, None)
        except Exception as e:
            print(f"[警告] {number}: Client 数据错误 ({e})")
            continue
        try:
            proformas.setValueOf(invoice, None)
        except Exception as e:
            print(f"[警告] {number}: Proforma 数据错误 ({e})")
            continue
        try:
            categories.setValueOf(invoice, None)
        except Exception as e:
            print(f"[警告] {number}: Category 数据错误 ({e})")
            continue
        # A valid invoice has complete client, PI (proforma) and ownership
        # (category) data
        invoices.setValueOf(invoice, invoice)
    print(f'[信息] 已读取有效发票数据 {len(invoices.map)}')
    if len(invoices.map) == 0: return 0
    print('[信息] 正在向服务器请求数据')
    # Replace each None placeholder with the JSON record fetched for that ID.
    # A failed request only prints a warning and leaves the placeholder.
    for id in clients.map.keys():
        try:
            response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
            data = response.json()
            clients.map[id] = data
        except Exception as e:
            print(f'[警告] 向服务器请求数据时发生了错误:{e}')
            continue
        time.sleep(args.interval) # 2: pause between client requests
    for id in proformas.map.keys():
        try:
            response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
            data = response.json()
            proformas.map[id] = data
        except Exception as e:
            print(f'[警告] 向服务器请求数据时发生了错误:{e}')
            continue
        time.sleep(args.interval) # 3: pause between proforma requests
    for id in categories.map.keys():
        try:
            response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
            data = response.json()
            categories.map[id] = data
        except Exception as e:
            print(f'[警告] 向服务器请求数据时发生了错误:{e}')
            continue
    # Order import columns
    # See <https://crm.xiaoman.cn/order/importOrder> for details
    data = FieldArray(
        '订单号',
        '订单日期',
        '当前处理人',
        '业绩归属部门',
        '客户编号',
        '币种',
        '产品名称',
        '产品编号',
        '产品型号',
        '原价',
        '折扣率',
        '单价',
        '数量',
        '产品描述',
    )
    # One spreadsheet row per invoice position.
    for invoice in invoices.map.values():
        for position in invoice.find('positions'):
            number = text(invoice, 'number')
            issue_date = text(invoice, 'issue-date')
            # NOTE(review): if the category or client request failed above,
            # getValueOf returns None and the subscript raises TypeError.
            category = categories.getValueOf(invoice)['name']
            client = clients.getValueOf(invoice)['external_id']
            code = text(position, 'code')
            # Product details come from the product workbook, matched on the
            # position's code.
            id = lookup(code, '产品编号')
            product = lookup(code, '产品名称')
            description = lookup(code, '产品描述')
            # Missing numeric fields are treated as 0.
            price = float(text(position, 'price-net') or '0')
            discount = float(text(position, 'discount-percent') or '0')
            quantity = float(text(position, 'quantity') or '0')
            data.append('订单日期', issue_date)
            data.append('当前处理人', category)
            data.append('业绩归属部门', args.department)
            data.append('客户编号', client)
            data.append('币种', args.currency)
            data.append('订单号', number)
            data.append('产品名称', product)
            data.append('产品编号', id)
            data.append('产品型号', code)
            data.append('原价', '%.2f' % price)
            data.append('折扣率', '%g%%' % discount)
            data.append('产品描述', description)
            # Unit price is the net price after applying the discount percent.
            data.append('单价', '%.2f' % (price * (1 - discount / 100)))
            data.append('数量', '%g' % quantity)
            data.newrow()
    # Create the import spreadsheet
    if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
    print(f'[信息] 正在写入文件:{args.output}')
    try:
        workbook = pandas.DataFrame(data.map)
        workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
    except Exception as e:
        print(f'[!!!!] 写入文件时发生了错误:{e}')
        return 3
    # Everything below is browser automation; skip it when disabled.
    if args.automation == 'none': return 0
    print('[信息] 正在启动自动化程序')
    try:
        driver = webdriver.Chrome()
        driver.set_page_load_timeout(args.timeout)
    except Exception as e:
        print(f'[!!!!] 初始化时发生了错误:{e}')
        return 6
    try:
        print(f'[信息] 正在载入 {args.xm_web_url}')
        driver.get(args.xm_web_url)
    except TimeoutException:
        # Stop loading and continue with whatever has been rendered
        print(f'[警告] 操作超时')
        driver.execute_script("window.stop();")
    except Exception as e:
        print(f'[警告] 载入网页时发生了错误:{e}')
        return 7
    def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located):
        """Find the element matching CSS `selector` under `parent`.

        With a falsy `wait` the element is looked up once, immediately.
        Otherwise the function waits up to `args.timeout` seconds for the
        element to be present, scrolls it into view, and — unless `condition`
        is None — waits again until `condition(locator)` holds.

        Raises Exception('操作超时') on timeout; other errors propagate.
        """
        while True:
            try:
                locator = (By.CSS_SELECTOR, selector)
                if not wait: return parent.find_element(*locator)
                # `wait` is rebound to a WebDriverWait from here on; it stays
                # truthy, so a stale-element retry takes the same path.
                wait = WebDriverWait(parent, timeout=args.timeout)
                element = wait.until(EC.presence_of_element_located(locator))
                # Bring the element into view
                driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
                if condition is not None:
                    wait = WebDriverWait(parent, timeout=args.timeout)
                    element = wait.until(condition(locator))
                return element
            except StaleElementReferenceException:
                # On a stale element, retry the lookup
                continue
            except TimeoutException:
                # Timeout
                raise Exception('操作超时')
            except Exception as e:
                # Any other error
                raise e
    if bool(args.xm_username) and bool(args.xm_password):
        # Both credentials supplied: fill in and submit the login form.
        try:
            locate("input.account").send_keys(args.xm_username)
            locate("input#password").send_keys(args.xm_password)
            locate("input.agree-checkbox").click()
            locate("button.login-btn").click()
        except Exception as e:
            print(f'[!!!!] 登录网页时发生了错误:{e}')
            return 81
    else:
        print("[警告] 未提供有效登录凭证")
        # NOTE(review): the confirmation prompt is assumed to belong to this
        # no-credentials branch (manual login); confirm it is not meant to run
        # after an automated login as well.
        key = input('[????] 请确认登录状态:已登录 (Y) / 取消操作 (N): ')
        if key in ['Y', 'y']:
            print('[信息] 已确认操作')
        else:
            print('[信息] 已取消操作')
            return 0
    try:
        # Click through the sidebar and dropdown, then switch to the second
        # browser window (handle index 1).
        locate(".layout-sidebar ul li:nth-child(10) div div").click()
        locate(".layout-sidebar .layout-second-menu li:nth-child(3) a").click()
        locate(".list-header-top button.okki-dropdown-trigger").click()
        locate(".okki-dropdown-content button").click()
        driver.switch_to.window(driver.window_handles[1])
    except Exception as e:
        print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
        return 82
    # Mapping from automation mode to the position of the status menu item
    status = { 'draft': 1, 'final': 6 }
    time.sleep(args.interval) # 4: pause before using the import page
    try:
        # Change the status
        locate(".product-import-img-box .mm-selector-rendered").click()
        locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click()
        # Upload the file (the file input is looked up without waiting)
        locate(".big-file-upload input", wait=False).send_keys(os.path.abspath(args.output))
        locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click()
        # Start the import
        locate(".product-import-img-footer button.mm-button__primary", condition=EC.element_to_be_clickable).click()
    except Exception as e:
        print(f'[!!!!] 上传文件时发生了错误:{e}')
        return 83
    time.sleep(args.interval) # 5: pause before polling the import result
    try:
        # Wait for the orders to be imported (four times the normal timeout)
        wait = WebDriverWait(driver, timeout=(args.timeout * 4))
        wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成')
        # View the import result; this opens a third window (handle index 2)
        locate(".mm-notification-container .mm-icon-close").click()
        locate(".product-import-img-footer button.mm-button__primary").click()
        locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3) a").click()
        driver.switch_to.window(driver.window_handles[2])
    except Exception as e:
        print(f'[!!!!] 等待订单录入时发生了错误:{e}')
        return 84
    # Set the number of records shown per page
    time.sleep(args.interval) # 6: pause before reading the list URL
    url = driver.current_url
    param = f'%22page_size%22%3A{args.per_page}%7D'
    print('url before: ' + url)
    # The replacement only matches a URL whose current page size is 20;
    # without any page_size the encoded fragment is appended as-is.
    if 'page_size' in url: url = url.replace('%22page_size%22%3A20%7D', param)
    else: url += param
    print('url after: ' + url)
    driver.get(url)
    # Order numbers that have already been edited and saved.
    modified = []
    while True:
        # Wait for the page to load
        # NOTE(review): this wait is outside any try block, so a timeout here
        # propagates out of main().
        wait = WebDriverWait(driver, timeout=args.timeout)
        wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
        time.sleep(args.interval) # 7: pause after the list has loaded
        for idx in range(args.per_page):
            try:
                # Re-query the links on every iteration and pick the idx-th.
                links = driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .virtual-list-os-target .row-items .cell[data-cci='1'] a")
                element = links[idx]
            except Exception as e:
                # Fewer rows than per_page: stop processing this page.
                print(f'[警告] 未找到记录: {e}')
                break
            try:
                # Open the order; it appears in a fourth window (handle index 3).
                driver.execute_script("arguments[0].click();", element)
                driver.switch_to.window(driver.window_handles[3])
                # Edit the order
                locate(".component-detail-frame-header .okki-space-item:nth-child(1) button").click()
                warn = False
                header = locate(".order-edit-header .edit-order-no span span:nth-child(1)")
                number = header.text
                # An order number with no matching invoice yields None here and
                # fails on the next line, ending the run with code 85.
                invoice = invoices.map.get(number)
                positions = invoice.find('positions').findall('*')
                # NOTE(review): everything down to the counters is assumed to
                # be guarded by this check so each order is edited once.
                if number not in modified:
                    # Select the business opportunity
                    try:
                        index = 1
                        dropdown = locate(".component-business-select .mm-selector-rendered")
                        dropdown.click()
                        # Walk the dropdown items until one starts with the
                        # proforma number; running out of items ends in a
                        # timeout raised by locate().
                        while True:
                            element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span")
                            index += 1
                            if element.text.startswith(proformas.getValueOf(invoice)['number']):
                                driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
                                element.click()
                                break
                    except Exception as e:
                        print(f"[警告] {number}: 录入商机时发生错误:{e}")
                        warn = True
                    # Edit the shipping cost
                    try:
                        wrapper = locate(".order-edit-product-wrapper .virtual-list-os-target .row-items", condition=None)
                        limit = len(positions)
                        # This inner loop reuses the name `idx`; the outer
                        # for-loop reassigns it on its next iteration.
                        for idx in range(limit):
                            driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
                            items = wrapper.find_elements(By.CSS_SELECTOR, "div.vue-recycle-scroller__item-wrapper div.vue-recycle-scroller__item-view")
                            driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", items[idx])
                            product = locate(f".product-info-group-product-name input", parent=items[idx], condition=None)
                            # Only lines whose product name is exactly 'port'
                            # are changed.
                            if product.get_attribute('value') == 'port':
                                # Locate the elements
                                driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
                                source = locate(".cell[data-cci='4'] input", parent=items[idx], condition=None)
                                target = locate(".cell[data-cci='6'] input", parent=items[idx], condition=None)
                                # Fill in the tax-inclusive cost price
                                price = source.get_attribute('value')
                                target.send_keys(price)
                    except Exception as e:
                        print(f"[警告] {number}: 编辑运费时发生错误:{e}")
                        warn = True
                    # Save the order
                    button = locate(".order-edit-footer button.okki-btn-primary", condition=None)
                    driver.execute_script("arguments[0].click();", button)
                    time.sleep(args.interval) # 8: pause after saving
                    print(f"[信息] {number}: 修改完成")
                    modified.append(number)
                    # Update the module-level run counters.
                    global success
                    success += 1
                    global warning
                    if warn: warning += 1
                # Close the tab and return to the order list window
                driver.close()
                driver.switch_to.window(driver.window_handles[2])
            except Exception as e:
                print(f'[!!!!] 编辑订单时发生了错误:{e}')
                return 85
        try:
            # Go to the next page; a missing or disabled button means the
            # last page has been reached.
            button = locate(".okki-pagination-next button", wait=False)
            if bool(button.get_attribute('disabled')): raise Exception()
            driver.execute_script("arguments[0].click()", button)
        except:
            print('[信息] 已经是最后一页')
            break
    return 0
class RelationMap:
    """Dictionary of values keyed by an ID derived from each object.

    The key for an object is whatever ``predicate(object)`` returns. Objects
    whose key is falsy (``None``, ``''``, ``0`` ...) are rejected.

    Attributes:
        map: The underlying ``{key: value}`` dictionary; callers may read and
            update it directly.
        predicate: Callable taking an object and returning its key.
    """

    def __init__(self, predicate):
        """Create an empty map that derives keys with `predicate`."""
        self.map = {}
        self.predicate = predicate

    def _key(self, object):
        """Return the key for `object`, raising ValueError if it is falsy."""
        key = self.predicate(object)
        if not key:
            # ValueError is still an Exception, so callers that catch
            # Exception keep working; the message text is unchanged.
            raise ValueError(f"Invalid ID: '{key}'")
        return key

    # The parameter is named `object` (shadowing the builtin) to keep the
    # existing call signature intact.
    def getValueOf(self, object):
        """Return the value stored for `object`'s key, or None if absent.

        Raises:
            ValueError: If the key derived from `object` is falsy.
        """
        return self.map.get(self._key(object))

    def setValueOf(self, object, value):
        """Store `value` under `object`'s key, replacing any previous value.

        Raises:
            ValueError: If the key derived from `object` is falsy.
        """
        self.map[self._key(object)] = value
class FieldArray[K, V]:
def __init__(self, *args: K):
self.map: dict[K, list[V]] = { name: [] for name in args }
self.index = 0
pass
def append(self, key: K, value: V):
array = self.map.get(key)
array.insert(self.index, value)
def newrow(self, padding=None):
for array in self.map.values():
if len(array) <= self.index:
array.insert(self.index, padding)
self.index += 1
# Script entry: run the workflow and remember its status code.
try:
    status = main()
except KeyboardInterrupt:
    # Interrupted from the keyboard: record a dedicated status code instead
    # of letting the traceback surface.
    status = 145

# Closing summary: the counters main() updated, then the elapsed wall-clock
# time measured from the module-level start timestamp.
print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息')
print(f'[信息] 总耗时 {datetime.now() - date}')