add: ability to locate multiple ports & read from excel

This commit is contained in:
2025-05-05 08:57:19 +02:00
parent 6eac742d1d
commit 32cf8599d8

View File

@@ -15,20 +15,21 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
parser = argparse.ArgumentParser(description="销售订单自动导入脚本")
parser.add_argument('invoices', nargs='?')
parser.add_argument('products')
parser.add_argument('invoices', nargs='?', default='')
parser.add_argument('--xm-username', type=str, nargs='?', default='')
parser.add_argument('--xm-password', type=str, nargs='?', default='')
parser.add_argument('--vf-token', type=str, required=True)
parser.add_argument('--vf-token', type=str, nargs='?', default='')
parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/')
parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/')
parser.add_argument('--outdir', type=str, nargs='?', default='.')
parser.add_argument('--per-page', type=int, nargs='?', default=5)
parser.add_argument('--currency', type=str, required=True)
parser.add_argument('--department', type=str, required=True)
parser.add_argument('-C', '--currency', type=str, nargs='?', default='')
parser.add_argument('-D', '--department', type=str, nargs='?', default='')
parser.add_argument('-T', '--days-delta', type=int, nargs='?', default=None)
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none')
parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('-m', '--mappings', type=str, nargs='?', default='')
parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.')
parser.add_argument('-o', '--output', type=str, nargs='?', default='')
parser.add_argument('-p', '--per-page', type=int, nargs='?', default=5)
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=5)
@@ -38,18 +39,8 @@ date = datetime.now()
success = 0
warning = 0
def main():
# 读取产品信息
print(f'[信息] 正在读取文件:{args.products}')
try:
products = pandas.read_excel(args.products)
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
print(f'[信息] 已读取产品数据 {len(products)}')
if args.invoices:
def main(workbook=None):
if args.invoices.endswith('.xml'):
# 读取发票数据
print(f'[信息] 正在读取文件:{args.invoices}')
try:
@@ -58,17 +49,36 @@ def main():
root = document.getroot()
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 2
return 1
elif args.invoices.endswith('.xlsx'):
# 读取发票数据
print(f'[信息] 正在读取文件:{args.invoices}')
try:
workbook = pandas.read_excel(args.invoices)
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
else:
root = None
index = 1
yesterday = (datetime.today() - timedelta(days=1)).strftime("%d/%m/%Y")
quote = parse.quote(yesterday)
format = "%d/%m/%Y"
if args.days_delta is not None:
datefrom = (datetime.today() - timedelta(days=args.days_delta))
dateto = datetime.today()
print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d}{dateto:%Y-%m-%d} 的 Facture 数据')
else:
yesterday = (datetime.today() - timedelta(days=1))
datefrom = yesterday
dateto = yesterday
print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d} 的 Facture 数据')
datefrom = parse.quote(datefrom.strftime(format))
dateto = parse.quote(dateto.strftime(format))
print(f'[信息] 正在尝试获取 {yesterday} 的 Facture 数据')
while True:
try:
response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={quote}&date_to={quote}&search_date_type=issue_date&status=paid&api_token={args.vf_token}', timeout=args.timeout)
response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={index}&period=more&date_from={datefrom}&date_to={dateto}&search_date_type=issue_date&api_token={args.vf_token}', timeout=args.timeout)
string = response.content.decode(args.encoding)
data = ET.fromstring(string)
except Exception as e:
@@ -87,151 +97,164 @@ def main():
print('[!!!!] 服务器返回了无效数据')
return 22
def lookup(value, fieldname) -> str:
rows = products[products.isin([value]).any(axis=1)]
def lookup(value, fieldname, excel) -> Result:
rows = excel[excel.isin([value]).any(axis=1)]
data = rows.to_dict(orient='list')
try: return data.get(fieldname)[0]
except: return None
return Result(data.get(fieldname))
def text(element, fieldname) -> str:
child = element.find(fieldname)
if bool(child.text): return child.text.strip()
else: return None
clients = RelationMap(lambda x: text(x, 'client-id'))
invoices = RelationMap(lambda x: text(x, 'number'))
proformas = RelationMap(lambda x: text(x, 'from-invoice-id'))
categories = RelationMap(lambda x: text(x, 'category-id'))
for invoice in root.findall('invoice'):
number = text(invoice, 'number')
kind = invoice.find('kind').text
if kind not in ['vat']:
print(f"[警告] {number}: 类型错误 ({kind})")
continue
if invoice.find('positions') is None:
print(f"[警告] {number}: 缺少产品信息")
continue
if workbook is None:
# 读取产品信息
print(f'[信息] 正在读取文件:{args.mappings}')
try:
clients.setValueOf(invoice, None)
products = pandas.read_excel(args.mappings)
except Exception as e:
print(f"[警告] {number}: Client 数据错误 ({e})")
continue
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
try:
proformas.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Proforma 数据错误 ({e})")
continue
print(f'[信息] 已读取产品数据 {len(products)}')
try:
categories.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Category 数据错误 ({e})")
continue
clients = RelationMap(lambda x: text(x, 'client-id'))
invoices = RelationMap(lambda x: text(x, 'number'))
proformas = RelationMap(lambda x: text(x, 'from-invoice-id'))
categories = RelationMap(lambda x: text(x, 'category-id'))
# 有效发票包含完整的客户、PI和归属数据
invoices.setValueOf(invoice, invoice)
print(f'[信息] 已读取有效发票数据 {len(invoices.map)}')
if len(invoices.map) == 0: return 0
print('[信息] 正在向服务器请求数据')
for id in clients.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
clients.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #2
for id in proformas.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
proformas.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #3
for id in categories.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
categories.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
# 订单导入字段
# 详情见 <https://crm.xiaoman.cn/order/importOrder>
data = FieldArray(
'订单号',
'订单日期',
'当前处理人',
'业绩归属部门',
'客户编号',
'币种',
'产品名称',
'产品编号',
'产品型号',
'原价',
'折扣率',
'单价',
'数量',
'产品描述',
)
for invoice in invoices.map.values():
for position in invoice.find('positions'):
for invoice in root.findall('invoice'):
number = text(invoice, 'number')
issue_date = text(invoice, 'issue-date')
category = categories.getValueOf(invoice)['name']
client = clients.getValueOf(invoice)['external_id']
kind = invoice.find('kind').text
code = text(position, 'code')
id = lookup(code, '产品编号')
product = lookup(code, '产品名称')
description = lookup(code, '产品描述')
price = float(text(position, 'price-net') or '0')
discount = float(text(position, 'discount-percent') or '0')
quantity = float(text(position, 'quantity') or '0')
if kind not in ['vat']:
print(f"[警告] {number}: 类型错误 ({kind})")
continue
data.append('订单日期', issue_date)
data.append('当前处理人', category)
data.append('业绩归属部门', args.department)
data.append('客户编号', client)
data.append('币种', args.currency)
data.append('订单号', number)
data.append('产品名称', product)
data.append('产品编号', id)
data.append('产品型号', code)
data.append('原价', '%.2f' % price)
data.append('折扣率', '%g%%' % discount)
data.append('产品描述', description)
data.append('单价', '%.2f' % (price * (1 - discount / 100)))
data.append('数量', '%g' % quantity)
data.newrow()
if invoice.find('positions') is None:
print(f"[警告] {number}: 缺少产品信息")
continue
# 新建导入数据表
if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
print(f'[信息] 正在写入文件:{args.output}')
try:
clients.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Client 数据错误 ({e})")
continue
try:
workbook = pandas.DataFrame(data.map)
workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[!!!!] 写入文件时发生了错误:{e}')
return 3
try:
proformas.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Proforma 数据错误 ({e})")
continue
try:
categories.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Category 数据错误 ({e})")
continue
# 有效发票包含完整的客户、PI和归属数据
invoices.setValueOf(invoice, invoice)
print(f'[信息] 已读取有效发票数据 {len(invoices.map)}')
if len(invoices.map) == 0: return 0
print('[信息] 正在向服务器请求数据')
for id in clients.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
clients.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #2
for id in proformas.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
proformas.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #3
for id in categories.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
categories.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
# 订单导入字段
# 详情见 <https://crm.xiaoman.cn/order/importOrder>
data = FieldArray(
'订单号',
'形式发票',
'订单日期',
'当前处理人',
'业绩归属部门',
'客户编号',
'币种',
'产品名称',
'产品编号',
'产品型号',
'原价',
'折扣率',
'单价',
'数量',
'产品描述',
)
for invoice in invoices.map.values():
for position in invoice.find('positions'):
number = text(invoice, 'number')
issue_date = text(invoice, 'issue-date')
category = categories.getValueOf(invoice)['name']
proforma = proformas.getValueOf(invoice)['number']
client = clients.getValueOf(invoice)['external_id']
code = text(position, 'code')
id = lookup(code, '产品编号', products).map(lambda x: x[0]).ornone()
product = lookup(code, '产品名称', products).map(lambda x: x[0]).ornone()
description = lookup(code, '产品描述', products).map(lambda x: x[0]).ornone()
price = float(text(position, 'price-net') or '0')
discount = float(text(position, 'discount-percent') or '0')
quantity = float(text(position, 'quantity') or '0')
data.append('形式发票', proforma)
data.append('订单日期', issue_date)
data.append('当前处理人', category)
data.append('业绩归属部门', args.department or None)
data.append('客户编号', client)
data.append('币种', args.currency or None)
data.append('订单号', number)
data.append('产品名称', product)
data.append('产品编号', id)
data.append('产品型号', code)
data.append('原价', '%.2f' % price)
data.append('折扣率', '%g%%' % discount)
data.append('产品描述', description)
data.append('单价', '%.2f' % (price * (1 - discount / 100)))
data.append('数量', '%g' % quantity)
data.newrow()
# 新建导入数据表
if not bool(args.output): args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
print(f'[信息] 正在写入文件:{args.output}')
try:
workbook = pandas.DataFrame(data.map)
workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[!!!!] 写入文件时发生了错误:{e}')
return 3
if args.automation == 'none': return 0
print('[信息] 正在启动自动化程序')
@@ -317,8 +340,9 @@ def main():
locate(".product-import-img-box .mm-selector-rendered").click()
locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click()
path = os.path.abspath(args.output or args.invoices)
# 上传文件
locate(".big-file-upload input", wait=False).send_keys(os.path.abspath(args.output))
locate(".big-file-upload input", wait=False).send_keys(path)
locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click()
# 开始导入
@@ -331,7 +355,7 @@ def main():
try:
# 等待订单录入
wait = WebDriverWait(driver, timeout=(args.timeout * 4))
wait = WebDriverWait(driver, timeout=600)
wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成')
# 查看导入结果
locate(".mm-notification-container .mm-icon-close").click()
@@ -345,15 +369,11 @@ def main():
# 设置每页显示记录数
time.sleep(args.interval) #6
url = driver.current_url
param = f'%22page_size%22%3A{args.per_page}%7D'
param = f'%22page_size%22%3A{args.per_page}'
print('url before: ' + url)
if 'page_size' in url: url = url.replace('%22page_size%22%3A20%7D', param)
if 'page_size' in url: url = url.replace('%22page_size%22%3A20', param)
else: url += param
print('url after: ' + url)
driver.get(url)
modified = []
@@ -381,20 +401,18 @@ def main():
header = locate(".order-edit-header .edit-order-no span span:nth-child(1)")
number = header.text
invoice = invoices.map.get(number)
positions = invoice.find('positions').findall('*')
if number not in modified:
# 选择商机
try:
proforma = lookup(number, '形式发票', workbook).map(lambda x: x[0]).unwrap()
index = 1
dropdown = locate(".component-business-select .mm-selector-rendered")
dropdown.click()
while True:
element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span")
element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span", condition=None)
index += 1
if element.text.startswith(proformas.getValueOf(invoice)['number']):
if element.text.startswith(proforma):
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
element.click()
break
@@ -405,22 +423,34 @@ def main():
# 编辑运费
try:
wrapper = locate(".order-edit-product-wrapper .virtual-list-os-target .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap()
index = 0
count = positions.count('port')
limit = len(positions)
ports = []
for idx in range(limit):
while True:
driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
items = wrapper.find_elements(By.CSS_SELECTOR, "div.vue-recycle-scroller__item-wrapper div.vue-recycle-scroller__item-view")
driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", items[idx])
product = locate(f".product-info-group-product-name input", parent=items[idx], condition=None)
if product.get_attribute('value') == 'port':
for item in items:
driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", item)
product = locate(f".product-info-group-product-name input", parent=item, condition=None)
# 定位元素
driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
source = locate(".cell[data-cci='4'] input", parent=items[idx], condition=None)
target = locate(".cell[data-cci='6'] input", parent=items[idx], condition=None)
# 填入含税成本价
price = source.get_attribute('value')
target.send_keys(price)
serial = locate(".cell[data-cci='2'] div div", parent=item, condition=None).text
source = locate(".cell[data-cci='4'] input", parent=item, condition=None)
target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
if product.get_attribute('value') == 'port' and serial not in ports:
# 填入含税成本价
price = source.get_attribute('value')
target.send_keys(price)
index += 1
ports.append(serial)
if index >= count: break
if index >= limit: raise Exception('无法定位指定产品')
except Exception as e:
print(f"[警告] {number}: 编辑运费时发生错误:{e}")
warn = True
@@ -487,6 +517,28 @@ class FieldArray[K, V]:
array.insert(self.index, padding)
self.index += 1
class Result:
def __init__(self, value, error=None):
self.value = value
self.error = error
def orelse(self, other):
if self.error is not None: return other
else: return self
def ornone(self):
if self.error is not None: return None
else: return self.value
def unwrap(self):
if self.error is not None: raise self.error
else: return self.value
def map(self, f):
if self.error is not None: return self
try: return Result(f(self.value))
except Exception as e: return Result(None, e)
try: status = main()
except KeyboardInterrupt: status = 145