544 lines
23 KiB
Python
544 lines
23 KiB
Python
import argparse
|
|
import requests
|
|
import pandas
|
|
import os
|
|
import re
|
|
|
|
from urllib import parse
|
|
from datetime import datetime, timedelta
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
from selenium.webdriver.remote.webelement import WebElement
|
|
|
|
# Command-line interface of the order-import tool.
parser = argparse.ArgumentParser(description="销售订单自动导入")

# Optional positional input: main() accepts a path ending in .xml or .xlsx,
# or an empty string (the default).
parser.add_argument("invoices", nargs="?", default="")

# Every option accepts at most one value (nargs="?").  Each row is
# (flag names, value type, default, extra add_argument keywords); the rows are
# registered in this order, which is also the order shown by --help.
for _names, _kind, _fallback, _extra in (
    (("--xm-username",), str, "", {}),
    (("--xm-password",), str, "", {}),
    (("--vf-token",), str, "", {}),
    (("--xm-web-url",), str, "https://login.xiaoman.cn/login/", {}),
    (("--vf-api-url",), str, "", {}),
    (("-C", "--currency"), str, "USD", {}),
    (("-D", "--department"), str, "", {}),
    (("-T", "--duration"), int, None, {}),
    (("-a", "--automation"), str, "none", {"choices": ["none", "draft", "final"]}),
    (("-e", "--encoding"), str, "utf-8", {}),
    (("-d", "--outdir"), str, ".", {}),
    (("-o", "--output"), str, "", {}),
    (("-t", "--timeout"), int, 60, {}),
    (("-i", "--interval"), int, 5, {}),
    (("-r", "--retry"), int, 3, {}),
):
    parser.add_argument(*_names, type=_kind, nargs="?", default=_fallback, **_extra)
# Do not leave the loop variables behind in the module namespace.
del _names, _kind, _fallback, _extra

# Parsed once at import time; the rest of the script reads this namespace.
args = parser.parse_args()
# Start timestamp: used for the generated file name and the elapsed-time report.
date = datetime.now()

