update: relation lookup

This commit is contained in:
2025-06-26 18:10:58 +08:00
parent e56a9ae4b0
commit db217ee457

View File

@@ -1,8 +1,8 @@
import argparse import argparse
import requests import requests
import pandas import pandas
import time
import os import os
import re
from urllib import parse from urllib import parse
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -21,17 +21,16 @@ parser.add_argument('--xm-password', type=str, nargs='?', default='')
parser.add_argument('--vf-token', type=str, nargs='?', default='') parser.add_argument('--vf-token', type=str, nargs='?', default='')
parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/') parser.add_argument('--xm-web-url', type=str, nargs='?', default='https://login.xiaoman.cn/login/')
parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/') parser.add_argument('--vf-api-url', type=str, nargs='?', default='https://ultimatron-france.vosfactures.fr/')
parser.add_argument('-C', '--currency', type=str, nargs='?', default='') parser.add_argument('-C', '--currency', type=str, nargs='?', default='USD')
parser.add_argument('-D', '--department', type=str, nargs='?', default='') parser.add_argument('-D', '--department', type=str, nargs='?', default='ULT事业部')
parser.add_argument('-T', '--days-delta', type=int, nargs='?', default=None) parser.add_argument('-T', '--days-delta', type=int, nargs='?', default=None)
parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none') parser.add_argument('-a', '--automation', type=str, choices=['none', 'draft', 'final'], nargs='?', default='none')
parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8') parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('-m', '--mappings', type=str, nargs='?', default='')
parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.') parser.add_argument('-d', '--outdir', type=str, nargs='?', default='.')
parser.add_argument('-o', '--output', type=str, nargs='?', default='') parser.add_argument('-o', '--output', type=str, nargs='?', default='')
parser.add_argument('-p', '--per-page', type=int, nargs='?', default=5)
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=5) parser.add_argument('-i', '--interval', type=int, nargs='?', default=5)
parser.add_argument('-r', '--retry', type=int, nargs='?', default=3)
args = parser.parse_args() args = parser.parse_args()
date = datetime.now() date = datetime.now()
@@ -51,18 +50,21 @@ def main(workbook=None):
print(f'[!!!!] 读取文件时发生了错误:{e}') print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1 return 1
elif args.invoices.endswith('.xlsx'): elif args.invoices.endswith('.xlsx'):
# 读取发票数据 # 读取订单数据
print(f'[信息] 正在读取文件:{args.invoices}') print(f'[信息] 正在读取文件:{args.invoices}')
try: try:
workbook = pandas.read_excel(args.invoices) workbook = pandas.read_excel(args.invoices)
except Exception as e: except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}') print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1 return 1
elif bool(args.invoices):
print(f'[!!!!] 无效参数:{args.invoices}')
return 1
else: else:
root = None root = None
index = 1 index = 1
format = "%d/%m/%Y" format = "%d/%m/%Y"
if args.days_delta is not None: if args.days_delta is not None:
datefrom = (datetime.today() - timedelta(days=args.days_delta)) datefrom = (datetime.today() - timedelta(days=args.days_delta))
dateto = datetime.today() dateto = datetime.today()
@@ -83,9 +85,7 @@ def main(workbook=None):
data = ET.fromstring(string) data = ET.fromstring(string)
except Exception as e: except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}') print(f'[警告] 向服务器请求数据时发生了错误:{e}')
return 21 continue
time.sleep(args.interval) #1
if data.tag in ['nil-classes', 'hash']: break if data.tag in ['nil-classes', 'hash']: break
else: index += 1 else: index += 1
@@ -95,108 +95,40 @@ def main(workbook=None):
if root is None: if root is None:
print('[!!!!] 服务器返回了无效数据') print('[!!!!] 服务器返回了无效数据')
return 22 return 21
def lookup(value, fieldname, excel) -> Result: def lookup(value, fieldname, excel) -> Result:
rows = excel[excel.isin([value]).any(axis=1)] rows = excel[excel.isin([value]).any(axis=1)]
data = rows.to_dict(orient='list') data = rows.to_dict(orient='list')
return Result(data.get(fieldname)) return Result(data.get(fieldname))
def fetch(url, error=0) -> Result[requests.Response, Exception]:
while True:
try:
response = requests.get(url, timeout=args.timeout)
return Result(response)
except Exception as e:
if error < args.retry: error += 1
else: return Result(None, e)
def text(element, fieldname) -> str: def text(element, fieldname) -> str:
child = element.find(fieldname) result = Result(element).map(lambda x: x.find(fieldname).text.strip())
if bool(child.text): return child.text.strip() text = result.ornone()
if bool(text): return text
else: return None else: return None
if workbook is None: if workbook is None:
# 读取产品信息 # 导出发票数据
print(f'[信息] 正在读取文件:{args.mappings}') invoices = root.findall('invoice')
try: limit = len(invoices)
products = pandas.read_excel(args.mappings)
except Exception as e:
print(f'[!!!!] 读取文件时发生了错误:{e}')
return 1
print(f'[信息] 已读取产品数据 {len(products)}') print(f'[信息] 已读取发票数据 {limit}')
clients = RelationMap(lambda x: text(x, 'client-id'))
invoices = RelationMap(lambda x: text(x, 'number'))
proformas = RelationMap(lambda x: text(x, 'from-invoice-id'))
categories = RelationMap(lambda x: text(x, 'category-id'))
for invoice in root.findall('invoice'):
number = text(invoice, 'number')
kind = invoice.find('kind').text
if kind not in ['vat']:
print(f"[警告] {number}: 类型错误 ({kind})")
continue
if invoice.find('positions') is None:
print(f"[警告] {number}: 缺少产品信息")
continue
try:
clients.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Client 数据错误 ({e})")
continue
try:
proformas.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Proforma 数据错误 ({e})")
continue
try:
categories.setValueOf(invoice, None)
except Exception as e:
print(f"[警告] {number}: Category 数据错误 ({e})")
continue
# 有效发票包含完整的客户、PI和归属数据
invoices.setValueOf(invoice, invoice)
print(f'[信息] 已读取有效发票数据 {len(invoices.map)}')
if len(invoices.map) == 0: return 0
print('[信息] 正在向服务器请求数据')
for id in clients.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/clients/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
clients.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #2
for id in proformas.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/invoices/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
proformas.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
time.sleep(args.interval) #3
for id in categories.map.keys():
try:
response = requests.get(f'{args.vf_api_url}/categories/{id}.json?api_token={args.vf_token}', timeout=args.timeout)
data = response.json()
categories.map[id] = data
except Exception as e:
print(f'[警告] 向服务器请求数据时发生了错误:{e}')
continue
# 订单导入字段 # 订单导入字段
# 详情见 <https://crm.xiaoman.cn/order/importOrder> # 详情见 <https://crm.xiaoman.cn/order/importOrder>
data = FieldArray( data = FieldArray(
'订单号', '订单号',
'形式发票', '商机号',
'订单日期', '订单日期',
'当前处理人', '当前处理人',
'业绩归属部门', '业绩归属部门',
@@ -212,31 +144,34 @@ def main(workbook=None):
'产品描述', '产品描述',
) )
for invoice in invoices.map.values(): for index, invoice in enumerate(invoices):
for position in invoice.find('positions'): rate = index / limit
number = text(invoice, 'number') number = text(invoice, 'number')
issue_date = text(invoice, 'issue-date') issue_date = text(invoice, 'issue-date')
category = categories.getValueOf(invoice)['name']
proforma = proformas.getValueOf(invoice)['number']
client = clients.getValueOf(invoice)['external_id']
print(f'[信息] 正在载入 {number} ... {str(round(rate * 100)).rjust(3)} %')
relation = fetch(f'{args.vf_api_url}/invoices/{text(invoice, 'from-invoice-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['number']).orelse(Result(text(invoice, 'title')).map(lambda x: x.split(maxsplit=1)[0])).ornone()
category = fetch(f'{args.vf_api_url}/categories/{text(invoice, 'category-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['name']).ornone()
client = fetch(f'{args.vf_api_url}/clients/{text(invoice, 'client-id')}.json?api_token={args.vf_token}').map(lambda res: res.json()['external_id']).ornone()
for position in invoice.find('positions'):
code = text(position, 'code') code = text(position, 'code')
id = lookup(code, '产品编号', products).map(lambda x: x[0]).ornone() product = text(position, 'name')
product = lookup(code, '产品名称', products).map(lambda x: x[0]).ornone() description = text(position, 'description')
description = lookup(code, '产品描述', products).map(lambda x: x[0]).ornone()
price = float(text(position, 'price-net') or '0') price = float(text(position, 'price-net') or '0')
discount = float(text(position, 'discount-percent') or '0') discount = float(text(position, 'discount-percent') or '0')
quantity = float(text(position, 'quantity') or '0') quantity = float(text(position, 'quantity') or '0')
data.append('形式发票', proforma) data.append('商机号', relation)
data.append('订单日期', issue_date) data.append('订单日期', issue_date)
data.append('当前处理人', category) data.append('当前处理人', category)
data.append('业绩归属部门', args.department or None) data.append('业绩归属部门', args.department)
data.append('客户编号', client) data.append('客户编号', client)
data.append('币种', args.currency or None) data.append('币种', args.currency)
data.append('订单号', number) data.append('订单号', number)
data.append('产品名称', product) data.append('产品名称', product)
data.append('产品编号', id) data.append('产品编号', None)
data.append('产品型号', code) data.append('产品型号', code)
data.append('原价', '%.2f' % price) data.append('原价', '%.2f' % price)
data.append('折扣率', '%g%%' % discount) data.append('折扣率', '%g%%' % discount)
@@ -270,7 +205,6 @@ def main(workbook=None):
return 6 return 6
try: try:
print(f'[信息] 正在载入 {args.xm_web_url}')
driver.get(args.xm_web_url) driver.get(args.xm_web_url)
except TimeoutException: except TimeoutException:
# 停止加载 # 停止加载
@@ -294,7 +228,6 @@ def main(workbook=None):
if condition is not None: if condition is not None:
wait = WebDriverWait(parent, timeout=args.timeout) wait = WebDriverWait(parent, timeout=args.timeout)
element = wait.until(condition(locator)) element = wait.until(condition(locator))
return element return element
except StaleElementReferenceException: except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找 # 如果遇到过期元素,重新尝试查找
@@ -306,12 +239,27 @@ def main(workbook=None):
# 其他错误 # 其他错误
raise e raise e
def click(selector, parent=driver, condition=EC.element_to_be_clickable):
element = locate(selector, True, parent, condition)
try: element.click()
except: driver.execute_script("arguments[0].click();", element)
def ready(driver):
try:
wait = WebDriverWait(driver, timeout=args.interval)
wait.until(lambda x: 'nprogress-busy' in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
except TimeoutException:
pass
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
return True
if bool(args.xm_username) and bool(args.xm_password): if bool(args.xm_username) and bool(args.xm_password):
try: try:
locate("input.account").send_keys(args.xm_username) locate("input.account").send_keys(args.xm_username)
locate("input#password").send_keys(args.xm_password) locate("input#password").send_keys(args.xm_password)
locate("input.agree-checkbox").click() click("input.agree-checkbox")
locate("button.login-btn").click() click("button.login-btn")
except Exception as e: except Exception as e:
print(f'[!!!!] 登录网页时发生了错误:{e}') print(f'[!!!!] 登录网页时发生了错误:{e}')
return 81 return 81
@@ -323,13 +271,14 @@ def main(workbook=None):
print("[信息] 已登录") print("[信息] 已登录")
break break
except: except:
time.sleep(args.interval) #4 ready(driver) #1
try: try:
locate(".layout-sidebar ul li.list-none.cpq div div").click() click(".layout-sidebar ul li.list-none.cpq div div")
locate(".layout-sidebar .layout-second-menu li.order a").click() ready(driver)
locate(".list-header-top button.okki-dropdown-trigger").click() click(".layout-sidebar .layout-second-menu li.order a")
locate(".okki-dropdown-content button").click() click(".list-header-top button.okki-dropdown-trigger")
click(".okki-dropdown-content button")
driver.switch_to.window(driver.window_handles[1]) driver.switch_to.window(driver.window_handles[1])
except Exception as e: except Exception as e:
print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}') print(f'[!!!!] 尝试进入录入订单页面时发生了错误:{e}')
@@ -337,43 +286,46 @@ def main(workbook=None):
# 状态菜单映射 # 状态菜单映射
status = { 'draft': 1, 'final': 6 } status = { 'draft': 1, 'final': 6 }
time.sleep(args.interval) #5
try: try:
# 变更状态 # 变更状态
locate(".product-import-img-box .mm-selector-rendered").click() click(".product-import-img-box .mm-selector-rendered")
locate(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span").click() click(f".mm-outside.mm-select-dropdown ul li:nth-child({status.get(args.automation)}) span")
path = os.path.abspath(args.output or args.invoices) path = os.path.abspath(args.output or args.invoices)
# 上传文件 # 上传文件
locate(".big-file-upload input", wait=False).send_keys(path) locate(".big-file-upload input", wait=False).send_keys(path)
locate(".product-import-img-footer button", condition=EC.element_to_be_clickable).click() click(".product-import-img-footer button")
# 开始导入 # 开始导入
locate(".product-import-img-footer button.mm-button__primary", condition=EC.element_to_be_clickable).click() click(".product-import-img-footer button.mm-button__primary")
except Exception as e: except Exception as e:
print(f'[!!!!] 上传文件时发生了错误:{e}') print(f'[!!!!] 上传文件时发生了错误:{e}')
return 83 return 83
time.sleep(args.interval) #6 # 等待订单录入
while True:
try:
innerText = driver.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText')
if innerText == '导入完成': break
except:
ready(driver) #1
try: try:
# 等待订单录入
wait = WebDriverWait(driver, timeout=600)
wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".img-box-result-title").get_attribute('innerText') == '导入完成')
# 查看导入结果 # 查看导入结果
locate(".mm-notification-container .mm-icon-close").click() click(".mm-notification-container .mm-icon-close")
locate(".product-import-img-footer button.mm-button__primary").click() click(".product-import-img-footer button.mm-button__primary")
locate(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3) a").click() click(".mm-tbody table tbody tr:nth-child(1) td:nth-child(3) a")
driver.switch_to.window(driver.window_handles[2]) driver.switch_to.window(driver.window_handles[2])
except Exception as e: except Exception as e:
print(f'[!!!!] 等待订单录入时发生了错误:{e}') print(f'[!!!!] 等待订单录入时发生了错误:{e}')
return 84 return 84
# 设置每页显示记录数 # 设置每页显示记录数
time.sleep(args.interval) #7 ready(driver) #2
url = driver.current_url url = driver.current_url
param = f'%22page_size%22%3A{args.per_page}' per_page = 5
param = f'%22page_size%22%3A{per_page}'
if 'page_size' in url: url = url.replace('%22page_size%22%3A20', param) if 'page_size' in url: url = url.replace('%22page_size%22%3A20', param)
else: url += param else: url += param
@@ -381,25 +333,19 @@ def main(workbook=None):
driver.get(url) driver.get(url)
modified = [] modified = []
while True: while ready(driver):
# 等待页面加载 for idx in range(per_page):
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until(lambda x: 'nprogress-busy' not in x.find_element(By.TAG_NAME, 'html').get_attribute('class'))
time.sleep(args.interval) #8
for idx in range(args.per_page):
try: try:
links = driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .row-items .cell[data-cci='1'] a") links = driver.find_elements(By.CSS_SELECTOR, ".list-frame-table .vue-recycle-scroller__item-wrapper .cell[data-cci='1'] a")
element = links[idx] link = links[idx]
except Exception as e: except:
print(f'[警告] 未找到记录: {e}')
break break
try: try:
driver.execute_script("arguments[0].click();", element) driver.execute_script("arguments[0].click();", link)
driver.switch_to.window(driver.window_handles[3]) driver.switch_to.window(driver.window_handles[3])
# 编辑订单 # 编辑订单
locate(".component-detail-frame-header .okki-space-item:nth-child(1) button").click() click(".component-detail-frame-header .okki-space-item:nth-child(1) button")
warn = False warn = False
header = locate(".order-edit-header .edit-order-no span span:nth-child(1)") header = locate(".order-edit-header .edit-order-no span span:nth-child(1)")
@@ -409,66 +355,100 @@ def main(workbook=None):
# 选择商机 # 选择商机
try: try:
selected = False selected = False
proforma = lookup(number, '形式发票', workbook).map(lambda x: x[0]).unwrap() relation = lookup(number, '商机号', workbook).map(lambda x: x[0]).unwrap()
index = 1
dropdown = locate(".component-business-select .mm-selector-rendered")
dropdown.click()
while True: if re.match(r'O\d+', str(relation)) is not None:
element = locate(f".mm-outside.mm-select-dropdown ul li:nth-child({index}) span", condition=None) try:
index += 1 href = locate(".layout-sidebar ul li.list-none.opportunity a").get_attribute('href')
if element.text.startswith(proforma): driver.switch_to.new_window('tab')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element) driver.get(href)
element.click()
# 检索商机
locate(".new-header input").send_keys(relation)
click(".new-header .search-btn span")
ready(driver)
name = locate("#business-board-drag-wrapper .business-card-wrapper > div .name-wrapper a", wait=False)
relation = name.text
finally:
driver.close()
driver.switch_to.window(driver.window_handles[3])
click(".component-business-select .mm-selector-rendered")
wait = WebDriverWait(driver, timeout=args.timeout)
menu = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, ".mm-outside.mm-select-dropdown"))
for item in menu.find_elements(By.CSS_SELECTOR, "ul li span"):
driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", item)
if item.text.startswith(relation):
item.click()
selected = True selected = True
break break
if not selected: if not selected:
raise Exception('无法找到对应商机') raise Exception(f'无法找到对应商机 "{relation}"')
except Exception as e: except Exception as e:
print(f"[警告] {number}: 关联商机时发生错误:{e}") print(f"[警告] {number}: 关联商机时发生错误:{e}")
warn = True warn = True
# 编辑运费 # 编辑产品
try: try:
# 设定分页选项 10 条/页
click(".okki-pagination-options-size-changer")
click(".okki-select-dropdown .rc-virtual-list-holder-inner div:nth-child(1)")
wrapper = locate(".order-edit-product-wrapper .row-items", condition=None) wrapper = locate(".order-edit-product-wrapper .row-items", condition=None)
positions = lookup(number, '产品型号', workbook).unwrap() positions = lookup(number, '产品型号', workbook).unwrap()
index = 0 ids = []
count = positions.count('port')
limit = len(positions)
ports = []
while True: while True:
driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) driver.execute_script("arguments[0].scrollTo(0, 0);", wrapper)
items = wrapper.find_elements(By.CSS_SELECTOR, "div.vue-recycle-scroller__item-wrapper div.vue-recycle-scroller__item-view") index = 0
for item in items: while index < 10:
driver.execute_script("arguments[0].scrollIntoView({ block: 'start', inline: 'end' });", item) if len(ids) >= len(positions): raise KeyboardInterrupt()
product = locate(f".product-info-group-product-name input", parent=item, condition=None) # 获取项
# 定位元素 if index >= 5: driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper)
driver.execute_script("arguments[0].scrollTo(0, arguments[0].scrollHeight);", wrapper) items = wrapper.find_elements(By.CSS_SELECTOR, ".row-item")
serial = locate(".cell[data-cci='2'] div div", parent=item, condition=None).text
source = locate(".cell[data-cci='4'] input", parent=item, condition=None)
target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
if product.get_attribute('value') == 'port' and serial not in ports: for item in items:
# 填入含税成本价 # 订单序号
price = source.get_attribute('value') try: serial = item.find_element(By.CSS_SELECTOR, ".cell[data-cci='2'] .cell-inner div").text
target.send_keys(price) except: continue
index += 1 if not bool(serial): continue
ports.append(serial)
if index >= count: break # 转到元素
if index >= limit: raise Exception('无法定位指定产品') driver.execute_script("arguments[0].scrollIntoView({ block: 'center' });", item)
driver.execute_script("arguments[0].scrollIntoView({ block: 'start' });", wrapper)
if serial not in ids:
click(".product-info-group-product-info a.jump-link", parent=item)
price = locate(".okki-drawer-body div[title='含税成本价'] + div div.mm-formily-preview-text", condition=None).text
click(".okki-drawer-body .space-header-after button")
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".okki-drawer-body").is_displayed())
original = locate(".cell[data-cci='4'] input", parent=item, condition=None)
if price == '--': price = original.get_attribute('value')
# 填入含税成本价
target = locate(".cell[data-cci='6'] input", parent=item, condition=None)
target.send_keys(price)
ids.append(serial)
index += 1
# 下一页
click(".summary-pagination-wrapper li.okki-pagination-next button", condition=None)
except KeyboardInterrupt:
pass
except Exception as e: except Exception as e:
print(f"[警告] {number}: 编辑运费时发生错误:{e}") print(f"[警告] {number}: 编辑产品时发生错误:{e}")
warn = True warn = True
# 保存订单 # 保存订单
button = locate(".order-edit-footer button.okki-btn-primary", condition=None) click(".order-edit-footer button.okki-btn-primary", condition=None)
driver.execute_script("arguments[0].click();", button) ready(driver)
time.sleep(args.interval) #9
print(f"[信息] {number}: 修改完成") print(f"[信息] {number}: 修改完成")
modified.append(number) modified.append(number)
@@ -495,21 +475,6 @@ def main(workbook=None):
return 0 return 0
class RelationMap:
def __init__(self, predicate):
self.map = {}
self.predicate = predicate
def getValueOf(self, object):
id = self.predicate(object)
if bool(id): return self.map.get(id)
else: raise Exception(f"Invalid ID: '{id}'")
def setValueOf(self, object, value):
id = self.predicate(object)
if bool(id): self.map[id] = value
else: raise Exception(f"Invalid ID: '{id}'")
class FieldArray[K, V]: class FieldArray[K, V]:
def __init__(self, *args: K): def __init__(self, *args: K):
self.map: dict[K, list[V]] = { name: [] for name in args } self.map: dict[K, list[V]] = { name: [] for name in args }
@@ -527,8 +492,8 @@ class FieldArray[K, V]:
array.extend([ padding for _ in range(delta) ]) array.extend([ padding for _ in range(delta) ])
self.index += 1 self.index += 1
class Result: class Result[T, E]:
def __init__(self, value, error=None): def __init__(self, value: T, error: E | None = None):
self.value = value self.value = value
self.error = error self.error = error
@@ -554,3 +519,4 @@ except KeyboardInterrupt: status = 145
print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息') print(f'[信息] 已修改 {success} 个订单,其中包含 {warning} 条警告信息')
print(f'[信息] 总耗时 {datetime.now() - date}') print(f'[信息] 总耗时 {datetime.now() - date}')
exit(status)