update: click feedback

This commit is contained in:
2025-07-07 14:12:02 +08:00
parent 2af1640f31
commit 7df88078c9
2 changed files with 117 additions and 101 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.vscode/ .vscode/
.venv/ .venv/
*.csv *.csv
*.bat

View File

@@ -8,20 +8,21 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait
from datetime import datetime from tkinter import Tk, filedialog
from datetime import timedelta from datetime import datetime, timedelta
parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser = argparse.ArgumentParser(description="邮件批量发送脚本")
parser.add_argument('datasheet') parser.add_argument('input', nargs='?')
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')
parser.add_argument('--address', type=str, required=True) parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier')
parser.add_argument('--password', type=str, required=True) parser.add_argument('-a', '--address', type=str, required=True)
parser.add_argument('--encoding', type=str, nargs='?', default='utf-8') parser.add_argument('-p', '--password', type=str, required=True)
parser.add_argument('--timeout', type=int, nargs='?', default=60) parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('--interval', type=int, nargs='?', default=5) parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('--rate-limit', type=int, nargs='?', default=8.33) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10)
parser.add_argument('--max-occurrence', type=int, nargs='?', default=0) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5)
parser.add_argument('-r', '--retry', type=int, nargs='?', default=3)
args = parser.parse_args() args = parser.parse_args()
date = datetime.now() date = datetime.now()
@@ -31,73 +32,113 @@ errors = 0
warnings = 0 warnings = 0
def main(): def main():
# 读取收件人列表 if not args.input:
print(f'[信息] 正在读取数据表:{args.datasheet}') root = Tk()
root.withdraw()
args.input = filedialog.askopenfilename(defaultextension='csv')
print(f'[信息] 正在读取数据:{args.input}')
try: try:
with open(args.datasheet, 'r', encoding=args.encoding) as file: with open(args.input, 'r', encoding=args.encoding) as file:
rows = csv.DictReader(file) rows = csv.DictReader(file)
data = list(rows) data = list(rows)
rate = 60 / (args.interval + 3)
limit = len(data)
print(f'[信息] 已读取联系人信息共 {limit}')
except Exception as e: except Exception as e:
print(f'[!!!!] 读取数据表失败:{e}') print(f'[!!!!] 读取数据表失败:{e}')
return 1 return 1
try: rate = 60 / (args.interval + 3)
print('[信息] 正在启动 Chrome 自动化实例') limit = len(data)
driver = webdriver.Chrome() print(f'[信息] 已读取联系人信息共 {limit}')
print('[信息] 正在载入网页') print('[信息] 正在启动 Chrome 自动化实例')
driver.get('https://id.ionos.fr/identifier') try:
except TimeoutException: opts = webdriver.ChromeOptions()
# 停止加载 opts.add_experimental_option("excludeSwitches", ["enable-logging"])
print(f'[警告] 操作超时') driver = webdriver.Chrome(opts)
driver.execute_script("window.stop();")
except Exception as e: except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}') print(f'[!!!!] 初始化时发生了错误:{e}')
return 2 return 2
def locate(selector, condition=EC.presence_of_element_located):
while True:
try:
wait = WebDriverWait(driver, timeout=args.timeout)
return wait.until(condition((By.CSS_SELECTOR, selector)))
except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找
continue
except TimeoutException:
# 超时错误
raise Exception('操作超时')
def click(selector, condition=EC.element_to_be_clickable):
element = locate(selector, condition) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute('taximeter') or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
for attempt in range(args.retry):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
# 检测点击事件
try:
WebDriverWait(driver, args.interval).until(lambda _: counter() > value)
break
except TimeoutException: continue
except: break
try: try:
selectAll = locate(driver, EC.element_to_be_clickable, (By.ID, 'selectAll')) driver.get(args.url)
selectAll.click() except TimeoutException:
except: # 停止加载
pass print(f'[警告] 操作超时')
driver.execute_script("window.stop();")
# 接受 cookie
try: click("#selectAll")
except: pass
try: try:
print(f'[信息] 正在登陆 {args.address}') print(f'[信息] 正在登陆 {args.address}')
username = locate(driver, EC.element_to_be_clickable, (By.ID, 'username')) username = locate("#username")
username.send_keys(args.address) username.send_keys(args.address)
click("#button--with-loader")
submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader')) password = locate("#password")
submit.click()
password = locate(driver, EC.element_to_be_clickable, (By.ID, 'password'))
password.send_keys(args.password) password.send_keys(args.password)
click("#button--with-loader")
submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader'))
submit.click()
except Exception as e: except Exception as e:
print(f'[!!!!] 登录时发生了错误:{e}') print(f'[!!!!] 登录时发生了错误:{e}')
return 4 return 4
while True:
try:
driver.find_element(By.CSS_SELECTOR, "#io-ox-core")
print(f'[信息] 已登录')
break
except:
time.sleep(args.interval)
try: try:
drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-id='default0/Brouillons']")) # 打开草稿箱
driver.execute_script('arguments[0].click();', drafts) click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "button[data-id='default0/Brouillons']")) # 打开邮件
driver.execute_script('arguments[0].click();', drafts) click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
items = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "ul[aria-label='List view']")) subject = locate("h1.subject")
mail = locate(items, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-index='0']"))
mail.click()
subject = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "h1.subject"))
print(f'[信息] 已读取邮件:{subject.text}') print(f'[信息] 已读取邮件:{subject.text}')
header = locate(driver, EC.presence_of_element_located, (By.TAG_NAME, 'header')) sender = locate("header div.from")
sender = header.find_element(By.CSS_SELECTOR, "div.from")
print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}')
except Exception as e: except Exception as e:
print(f'[!!!!] 读取邮件时发生了错误:{e}') print(f'[!!!!] 读取邮件时发生了错误:{e}')
@@ -106,10 +147,7 @@ def main():
print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}') print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}')
if rate > args.rate_limit: if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟')
print(f'[警告] 已设置速率限制 {round(args.rate_limit, 2)}')
print('[警告] 当前发送速率已超出限制')
key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ') key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ')
if key == 'y' or key == 'Y': if key == 'y' or key == 'Y':
@@ -142,36 +180,37 @@ def main():
try: try:
print(f'[信息] 正在发送:{recipient}') print(f'[信息] 正在发送:{recipient}')
click("button[aria-label='Edit copy']")
edit = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[aria-label='Edit copy']"))
edit.click()
draft = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.io-ox-mail-compose-window"))
title = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.floating-header"))
# 等待页面加载 # 等待页面加载
locate(driver, EC.invisibility_of_element_located, (By.CSS_SELECTOR, ".io-ox-busy")) wait = WebDriverWait(driver, timeout=args.timeout)
time.sleep(1) wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
title.click()
to = draft.find_element(By.CSS_SELECTOR, "input.token-input.tt-input[tabindex='0']") # 点击活动窗口
click("div.io-ox-mail-compose-window div.floating-header")
# 填入收件人
to = locate("div.io-ox-mail-compose-window input.token-input.tt-input[tabindex='0']")
to.send_keys(recipient) to.send_keys(recipient)
send = draft.find_element(By.CSS_SELECTOR, "button[data-action='send']") # 发送邮件
send.click() click("div.io-ox-mail-compose-window button[data-action='send']")
global sent global sent
sent += 1 sent += 1
except Exception as e: except Exception as e:
print(f'[!!!!] 发生错误:{e}') print(f'[警告] 发生错误:{e}')
return 6 key = input('[????] 是否继续?确定 (Y) / 取消 (N): ')
# 等待邮件发送 if key == 'y' or key == 'Y': continue
time.sleep(args.interval) else: return 6
# 检测页面警告 # 检测页面警告
try: alert = driver.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error") try:
except: continue wait = WebDriverWait(driver, timeout=args.interval)
alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
except:
continue
message = alert.text.replace('\n', ' ') message = alert.text.replace('\n', ' ')
print(f'[警告] 来自网页:{message}') print(f'[警告] 来自网页:{message}')
@@ -183,38 +222,14 @@ def main():
if occurrence > 1: occurrence -= 1 if occurrence > 1: occurrence -= 1
# 关闭警告 # 关闭警告
button = locate(alert, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']")) click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
button.click()
# 获取页面上的过期邮件
stale = driver.find_element(By.CSS_SELECTOR, "div.io-ox-mail-compose-window")
# 关闭过期邮件 # 关闭过期邮件
button = locate(stale, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']")) click("div.io-ox-mail-compose-window button[data-action='close']")
button.click()
# 删除过期邮件 # 删除过期邮件
modal = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.modal-footer")) click("div.modal-footer button[data-action='delete']")
button = locate(modal, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='delete']"))
button.click()
def locate(driver, condition, locator): try: status = main()
while True: except KeyboardInterrupt: status = 145
try:
wait = WebDriverWait(driver, timeout=args.timeout)
return wait.until(condition(locator))
except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找
continue
except TimeoutException:
# 超时错误
raise Exception('操作超时')
try:
status = main()
except KeyboardInterrupt:
print('[信息] 程序中断')
status = 145
print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings}') print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings}')
print(f'[信息] 总耗时 {str(datetime.now() - date)}') print(f'[信息] 总耗时 {str(datetime.now() - date)}')