# Run counters updated by main() and reported when the script ends.
success = 0
warning = 0
|
|
|
|
def main(workbook=None):
    """Build (or load) the order-import workbook and optionally upload it via the browser.

    Input is chosen from ``args.invoices``:

    * ``*.xml``  - invoice data is parsed from that file;
    * ``*.xlsx`` - an already prepared import workbook is loaded as-is;
    * empty      - invoices are downloaded page by page from ``args.vf_api_url``
      for yesterday, or for the last ``args.duration`` days when that is set.

    Unless a workbook was loaded, the invoices are flattened to one row per
    invoice position and written to ``args.output``.  When ``args.automation``
    is not ``'none'`` a Chrome session then logs in, uploads the file, and edits
    every imported order (linked opportunity and per-product cost price).

    NOTE(review): the block nesting below was reconstructed from a source whose
    indentation had been stripped - confirm against the original file.

    Args:
        workbook: optional pre-loaded ``pandas.DataFrame``; ``None`` means
            "derive it from the invoice data".

    Returns:
        int: process exit status.  0 success; 1 unreadable or unsupported
        input; 21 invoice download failed or returned no usable data; 3 the
        workbook could not be written; 6 browser start-up failed; 7 page load
        failed; 81 login failed; 82 navigation to the import page failed;
        83 upload failed; 84 reading the import result failed; 85 opening the
        result list or editing an order failed.
    """
    # Declared up front so the counters can be updated anywhere below.
    global success, warning

    if args.invoices.endswith('.xml'):
        # Read invoice data from a local XML file.
        print(f'[信息] 正在读取文件:{args.invoices}')
        try:
            with open(args.invoices, 'r', encoding=args.encoding) as file:
                document = ET.parse(file)
            root = document.getroot()
        except Exception as e:
            print(f'[!!!!] 读取文件时发生了错误:{e}')
            return 1
    elif args.invoices.endswith('.xlsx'):
        # Read a ready-made order workbook.
        print(f'[信息] 正在读取文件:{args.invoices}')
        try:
            workbook = pandas.read_excel(args.invoices)
        except Exception as e:
            print(f'[!!!!] 读取文件时发生了错误:{e}')
            return 1
    elif args.invoices:
        print(f'[!!!!] 无效参数:{args.invoices}')
        return 1
    else:
        # No input file: download the invoices from the API, 25 per page.
        root = None
        page = 1
        date_format = "%d/%m/%Y"

        if args.duration is not None:
            datefrom = datetime.today() - timedelta(days=args.duration)
            dateto = datetime.today()
            print(f'[信息] 正在尝试获取自 {datefrom:%Y-%m-%d} 到 {dateto:%Y-%m-%d} 的发票数据')
        else:
            yesterday = datetime.today() - timedelta(days=1)
            datefrom = yesterday
            dateto = yesterday
            print(f'[信息] 正在尝试获取 {yesterday:%Y-%m-%d} 的发票数据')

        datefrom = parse.quote(datefrom.strftime(date_format))
        dateto = parse.quote(dateto.strftime(date_format))

        # Consecutive failures for the current page.  The loop used to retry
        # forever (e.g. with an unreachable or empty API URL); it now gives up
        # after args.retry retries, the same budget fetch() uses.
        failures = 0
        while True:
            try:
                response = requests.get(f'{args.vf_api_url}/invoices.xml?kind=vat&include_positions=true&per_page=25&page={page}&period=more&date_from={datefrom}&date_to={dateto}&search_date_type=issue_date&api_token={args.vf_token}', timeout=args.timeout)
                string = response.content.decode(args.encoding)
                data = ET.fromstring(string)
            except Exception as e:
                print(f'[警告] 向服务器请求数据时发生了错误:{e}')
                failures += 1
                if failures > args.retry:
                    print(f'[!!!!] 向服务器请求数据失败,已重试 {args.retry} 次')
                    return 21
                continue
            failures = 0

            # These root tags end the download: no further invoice page.
            if data.tag in ['nil-classes', 'hash']:
                break
            page += 1

            if root is None:
                root = data
            else:
                root.extend(data.findall('invoice'))

        if root is None:
            print('[!!!!] 服务器返回了无效数据')
            return 21

    def lookup(value, fieldname, excel) -> Result:
        """Wrap column ``fieldname`` of every row of ``excel`` containing ``value`` in any cell."""
        rows = excel[excel.isin([value]).any(axis=1)]
        columns = rows.to_dict(orient='list')
        return Result(columns.get(fieldname))

    def fetch(url, error=0) -> Result[requests.Response, Exception]:
        """GET ``url``; retry on exceptions until ``error`` exceeds ``args.retry``."""
        while True:
            try:
                return Result(requests.get(url, timeout=args.timeout))
            except Exception as e:
                if error < args.retry:
                    error += 1
                else:
                    return Result(None, e)

    def text(element, fieldname) -> str | None:
        """Return the stripped text of child ``fieldname``, or None when absent or empty."""
        value = Result(element).map(lambda x: x.find(fieldname).text.strip()).ornone()
        return value or None

    if workbook is None:
        # Export the invoice data as an order-import workbook.
        invoices = root.findall('invoice')
        limit = len(invoices)
        data = FieldArray()
        print(f'[信息] 已读取发票数据 {limit} 条')

        for index, invoice in enumerate(invoices):
            rate = index / limit
            number = text(invoice, 'number')
            issue_date = text(invoice, 'issue-date')

            print(f'[信息] 正在载入 {number} ... {str(round(rate * 100)).rjust(3)} %')

            # Ids are hoisted into locals so the URLs need no nested quotes.
            from_invoice_id = text(invoice, 'from-invoice-id')
            category_id = text(invoice, 'category-id')
            client_id = text(invoice, 'client-id')

            # Number of the source invoice; falls back to the first word of the title.
            relation = (
                fetch(f'{args.vf_api_url}/invoices/{from_invoice_id}.json?api_token={args.vf_token}')
                .map(lambda res: res.json()['number'])
                .orelse(Result(text(invoice, 'title')).map(lambda x: x.split(maxsplit=1)[0]))
                .ornone()
            )
            category = (
                fetch(f'{args.vf_api_url}/categories/{category_id}.json?api_token={args.vf_token}')
                .map(lambda res: res.json()['name'])
                .ornone()
            )
            client = (
                fetch(f'{args.vf_api_url}/clients/{client_id}.json?api_token={args.vf_token}')
                .map(lambda res: res.json()['external_id'])
                .ornone()
            )

            # An invoice without a <positions> element contributes no rows
            # (iterating the None returned by find() used to raise TypeError).
            positions = invoice.find('positions')
            for position in (positions if positions is not None else ()):
                code = text(position, 'code')
                product = text(position, 'name')
                description = text(position, 'description')
                price = float(text(position, 'price-net') or '0')
                discount = float(text(position, 'discount-percent') or '0')
                quantity = float(text(position, 'quantity') or '0')

                # Order-import columns.
                # See <https://crm.xiaoman.cn/order/importOrder> for details.
                data.append('订单号', number)
                data.append('商机号', relation)
                data.append('订单日期', issue_date)
                data.append('当前处理人', category)
                data.append('业绩归属部门', args.department)
                data.append('客户编号', client)
                data.append('币种', args.currency)
                data.append('产品名称', product)
                data.append('产品编号', None)
                data.append('产品型号', code)
                data.append('原价', '%.2f' % price)
                data.append('折扣率', '%g%%' % discount)
                data.append('单价', '%.2f' % (price * (1 - discount / 100)))
                data.append('数量', '%g' % quantity)
                data.append('产品描述', description)
                data.newrow()

        # Create the import spreadsheet.
        if not args.output:
            args.output = f'{args.outdir}/ultimatron-orders-import-{date.strftime("%Y%m%d-%H%M%S-%f")}.xlsx'
        print(f'[信息] 正在写入文件:{args.output}')

        try:
            workbook = pandas.DataFrame(data.map)
            workbook.to_excel(args.output, index=False, sheet_name='Sheet1')
        except Exception as e:
            print(f'[!!!!] 写入文件时发生了错误:{e}')
            return 3

    if args.automation == 'none':
        return 0
    print('[信息] 正在启动自动化程序')

    try:
        opts = webdriver.ChromeOptions()
        opts.add_experimental_option("excludeSwitches", ["enable-logging"])

        # Passed by keyword so it cannot bind to a different positional
        # parameter on other Selenium releases.
        driver = webdriver.Chrome(options=opts)
        driver.set_page_load_timeout(args.timeout)
    except Exception as e:
        print(f'[!!!!] 初始化时发生了错误:{e}')
        return 6

    try:
        driver.get(args.xm_web_url)
    except TimeoutException:
        # Stop loading and continue with whatever has been rendered.
        print('[警告] 操作超时')
        driver.execute_script("window.stop();")
    except Exception as e:
        print(f'[警告] 载入网页时发生了错误:{e}')
        return 7

    def locate(selector, wait=True, parent=driver, condition=EC.visibility_of_element_located) -> WebElement:
        """Find the element matching CSS ``selector`` under ``parent``.

        With ``wait`` false the element is looked up once, immediately.
        Otherwise the call waits for presence, scrolls the element into view,
        and, if ``condition`` is given, waits for that condition as well.
        Stale-element errors restart the lookup.

        Raises:
            Exception: with message '操作超时' when a wait times out; any other
                lookup error propagates unchanged.
        """
        locator = (By.CSS_SELECTOR, selector)
        while True:
            try:
                if not wait:
                    return parent.find_element(*locator)

                waiter = WebDriverWait(parent, timeout=args.timeout)
                element = waiter.until(EC.presence_of_element_located(locator))
                # Bring the element into view.
                driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)

                if condition is not None:
                    waiter = WebDriverWait(parent, timeout=args.timeout)
                    element = waiter.until(condition(locator))
                return element
            except StaleElementReferenceException:
                # The element went stale; look it up again.
                continue
            except TimeoutException as err:
                raise Exception('操作超时') from err

    def click(selector, wait=True, parent=driver, condition=EC.element_to_be_clickable):
        """Click an element (CSS selector or WebElement) and confirm the click registered.

        A click listener bumps a ``taximeter`` attribute on the element; the
        click is retried, up to ``args.retry`` times, until the counter moves.
        After a failed native click the retries use a JavaScript click.
        """
        element = locate(selector, wait, parent, condition) if isinstance(selector, str) else selector

        def counter():
            return int(element.get_attribute('taximeter') or 0)

        use_script = False
        value = counter()
        driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)

        for _ in range(args.retry):
            try:
                if use_script:
                    driver.execute_script("arguments[0].click();", element)
                else:
                    element.click()
            except StaleElementReferenceException:
                break
            except Exception:
                use_script = True
                continue
            # Detect whether the click event actually fired.
            try:
                WebDriverWait(driver, args.interval).until(lambda _drv: counter() > value)
                break
            except TimeoutException:
                continue
            except Exception:
                break

    def ready(driver):
        """Block until the page is no longer busy; always returns True.

        Waits up to ``args.interval`` seconds for the ``nprogress-busy`` class
        to show up on <html> (presumably the page's loading marker), then up
        to ``args.timeout`` seconds for it to go away again.
        """
        def busy(x):
            # A missing class attribute reads as None; treat it as "not busy".
            return 'nprogress-busy' in (x.find_element(By.TAG_NAME, 'html').get_attribute('class') or '')

        try:
            WebDriverWait(driver, timeout=args.interval).until(busy)
        except (TimeoutException, StaleElementReferenceException):
            pass
        WebDriverWait(driver, timeout=args.timeout).until_not(busy)
        return True

    if args.xm_username and args.xm_password:
        try:
            locate("input.account").send_keys(args.xm_username)
            locate("input#password").send_keys(args.xm_password)
            click("input.agree-checkbox")
            click("button.login-btn")
        except Exception as e:
            print(f'[!!!!] 登录网页时发生了错误:{e}')
            return 81

    print("[信息] 正在检测登录状态...")
    while True:
        try:
            driver.find_element(By.ID, 'container')
            print("[信息] 已登录")
            break
        except Exception:
            ready(driver)

    try:
        click(".layout-sidebar ul li.list-none.cpq div div")
        ready(driver)
        click(".layout-sidebar .layout-second-menu li.order a")
        click(".list-header-top button.okki-dropdown-trigger")
        click(".okki-dropdown-content button")
        driver.switch_to.window(driver.window_handles[1])
    except Exception as e:
        print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
        return 82

    # Position of each order status in the status drop-down.
    status = {'draft': 1, 'final': 6}

    try:
        # Change the status.
        click(".product-import-img-box .mm-selector-rendered")
        click(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span")

        path = os.path.abspath(args.output or args.invoices)
        # Upload the file.
        locate(".big-file-upload input", wait=False).send_keys(path)
        click(".product-import-img-footer button")
        # Start the import.
        click(".product-import-img-footer button.mm-button__primary")
    except Exception as e:
        print(f'[!!!!] 上传文件时发生了错误:{e}')
        return 83

    # Wait for the orders to be imported.
    while True:
        try:
            title = driver.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText')
            if title == '导入完成':
                break
        except Exception:
            ready(driver)

    try:
        # Open the import result.
        click(".mm-notification-container .mm-icon-close")
        click(".product-import-img-footer button.mm-button__primary")

        cell = locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3)", condition=None)
        indicator = locate("div > div > span", parent=cell, condition=None)
        # The count follows a three-character prefix in the indicator text.
        error = int(indicator.text[3:])

        if error != 0:
            print(f'[警告] {error} 个订单导入失败')
            print('[信息] 提示:可下载失败记录')
    except Exception as e:
        print(f'[!!!!] 等待订单录入时发生了错误:{e}')
        return 84

    try:
        click("a", wait=False, parent=cell)
        driver.switch_to.window(driver.window_handles[2])
    except Exception:
        print('[!!!!] 查看导入结果时发生了错误')
        return 85

    ready(driver)
    # Order numbers already edited; an order can appear on several result rows.
    modified = []

    # Set the pagination option to 10 rows per page.
    try:
        click(".okki-pagination-options-size-changer")
        click(".okki-select-dropdown .rc-virtual-list-holder-inner div:nth-child(1)")
        ready(driver)
    except Exception:
        print('[警告] 分页选项设置失败')

    while ready(driver):
        for link in driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a"):
            # Edit the order.
            try:
                click(link)
                driver.switch_to.window(driver.window_handles[3])
                click(".component-detail-frame-header .okki-space-item:nth-child(1) button")

                warn = False
                associative = False
                header = locate(".order-edit-header .edit-order-no span span:nth-child(1)")
                number = header.text

                if number not in modified:
                    # Select the opportunity.  `relation` is reset per order so a
                    # failed lookup reports None instead of an unbound or stale value.
                    relation = None
                    for _ in range(args.retry):
                        try:
                            relation = lookup(number, '商机号', workbook).map(lambda x: x[0]).unwrap()
                            match = re.match(r'O\d+', str(relation))

                            if match is not None:
                                # The value looks like an opportunity number:
                                # resolve it to the opportunity name in a temporary tab.
                                try:
                                    href = locate(".layout-sidebar ul li.list-none.opportunity a").get_attribute('href')
                                    driver.switch_to.new_window('tab')
                                    driver.get(href)

                                    # Search for the opportunity.
                                    locate(".new-header input").send_keys(relation)
                                    click(".new-header .search-btn span")
                                    ready(driver)

                                    name = locate("#business-board-drag-wrapper .business-card-wrapper > div .name-wrapper a", wait=False)
                                    relation = name.text
                                finally:
                                    driver.close()
                                    driver.switch_to.window(driver.window_handles[3])

                            dropdown = locate(".component-business-select .mm-selector-rendered")
                            click(dropdown)

                            waiter = WebDriverWait(driver, timeout=args.timeout)
                            menu = waiter.until(lambda x: x.find_element(By.CSS_SELECTOR, ".mm-outside.mm-select-dropdown"))

                            for item in menu.find_elements(By.CSS_SELECTOR, "ul li span"):
                                if item.text.startswith(relation):
                                    click(item)
                                    associative = True
                                    break
                            if associative:
                                break
                            # Nothing matched: close the menu before the next attempt.
                            click(dropdown)
                        except Exception as e:
                            print(f"[警告] {number}: 关联商机时发生错误:{e}")
                            warn = True
                            break

                    if not associative:
                        print(f'[警告] {number}: 无法找到对应商机 "{relation}"')
                        warn = True

                    # Set the pagination option to 10 rows per page.
                    try:
                        click(".okki-pagination-options-size-changer")
                        click(".okki-select-dropdown .rc-virtual-list-holder-inner div:nth-child(1)")
                    except Exception:
                        print(f'[警告] {number}: 分页选项设置失败')

                    # Edit the products.
                    try:
                        wrapper = locate(".order-edit-product-wrapper .row-items", condition=None)
                        positions = lookup(number, '产品型号', workbook).unwrap()
                        ids = []

                        finished = False
                        while not finished:
                            driver.execute_script("arguments[0].scrollTo(0, 0);", wrapper)
                            handled = 0

                            while handled < 10:
                                # Done once every workbook row of this order is filled in.
                                if len(ids) >= len(positions):
                                    finished = True
                                    break
                                # Second half of the page: scroll the list to the bottom.
                                if handled >= 5:
                                    driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)

                                for item in wrapper.find_elements(By.CSS_SELECTOR, ".row-item"):
                                    # Serial number of the order line.
                                    try:
                                        serial = item.find_element(By.CSS_SELECTOR, ".cell[data-cci='2'] .cell-inner div").text
                                    except Exception:
                                        continue
                                    if not serial:
                                        continue

                                    # Scroll to the row.
                                    driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", item)
                                    driver.execute_script("arguments[0].scrollIntoView({ block: 'start' });", wrapper)

                                    if serial not in ids:
                                        click(".product-info-group-product-info a.jump-link", parent=item)
                                        price = locate(".okki-drawer-body div[title='含税成本价'] + div div.mm-formily-preview-text", condition=None).text
                                        click(".okki-drawer-body .space-header-after button")

                                        waiter = WebDriverWait(driver, timeout=args.timeout)
                                        waiter.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-drawer-body").is_displayed())

                                        original = locate(".cell[data-cci='4'] input", parent=item, condition=None)
                                        # '--' means no cost price is shown; use the row's own value.
                                        if price == '--':
                                            price = original.get_attribute('value')

                                        # Fill in the tax-inclusive cost price.
                                        target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
                                        target.send_keys(price)
                                        ids.append(serial)
                                        handled += 1

                            if not finished:
                                # Next page.
                                click(".summary-pagination-wrapper li.okki-pagination-next button", condition=None)
                    except Exception as e:
                        print(f"[警告] {number}: 编辑产品时发生错误:{e}")
                        warn = True

                    # Save the order.
                    ready(driver)
                    click(".order-edit-footer button.okki-btn-primary", condition=None)
                    ready(driver)

                    print(f"[信息] {number}: 修改完成")
                    modified.append(number)

                    success += 1
                    if warn:
                        warning += 1

                # Close the tab.
                driver.close()
                driver.switch_to.window(driver.window_handles[2])
            except Exception as e:
                print(f'[!!!!] 编辑订单时发生了错误:{e}')
                return 85

        # Move to the next result page; a disabled or missing button ends the run.
        try:
            button = locate(".okki-pagination-next button", wait=False)
            last_page = bool(button.get_attribute('disabled'))
            if not last_page:
                click(button)
        except Exception:
            last_page = True
        if last_page:
            print('[信息] 已经是最后一页')
            break

    return 0
