initial commit

This commit is contained in:
2025-04-24 10:52:33 +02:00
commit 529e7e1549
3 changed files with 454 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
.venv/
.vscode/
.workbooks/
*.xlsx

BIN
requirements.txt Normal file

Binary file not shown.

450
销售订单自动导入.py Normal file
View File

@@ -0,0 +1,450 @@
import argparse
import requests
import pandas
import time
import os
from urllib import parse
from datetime import datetime, timedelta
from xml.etree import ElementTree as ET
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
parser = argparse.ArgumentParser(description="销售订单自动导入脚本")
parser.add_argument('invoices', nargs='?')
parser.add_argument('products')
parser.add_argument('--xm-username', type=str, nargs='?', default='')
parser.add_argument('--xm-password', type=str, nargs='?', default='')
parser.add_argument('--vf-token', type=str, required=True)
parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/')
parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/')
parser.add_argument('--outdir', type=str, nargs='?', default='.')
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'done'], nargs='?', default='none')
parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('-o', '--output', type=str, nargs='?', default='')
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=3)
args = parser.parse_args()
date = datetime.now()
success = 0
warning = 0
def main():
# 读取产品信息
print(f'[信息] 正在读取文件:{args.products}')
try:
products = pandas.read_excel(args.products)
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
print(f'[信息] 已读取产品数据 {len(products)}')
if args.invoices:
# 读取发票数据
print(f'[信息] 正在读取文件:{args.invoices}')
try:
with open(args.invoices, 'r', encoding=args.encoding) as file:
document = ET.parse(file)
root = document.getroot()
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 2
else:
root = None
index = 1
yesterday = (datetime.today() - timedelta(days=1)).strftime("%d/%m/%Y")
quote = parse.quote(yesterday)
print(f'[信息] 正在尝试获取 {yesterday} 的 Facture 数据')
while True:
try:
response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={quote}&date_to={quote}&search_date_type=issue_date&status=paid&api_token={args.vf_token}', timeout=args.timeout)
string = response.content.decode(args.encoding)
data = ET.fromstring(string)
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
return 21
time.sleep(args.interval) #1
if data.tag == 'nil-classes': break
else: index += 1
if root is None: root = data
else: root.extend(data.findall('invoice'))
if root is None:
print('[!!!!] 服务器返回了无效数据')
return 22
def lookup(value, fieldname) -> str:
rows = products[products.isin([value]).any(axis=1)]
data = rows.to_dict(orient='list')
try: return data.get(fieldname)[0]
except: return ''
def text(element, fieldname) -> str:
child = element.find(fieldname)
if bool(child.text): return child.text.strip()
else: return ''
clients = RelationMap(lambda x: text(x, 'client-id'))
invoices = RelationMap(lambda x: text(x, 'number'))
proformas = RelationMap(lambda x: text(x, 'from-invoice-id'))
categories = RelationMap(lambda x: text(x, 'category-id'))
for invoice in root.findall('invoice'):
number = text(invoice, 'number')
kind = invoice.find('kind').text
if kind not in ['vat']:
print(f"[警告] {number}: 类型错误 ({kind})")
continue
if invoice.find('positions') is None:
print(f"[警告] {number}: 缺少产品信息")
continue
try:
clients.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Client 数据错误 ({e})")
continue
try:
proformas.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Proforma 数据错误 ({e})")
continue
try:
categories.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Category 数据错误 ({e})")
continue
# 有效发票包含完整的客户、PI和归属数据
invoices.setValueOf(invoice, invoice)
print(f'[信息] 已读取有效发票数据 {len(invoices.map)}')
if len(invoices.map) == 0: return 0
print('[信息] 正在向服务器请求数据')
for id in clients.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
clients.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #2
for id in proformas.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
proformas.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #3
for id in categories.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
categories.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
# 订单导入字段
# 详情见 <https://crm.xiaoman.cn/order/importOrder>
data = FieldArray(
'订单号',
'订单日期',
'当前处理人',
'客户编号',
'币种',
'产品名称',
'产品编号',
'产品型号',
'原价',
'折扣率',
'单价',
'数量',
'产品描述',
'订单金额',
)
for invoice in invoices.map.values():
for position in invoice.find('positions'):
number = text(invoice, 'number')
issue_date = text(invoice, 'issue-date')
category = categories.getValueOf(invoice)['name']
client = clients.getValueOf(invoice)['external_id']
code = text(position, 'code')
id = lookup(code, '产品编号')
product = lookup(code, '产品名称')
description = lookup(code, '产品描述')
price = float(text(position, 'price-net'))
discount = float(text(position, 'discount-percent') or '0')
quantity = float(text(position, 'quantity'))
data.append('订单日期', issue_date)
data.append('当前处理人', category)
data.append('客户编号', client)
data.append('币种', 'USD')
data.append('订单号', number)
data.append('产品名称', product)
data.append('产品编号', id)
data.append('产品型号', code)
data.append('原价', '%.2f' % price)
data.append('折扣率', '%g%%' % discount)
data.append('产品描述', description)
data.append('单价', '%.2f' % (price * (1 - discount / 100)))
data.append('数量', '%g' % quantity)
data.newrow()
# 新建导入数据表
if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
print(f'[信息] 正在写入文件:{args.output}')
try:
workbook = pandas.DataFrame(data.map)
workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[!!!!] 写入文件时发生了错误:{e}')
return 3
if args.automation == 'none': return 0
print('[信息] 正在启动自动化程序')
if not bool(args.xm_username):
print(f"[!!!!] 错误:缺少参数 'xm-username'(小满账号邮箱)")
return 4
if not bool(args.xm_password):
print(f"[!!!!] 错误:缺少参数 'xm-password'(小满账号密码)")
return 5
try:
driver = webdriver.Chrome()
driver.set_page_load_timeout(args.timeout)
except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}')
return 6
try:
print(f'[信息] 正在载入 {args.xm_web_url}')
driver.get(args.xm_web_url)
except TimeoutException:
# 停止加载
print(f'[警告] 操作超时')
driver.execute_script("window.stop();")
except Exception as e:
print(f'[警告] 载入网页时发生了错误:{e}')
return 7
def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located):
while True:
try:
locator = (By.CSS_SELECTOR, selector)
if not wait: return parent.find_element(*locator)
wait = WebDriverWait(parent, timeout=args.timeout)
element = wait.until(EC.presence_of_element_located(locator))
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
element = wait.until(condition(locator))
return element
except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找
continue
except TimeoutException:
# 超时错误
raise Exception('操作超时')
except Exception as e:
# 其他错误
raise e
try:
locate("input.account").send_keys(args.xm_username)
locate("input#password").send_keys(args.xm_password)
locate("input.agree-checkbox").click()
locate("button.login-btn").click()
except Exception as e:
print(f'[!!!!] 登录网页时发生了错误:{e}')
return 81
try:
locate(".layout-sidebar ul li:nth-child(10) div div").click()
locate(".layout-sidebar .layout-second-menu li:nth-child(3) a").click()
locate(".list-header-top button.okki-dropdown-trigger").click()
locate(".okki-dropdown-content button").click()
driver.switch_to.window(driver.window_handles[1])
except Exception as e:
print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
return 82
# 状态菜单映射
status = { 'draft': 1, 'done': 6 }
time.sleep(args.interval) #4
try:
# 变更状态
locate(".product-import-img-box .mm-selector-rendered").click()
locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click()
# 上传文件
locate(".big-file-upload input", wait=False).send_keys(os.path.abspath(args.output))
locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click()
# 开始导入
locate(".product-import-img-footer button.mm-button__primary", condition=EC.element_to_be_clickable).click()
except Exception as e:
print(f'[!!!!] 上传文件时发生了错误:{e}')
return 83
time.sleep(args.interval) #5
try:
# 等待订单录入
wait = WebDriverWait(driver, timeout=(args.timeout * 4))
wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成')
# 查看导入结果
locate(".mm-notification-container .mm-icon-close", wait=False).click()
locate(".product-import-img-footer button.mm-button__primary").click()
locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3) a").click()
driver.switch_to.window(driver.window_handles[2])
except Exception as e:
print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84
time.sleep(args.interval) #6
while True:
links = driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .virtual-list-os-target .order-name-wrap a.jump-link")
try:
for link in links:
link.click()
driver.switch_to.window(driver.window_handles[3])
# 编辑订单
locate(".component-detail-frame-header .okki-space-item:nth-child(1) button").click()
warn = False
number = locate(".order-edit-header .edit-order-no span span:nth-child(1)").text
invoice = invoices.map.get(number)
# 选择商机
try:
dropdown = locate(".component-business-select .mm-selector-rendered")
dropdown.click()
for item in driver.find_elements(By.CSS_SELECTOR, ".mm-outside.mm-select-dropdown ul li span"):
if item.text.startswith(proformas.getValueOf(invoice)['number']):
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", item)
item.click()
break
except Exception as e:
print(f"[警告] 发票 '{number}' 录入商机时发生错误:{e}")
warn = True
# 编辑运费
try:
wrapper = driver.find_element(By.CSS_SELECTOR, ".order-edit-product-wrapper .virtual-list-os-target")
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'end' });", wrapper)
elements = wrapper.find_elements(By.CSS_SELECTOR, ".row-item")
elements.reverse()
for item in elements:
if item.find_element(By.CSS_SELECTOR, ".product-info-group-product-name input").get_attribute('value') == 'port':
target = locate("input[colindex='6']", parent=item)
source = locate("input[colindex='7']", parent=item)
# 填入含税成本价
price = source.get_attribute('value')
target.send_keys(price)
break
except Exception as e:
print(f"[警告] 发票 '{number}' 编辑运费时发生错误:{e}")
warn = True
# 保存订单
locate(".order-edit-footer button.okki-btn-primary").click()
# 关闭标签页
driver.close()
driver.switch_to.window(driver.window_handles[2])
global success
success += 1
global warning
if warn: warning += 1
except Exception as e:
print(f'[!!!!] 编辑订单时发生了错误:{e}')
return 85
try:
button = locate(".okki-pagination-next button", wait=False)
driver.execute_script("arguments[0].click()", button)
time.sleep(args.interval) #7
if bool(button.get_attribute('disabled')): raise Exception()
except:
print('[信息] 已经是最后一页')
break
# 等待页面加载
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
return 0
class RelationMap:
def __init__(self, predicate):
self.map = {}
self.predicate = predicate
def getValueOf(self, object):
id = self.predicate(object)
if bool(id): return self.map.get(id)
else: raise Exception(f"Invalid ID: '{id}'")
def setValueOf(self, object, value):
id = self.predicate(object)
if bool(id): self.map[id] = value
else: raise Exception(f"Invalid ID: '{id}'")
class FieldArray[K, V]:
def __init__(self, *args: K):
self.map: dict[K, list[V]] = { name: [] for name in args }
self.index = 0
pass
def append(self, key: K, value: V):
array = self.map.get(key)
array.insert(self.index, value)
def newrow(self, padding=''):
for array in self.map.values():
if len(array) <= self.index:
array.insert(self.index, padding)
self.index += 1
try: status = main()
except KeyboardInterrupt: status = 145
print(f'[信息] 已录入 {success} 个订单;收到 {warning} 条警告信息')
print(f'[信息] 总耗时 {datetime.now() - date}')