From 7df88078c934cb698b770acf4caae8fef5f48986 Mon Sep 17 00:00:00 2001 From: break27 Date: Mon, 7 Jul 2025 14:12:02 +0800 Subject: [PATCH] update: click feedback --- .gitignore | 3 +- 邮件批量发送脚本.py | 215 +++++++++++++++++++++++--------------------- 2 files changed, 117 insertions(+), 101 deletions(-) diff --git a/.gitignore b/.gitignore index 674a2d9..0c63e0d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .vscode/ .venv/ -*.csv \ No newline at end of file +*.csv +*.bat \ No newline at end of file diff --git a/邮件批量发送脚本.py b/邮件批量发送脚本.py index f9ef250..2042a4e 100644 --- a/邮件批量发送脚本.py +++ b/邮件批量发送脚本.py @@ -8,20 +8,21 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait -from datetime import datetime -from datetime import timedelta +from tkinter import Tk, filedialog +from datetime import datetime, timedelta parser = argparse.ArgumentParser(description="邮件批量发送脚本") -parser.add_argument('datasheet') +parser.add_argument('input', nargs='?') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') -parser.add_argument('--address', type=str, required=True) -parser.add_argument('--password', type=str, required=True) -parser.add_argument('--encoding', type=str, nargs='?', default='utf-8') -parser.add_argument('--timeout', type=int, nargs='?', default=60) -parser.add_argument('--interval', type=int, nargs='?', default=5) -parser.add_argument('--rate-limit', type=int, nargs='?', default=8.33) -parser.add_argument('--max-occurrence', type=int, nargs='?', default=0) +parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier') +parser.add_argument('-a', '--address', type=str, required=True) +parser.add_argument('-p', '--password', type=str, required=True) +parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8') +parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) +parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) +parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) +parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) args = parser.parse_args() date = datetime.now() @@ -31,73 +32,113 @@ errors = 0 warnings = 0 def main(): - # 读取收件人列表 - print(f'[信息] 正在读取数据表:{args.datasheet}') + if not args.input: + root = Tk() + root.withdraw() + args.input = filedialog.askopenfilename(defaultextension='csv') + + print(f'[信息] 正在读取数据:{args.input}') try: - with open(args.datasheet, 'r', encoding=args.encoding) as file: + with open(args.input, 'r', encoding=args.encoding) as file: rows = csv.DictReader(file) data = list(rows) - - rate = 60 / (args.interval + 3) - limit = len(data) - print(f'[信息] 已读取联系人信息共 {limit} 条') except Exception as e: print(f'[!!!!] 读取数据表失败:{e}') return 1 - try: - print('[信息] 正在启动 Chrome 自动化实例') - driver = webdriver.Chrome() + rate = 60 / (args.interval + 3) + limit = len(data) + print(f'[信息] 已读取联系人信息共 {limit} 条') - print('[信息] 正在载入网页') - driver.get('https://id.ionos.fr/identifier') - except TimeoutException: - # 停止加载 - print(f'[警告] 操作超时') - driver.execute_script("window.stop();") + print('[信息] 正在启动 Chrome 自动化实例') + try: + opts = webdriver.ChromeOptions() + opts.add_experimental_option("excludeSwitches", ["enable-logging"]) + driver = webdriver.Chrome(opts) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') return 2 + def locate(selector, condition=EC.presence_of_element_located): + while True: + try: + wait = WebDriverWait(driver, timeout=args.timeout) + return wait.until(condition((By.CSS_SELECTOR, selector))) + except StaleElementReferenceException: + # 如果遇到过期元素,重新尝试查找 + continue + except TimeoutException: + # 超时错误 + raise Exception('操作超时') + + def click(selector, condition=EC.element_to_be_clickable): + element = locate(selector, condition) if isinstance(selector, str) else selector + counter = lambda: int(element.get_attribute('taximeter') or 0) + error = False + + value = counter() + driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) + + for attempt in range(args.retry): + try: + if not error: element.click() + else: driver.execute_script("arguments[0].click();", element) + except StaleElementReferenceException: + break + except: + error = True + continue + # 检测点击事件 + try: + WebDriverWait(driver, args.interval).until(lambda _: counter() > value) + break + except TimeoutException: continue + except: break + try: - selectAll = locate(driver, EC.element_to_be_clickable, (By.ID, 'selectAll')) - selectAll.click() - except: - pass + driver.get(args.url) + except TimeoutException: + # 停止加载 + print(f'[警告] 操作超时') + driver.execute_script("window.stop();") + + # 接受 cookie + try: click("#selectAll") + except: pass try: print(f'[信息] 正在登陆 {args.address}') - username = locate(driver, EC.element_to_be_clickable, (By.ID, 'username')) + username = locate("#username") username.send_keys(args.address) + click("#button--with-loader") - submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader')) - submit.click() - - password = locate(driver, EC.element_to_be_clickable, (By.ID, 'password')) + password = locate("#password") password.send_keys(args.password) - - submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader')) - submit.click() + click("#button--with-loader") except Exception as e: print(f'[!!!!] 登录时发生了错误:{e}') return 4 + while True: + try: + driver.find_element(By.CSS_SELECTOR, "#io-ox-core") + print(f'[信息] 已登录') + break + except: + time.sleep(args.interval) + try: - drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-id='default0/Brouillons']")) - driver.execute_script('arguments[0].click();', drafts) - - drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "button[data-id='default0/Brouillons']")) - driver.execute_script('arguments[0].click();', drafts) + # 打开草稿箱 + click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) + click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) - items = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "ul[aria-label='List view']")) - mail = locate(items, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-index='0']")) - mail.click() + # 打开邮件 + click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) - subject = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "h1.subject")) + subject = locate("h1.subject") print(f'[信息] 已读取邮件:{subject.text}') - header = locate(driver, EC.presence_of_element_located, (By.TAG_NAME, 'header')) - sender = header.find_element(By.CSS_SELECTOR, "div.from") + sender = locate("header div.from") print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') except Exception as e: print(f'[!!!!] 读取邮件时发生了错误:{e}') @@ -106,10 +147,7 @@ def main(): print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}') - if rate > args.rate_limit: - print(f'[警告] 已设置速率限制 {round(args.rate_limit, 2)}') - print('[警告] 当前发送速率已超出限制') - + if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ') if key == 'y' or key == 'Y': @@ -142,36 +180,37 @@ def main(): try: print(f'[信息] 正在发送:{recipient}') - - edit = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[aria-label='Edit copy']")) - edit.click() - - draft = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.io-ox-mail-compose-window")) - title = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.floating-header")) + click("button[aria-label='Edit copy']") # 等待页面加载 - locate(driver, EC.invisibility_of_element_located, (By.CSS_SELECTOR, ".io-ox-busy")) - time.sleep(1) - title.click() + wait = WebDriverWait(driver, timeout=args.timeout) + wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) - to = draft.find_element(By.CSS_SELECTOR, "input.token-input.tt-input[tabindex='0']") + # 点击活动窗口 + click("div.io-ox-mail-compose-window div.floating-header") + + # 填入收件人 + to = locate("div.io-ox-mail-compose-window input.token-input.tt-input[tabindex='0']") to.send_keys(recipient) - send = draft.find_element(By.CSS_SELECTOR, "button[data-action='send']") - send.click() + # 发送邮件 + click("div.io-ox-mail-compose-window button[data-action='send']") global sent sent += 1 except Exception as e: - print(f'[!!!!] 发生错误:{e}') - return 6 + print(f'[警告] 发生错误:{e}') + key = input('[????] 是否继续?确定 (Y) / 取消 (N): ') - # 等待邮件发送 - time.sleep(args.interval) + if key == 'y' or key == 'Y': continue + else: return 6 # 检测页面警告 - try: alert = driver.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error") - except: continue + try: + wait = WebDriverWait(driver, timeout=args.interval) + alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) + except: + continue message = alert.text.replace('\n', ' ') print(f'[警告] 来自网页:{message}') @@ -183,38 +222,14 @@ def main(): if occurrence > 1: occurrence -= 1 # 关闭警告 - button = locate(alert, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']")) - button.click() - - # 获取页面上的过期邮件 - stale = driver.find_element(By.CSS_SELECTOR, "div.io-ox-mail-compose-window") - + click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") # 关闭过期邮件 - button = locate(stale, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']")) - button.click() - + click("div.io-ox-mail-compose-window button[data-action='close']") # 删除过期邮件 - modal = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.modal-footer")) - button = locate(modal, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='delete']")) - button.click() + click("div.modal-footer button[data-action='delete']") -def locate(driver, condition, locator): - while True: - try: - wait = WebDriverWait(driver, timeout=args.timeout) - return wait.until(condition(locator)) - except StaleElementReferenceException: - # 如果遇到过期元素,重新尝试查找 - continue - except TimeoutException: - # 超时错误 - raise Exception('操作超时') - -try: - status = main() -except KeyboardInterrupt: - print('[信息] 程序中断') - status = 145 +try: status = main() +except KeyboardInterrupt: status = 145 print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') print(f'[信息] 总耗时 {str(datetime.now() - date)}')