pivot to selenium

This commit is contained in:
2026-03-31 18:00:39 +08:00
parent ba548e8b13
commit 714818bb15
9 changed files with 412 additions and 53 deletions

3
.gitignore vendored
View File

@@ -1 +1,2 @@
.workbooks/ /.vscode
/venv

View File

@@ -1,52 +0,0 @@
# 商机纸质打印操作步骤
1. 导出商机数据
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/ac293e62-7aec-42f9-ad40-023cfea9da1e.png)
2. 筛选数据
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/aa73c2fe-3790-4524-8c07-29ad6370b488.png)
3. 在“导出设置”中设置导出字段 **(字段和顺序必须与图中一致)**
![E24B22E1-8621-451a-AADA-973B1730D1E5.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/3f0b8ad2-fea1-42e8-b791-a0eb82364038.png)
4. 导出详细版
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/03f57463-5e82-4f7d-9dfc-922e1a03a154.png)
5. 打开工作簿,点击“启用宏”
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/878691a5-5c24-4bcf-b352-6e86764f4f75.png)
6. 选择Config工作表输入导出位置 **(注意:路径必须以分隔符 “\” 结尾)**
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/4f0e8d61-11eb-46e3-94dc-228a19fe9228.png)
7. 运行宏
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/c0a439b3-68a2-45bc-b1ab-5902caa3c782.png)
8. 选择导出的详细版商机数据
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/e58b64e0-80c1-48c2-b299-fee264e8fae1.png)
9. 宏运行完成Excel会有弹窗提示在指定文件夹找到导出的商机
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/6a7b04ed-fcd9-4da6-8789-5d1a8ce67e68.png)
10. 全选,右键,选择显示更多选项,点击打印
![image.png](https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/yBRq1geYWaXmldv1/img/35d3c5c5-06ba-4c62-b8ab-a09d83cc336c.png)

48
index.html Normal file
View File

@@ -0,0 +1,48 @@
<link rel="icon" href="https://s3.dualstack.us-east-2.amazonaws.com/pythondotorg-assets/media/files/python-logo-only.svg" type="image/svg+xml">
<title>Opportunity Exporter</title>
<textarea id="messages" name="messages" rows="45" cols="100" readonly>
</textarea>
<script type="text/javascript">
function main(url, parameters) {
let ws = new WebSocket(url);
let messages = document.querySelector("#messages");
ws.addEventListener('message', (e) => {
object = JSON.parse(e.data);
lines = Array.from(object.result);
for (let line of lines) {
node = document.createTextNode(new String(line).concat('\n'));
messages.appendChild(node);
}
messages.scrollTop = messages.scrollHeight;
});
setInterval(() => {
data = JSON.stringify({ method: "sync", params: null });
ws.send(data);
}, 1000);
}
</script>
<style type="text/css">
body {
width: 100%;
height: 100%;
display: inline-flex;
margin: auto;
align-items: center;
justify-content: center;
}
textarea {
resize: none;
white-space: pre-wrap;
}
textarea:focus {
outline: none;
}
</style>

354
main.py Normal file
View File