|
|
|
|
class FieldArray[K, V]:
|
|
def __init__(self):
|
|
self.map: dict[K, list[V]] = {}
|
|
self.index = 0
|
|
pass
|
|
|
|
def append(self, key: K, value: V):
|
|
array = self.map.setdefault(key, [])
|
|
array.insert(self.index, value)
|
|
|
|
def newrow(self, padding=None):
|
|
for array in self.map.values():
|
|
limit = len(array) - 1
|
|
delta = self.index - limit
|
|
array.extend([ padding for _ in range(delta) ])
|
|
self.index += 1
|
|
|
|
class Result[T, E]:
|
|
def __init__(self, value: T, error: E | None = None):
|
|
self.value = value
|
|
self.error = error
|
|
|
|
def orelse(self, other):
|
|
if self.error is not None: return other
|
|
else: return self
|
|
|
|
def ornone(self):
|
|
if self.error is not None: return None
|
|
else: return self.value
|
|
|
|
def unwrap(self):
|
|
if self.error is not None: raise self.error
|
|
else: return self.value
|
|
|
|
def map(self, f):
|
|
if self.error is not None: return self
|
|
try: return Result(f(self.value))
|
|
except Exception as e: return Result(None, e)
|
|
|
|
# Run the import; a user interrupt (Ctrl+C) is reported as exit status 145.
try:
    status = main()
except KeyboardInterrupt:
    status = 145

# Summary: number of edited orders, how many produced warnings, total run time.
print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息')
print(f'[信息] 总耗时 {datetime.now() - date}')

# Raise SystemExit directly instead of calling exit(): that helper is injected
# by the `site` module for interactive use and is not always available
# (e.g. `python -S` or frozen executables).
raise SystemExit(status)