@@ -0,0 +1,354 @@
import argparse
import openpyxl
import logging
import base64
import json
import csv
import trio
import trio_websocket as ws
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.bidi.network import Request as NetworkRequest
from io import BytesIO
from enum import Enum
from typing import Self, Callable
from pathlib import Path
from urllib3 import PoolManager
from threading import Thread
parser = argparse.ArgumentParser(description="Opportunity Exporter")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig")
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-r', '--attempts', type=int, default=3)
parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
WEBURL = "https://crm.xiaoman.cn/business/export"
APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export"
args = parser.parse_args()
handlers = {}
connection, server = None, None
class Request[T]:
def __init__(self, method: str, params: T):
if not isinstance(method, str): raise TypeError()
self.method = method
self.params = params
@classmethod
def load(cls, raw: bytes) -> Self:
data = json.loads(raw)
return cls(**data)
class Response[T]:
def __init__(self, result: T):
self.result = result
def __str__(self):
data = { 'result': self.result }
return json.dumps(data)
class Error(Enum):
PARSE_ERROR = -200
INVALID_REQUEST = -300
METHOD_NOT_FOUND = -400
INTERNAL_ERROR = -500
def __str__(self):
data = { 'error': self.value }
return json.dumps(data)
class History(logging.Handler):
def __init__(self):
super().__init__()
self.records = []
def emit(self, record):
self.records.append(record)
def truncate(self) -> list:
copy = self.records.copy()
self.records.clear()
return copy
def main(driver: WebDriver, logger = logging.getLogger('main')):
try:
http = PoolManager()
driver.get(str(Path('index.html').resolve()))
endpoint = server.listeners[0]
parameters = vars(args)
driver.execute_script(f"main(...arguments);", f'ws://{endpoint.address}:{endpoint.port}', parameters)
except Exception as e:
logger.critical('Unable to load starup page', exc_info=e)
return 2
try:
driver.switch_to.new_window('tab')
driver.set_page_load_timeout(parameters.get('timeout'))
driver.get(WEBURL)
except TimeoutException:
logger.warning('Timeout')
driver.execute_script("window.stop();")
def until(condition: Callable[[WebDriver], bool], watch=True):
try:
WebDriverWait(driver, parameters.get('timeout')).until(condition)
except (TimeoutException, StaleElementReferenceException):
pass
if watch: WebDriverWait(driver, parameters.get('timeout')).until_not(condition)
return True
def sleep(seconds: float):
try: WebDriverWait(driver, seconds, seconds).until(lambda _: False)
except: pass
return True
def locate(selector, wait=True, condition=EC.visibility_of_element_located) -> WebElement:
while True:
try:
locator = (By.CSS_SELECTOR, selector)
if not wait: return driver.find_element(*locator)
presence = EC.presence_of_element_located(locator)
element = WebDriverWait(driver, parameters.get('timeout')).until(presence, 'Timeout')
driver.execute_script("arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });", element)
if condition is not None:
element = WebDriverWait(driver, parameters.get('timeout')).until(condition(locator), 'Timeout')
return element
except StaleElementReferenceException:
continue
def click(selector: str|WebElement, wait=True, condition=EC.element_to_be_clickable):
element = locate(selector, wait, condition) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute('taximeter') or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
for _ in range(parameters.get('attempts')):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
try:
WebDriverWait(driver, parameters.get('interval')).until(lambda _: counter() > value)
break
except TimeoutException: continue
except: break
try:
until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False)
locate("input.account").send_keys(parameters.get('account'))
locate("input#password").send_keys(parameters.get('password'))
click("input.agree-checkbox")
click("button.login-btn")
except Exception as e:
logger.critical('Unable to login to %s', parameters.get('url'), exc_info=e)
return 3
logger.info('Waiting for authentication to complete...')
while True:
try:
locate("#container", False)
logger.info('Done')
break
except:
sleep(3)
ready = False
sidebar = locate(".new-layout-left", False)
driver.execute_script("arguments[0].remove();", sidebar)
def handle(request: NetworkRequest):
nonlocal ready
if request.url == APIURL: ready = True
try: request.continue_request()
except Exception as e: logger.debug('Network request error', exc_info=e)
event = 'before_request'
driver.network.add_request_handler(event, handle)
while True:
if not ready:
sleep(1)
continue
try:
locate(".mm-modal-content .mm-modal-body input", wait=False).send_keys(parameters.get('password'))
click(".mm-modal-footer button.okki-btn-primary", wait=False)
except:
pass
try:
sleep(1)
click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False)
filename = cell.text
except:
continue
if filename == '--':
sleep(1)
continue
try:
logger.info('New task: %s', filename)
download = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:last-child a")
href = download.get_attribute('href')
response = http.request("GET", href)
text = response.data.decode(parameters.get('encoding')).splitlines()
data = csv.reader(text)
logger.info('Read %d line(s) total', len(text))
except Exception as e:
logger.critical('Unable to load input data', exc_info=e)
return 4
try:
file = Path('template.xlsx').resolve()
template = openpyxl.load_workbook(file)
except Exception as e:
logger.critical('Unable to load template excel', exc_info=e)
return 5
header = next(data, None)
source = template.active
output = BytesIO()
def preprocess(data: str):
try: return float(data)
except ValueError: pass
try: return int(data)
except ValueError: pass
return data.lstrip("'")
for row in data:
for index, name in enumerate(header):
names = template.defined_names
if name not in names: continue
for _, coord in names[name].destinations:
value = preprocess(row[index])
source[coord] = value
sheet = template.copy_worksheet(source)
logger.info('New sheet: %s', sheet.title)
if not len(template.worksheets) > 1:
logger.error('Invalid input')
ready = False
continue
template.remove(source)
template.save(output)
data = base64.b64encode(output.getbuffer())
text = data.decode('ascii')
mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
href = f"data:{mime};base64,{text}"
driver.switch_to.new_window('tab')
driver.execute_script(
"""
var link = document.createElement('a');
link.download = arguments[0];
link.href = arguments[1];
link.click();
""",
filename.replace('.csv', '.xlsx'),
href
)
driver.close()
driver.switch_to.window(driver.window_handles[1])
logger.info('Done')
ready = False
async def handler(request: ws.WebSocketRequest, logger = logging.getLogger('websocket')):
global connection
websocket = await request.accept()
if connection is None:
connection = websocket
logger.info('Connection established')
else:
await websocket.aclose(code=1000, reason="Non-singular connection prohibited")
return
while True:
try:
message = await connection.get_message()
inbound = Request.load(message)
handler = handlers[inbound.method]
results = handler(inbound.params)
response = Response(results)
except json.decoder.JSONDecodeError:
logger.error('Parse error')
response = Error.PARSE_ERROR
except TypeError:
logger.error('Invalid request')
response = Error.INVALID_REQUEST
except KeyError:
logger.error('Method not found: `%s`', inbound.method)
response = Error.METHOD_NOT_FOUND
except Exception as e:
logger.error('Internal error', exc_info=e)
response = Error.INTERNAL_ERROR
await connection.send_message(str(response))
async def backend(listen='127.0.0.1', port=0):
import _thread as t
global server
listeners = await trio.open_tcp_listeners(port, host=listen)
server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000)
await server.run()
t.interrupt_main()
if __name__ == '__main__':
try:
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
logger = logging.getLogger()
formatter = logger.handlers[0].formatter
history = History()
logger.addHandler(history)
level = logging.getLevelNamesMapping().get(args.log_level, 'INFO')
logger.setLevel(level)
logger.info('Initializing...')
handlers.setdefault('sync', lambda _: list(map(lambda x: formatter.format(x), history.truncate())))
thread = Thread(target=lambda: trio.run(backend), daemon=True)
thread.start()
logger.info('Creating automation instance')
opts = Options()
opts.enable_bidi = True
opts.enable_downloads = True
driver = WebDriver(options=opts)
status = main(driver)
except Exception as e:
logger.critical('Fatal error', exc_info=e)
status = 1
finally:
driver.quit()
exit(status)

View File

@@ -0,0 +1,4 @@
cd /D .\..\
set SE_PROXY=http://127.0.0.1:10809
.\venv\Scripts\pythonw.exe .\main.py "user@example.com" "example"
@pause

View File

@@ -0,0 +1,4 @@
cd /D .\..\
python -m venv venv
.\venv\Scripts\pip.exe install -r ./requirements.txt
@pause

BIN
requirements.txt Normal file

Binary file not shown.

BIN
template.xlsx Normal file

Binary file not shown.

Binary file not shown.