From 6597ced3cb9bf09f8bfc8584e58addf17fb44a9b Mon Sep 17 00:00:00 2001 From: break27 Date: Thu, 31 Jul 2025 14:33:18 +0800 Subject: [PATCH] update: created main loop & pivot to wxPython --- requirements.txt | Bin 936 -> 970 bytes 邮件批量发送脚本.py | 446 +++++++++++++++++++++++--------------------- 2 files changed, 232 insertions(+), 214 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9119b55734ad6bb27c0bfb13cac21daddb6497f4..11d18332861685b112b021d35aa29fc729741079 100644 GIT binary patch delta 42 ucmZ3%eu{m=3TCBph6;uNhDwGKh75*$hCBvaAT(jnV=w|@V+LLZE(QSP00;E| delta 7 OcmX@bzJh(j3T6NdegfP8 diff --git a/邮件批量发送脚本.py b/邮件批量发送脚本.py index 8e6ad47..9ca0369 100644 --- a/邮件批量发送脚本.py +++ b/邮件批量发送脚本.py @@ -2,6 +2,7 @@ import unicodedata import argparse import pandas import time +import wx from selenium import webdriver from selenium.common.exceptions import StaleElementReferenceException, TimeoutException @@ -11,20 +12,18 @@ from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement -from tkinter import Tk, filedialog from datetime import datetime, timedelta from zoneinfo import ZoneInfo from nameparser import HumanName parser = argparse.ArgumentParser(description="邮件批量发送脚本") -parser.add_argument('input', nargs='?') +parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') -parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier') -parser.add_argument('-a', '--address', type=str, required=True) -parser.add_argument('-p', '--password', type=str, required=True) +parser.add_argument('-a', '--address', type=str, nargs='?', default='') +parser.add_argument('-p', '--password', type=str, nargs='?', default='') parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) @@ -77,36 +76,18 @@ greetings = [ ] def main(): - if not args.input: - root = Tk() - root.withdraw() - args.input = filedialog.askopenfilename(defaultextension='xlsx') - - print(f'[信息] 正在读取数据:{args.input}') try: - workbook = pandas.read_excel(args.input) - data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') - recipients = data.get(args.column_address, []) - except Exception as e: - print(f'[!!!!] 读取数据表失败:{e}') - return 1 + print('[信息] 程序初始化中...') + app = wx.App(None) - limit = len(recipients) - names = data.get(args.column_name, [None] * limit) - codes = data.get(args.column_code, [None] * limit) - sents = data.setdefault(args.column_sent, [None] * limit) - - print(f'[信息] 已读取联系人信息共 {limit} 条') - if limit == 0: return 0 - - print('[信息] 正在启动 Chrome 自动化实例') - try: + print('[信息] 正在启动 Chrome 自动化实例') opts = webdriver.ChromeOptions() opts.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = webdriver.Chrome(opts) + driver.set_page_load_timeout(args.timeout) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') - return 2 + return 1 def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: for attempt in range(args.retry): @@ -176,18 +157,21 @@ def main(): try: click("#selectAll") except: pass - try: - print(f'[信息] 正在登陆 {args.address}') - username = locate("#username") - username.send_keys(args.address) - click("#button--with-loader") + if args.address and args.password: + try: + print(f'[信息] 正在登陆 {args.address}') + username = locate("#username") + username.send_keys(args.address) + click("#button--with-loader") - password = locate("#password") - password.send_keys(args.password) - click("#button--with-loader") - except Exception as e: - print(f'[!!!!] 登录时发生了错误:{e}') - return 4 + password = locate("#password") + password.send_keys(args.password) + click("#button--with-loader") + except Exception as e: + print(f'[!!!!] 登录时发生了错误:{e}') + return 4 + else: + print('[信息] 请在页面上输入账户凭据') while True: try: @@ -202,183 +186,217 @@ def main(): except: time.sleep(args.interval) - try: - # 打开草稿箱 - click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) - click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) - - # 打开邮件 - click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) - - subject = locate("h1.subject") - print(f'[信息] 已读取邮件:{subject.text}') - - sender = locate("header div.from") - print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') - except Exception as e: - print(f'[!!!!] 读取邮件时发生了错误:{e}') - return 5 - - rate = 60 / (args.interval + 3) - length = list.count(sents, None) - command = None - timezone = ZoneInfo(args.timezone) if args.timezone else None - - print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') - if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') - print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}') - print(f'[信息] 当前时区:{args.timezone or '无'}') - print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n') - - for index, item in enumerate(greetings): - print(f'\t[{index}] {item.get('locale')}', end=' ') - print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default'))) - - while command is None: - match input('\n[????] 请选择 (留空取消操作): ').strip(): - case keys if not keys: - command = -1 - case keys if keys.isdigit(): - number = int(keys) - if number < len(greetings): command = number - if command < 0: - print('[信息] 已取消发送') - exit() - - global date - date = datetime.now() - - index = 0 - active = True - occurrences = {} - - while active and index < limit: - global warnings - global errors - global sent - - attempt = 0 - current = index - index += 1 - - recipient = recipients[current] - name = names[current] - code = codes[current] - mark = sents[current] - - occurrence = occurrences.setdefault(code, [0]) if code else [0] - - if mark is not None and str(mark).strip(): - print(f'[信息] 已跳过项目 {recipient}') - occurrence[0] += 1 - continue - - if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence: - print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}') - warnings += 1 - continue - - while active: - try: - clean = True - attempt += 1 - print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) - click("button[aria-label='Edit copy']") - - ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) - locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) - - if command > 0 and (entry := greetings[command]): - match datetime.now(timezone).hour if timezone else -1: - case hour if 6 <= hour < 12: registry = entry.get('registry')[0] - case hour if 12 <= hour < 18: registry = entry.get('registry')[1] - case hour if 18 <= hour < 21: registry = entry.get('registry')[2] - case _: registry = None - - iframe = driver.switch_to.active_element - action = ActionChains(driver) - clean = False - hello = registry or entry.get('default') - - if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): - parts = HumanName(name) - parts.capitalize(force=True) - short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) - hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) - - hello += ',' - action.send_keys(hello).perform() - - if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): - target = items[0] - clean = target.text == hello - - driver.switch_to.default_content() - wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") - to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) - - # 填入收件人 - click(wrapper) - keyin(to, recipient) - - if to.get_attribute('value') != recipient: - print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...') - elif not clean: - print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...') - else: - # 发送邮件 - click("div.io-ox-mail-compose-window button[data-action='send']") - # 检测页面警告 - try: - wait = WebDriverWait(driver, timeout=args.interval) - alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) - - sents[current] = '❌' - message = alert.text.replace('\n', ' ') - print(f'[警告] ({attempt}): {message or '未知错误'}') - - # 关闭警告 - click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") - except TimeoutException: - sents[current] = '✔️' - occurrence[0] += 1 - sent += 1 - break - - while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): - try: - # 关闭过期邮件 - click("button[data-action='close']", parent=mails[0]) - # 删除过期邮件 - click("div.modal-footer button[data-action='delete']") - except: - continue - - if attempt < args.retry: - continue - else: - print('[警告] 已超出最大重试上限') - errors += 1 - break - except KeyboardInterrupt: - print('[信息] 程序中断') - active = False - break - except Exception as e: - print(f'[警告] 发生错误:{e}') - - key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ') - if key in ['R', 'r']: continue - elif key in ['S', 's']: break - else: active = False - - if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']: - print(f'[信息] 正在写入文件:{args.input}') + while True: try: - pandas.DataFrame.from_dict(data).to_excel(args.input, index=False, sheet_name='Sheet1') + print('[信息] 请选择数据源') + dialog = wx.FileDialog(None, 'Open', wildcard='*.xlsx', style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) + + if dialog.ShowModal() == wx.ID_OK: + filepath = dialog.GetPath() + print(f'[信息] 正在读取数据:{filepath}') + else: + print('[警告] 操作取消') + return 0 + + workbook = pandas.read_excel(filepath) + data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') + recipients = data.get(args.column_address, []) except Exception as e: - print(f'[警告] 写入文件时发生了错误:{e}') - return 6 + print(f'[警告] 读取数据表失败:{e}') + continue + finally: + dialog.Destroy() + + limit = len(recipients) + names = data.get(args.column_name, [None] * limit) + codes = data.get(args.column_code, [None] * limit) + sents = data.setdefault(args.column_sent, [None] * limit) + + print(f'[信息] 已读取联系人信息共 {limit} 条') + if limit == 0: continue + + try: + # 打开草稿箱 + click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) + click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) + + # 打开邮件 + click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) + + subject = locate("h1.subject") + print(f'[信息] 已读取邮件:{subject.text}') + + sender = locate("header div.from") + print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') + except Exception as e: + print(f'[!!!!] 读取邮件时发生了错误:{e}') + return 5 + + rate = 60 / (args.interval + 3) + length = list.count(sents, None) + command = None + timezone = ZoneInfo(args.timezone) if args.timezone else None + + print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') + if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') + print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}') + print(f'[信息] 当前时区:{args.timezone or '无'}') + print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n') + + for index, item in enumerate(greetings): + print(f'\t[{index}] {item.get('locale')}', end=' ') + print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default'))) + + while command is None: + match input('\n[????] 请选择 (留空取消操作): ').strip(): + case keys if not keys: + print('[信息] 操作取消') + return 0 + case keys if keys.isdigit(): + number = int(keys) + if number < len(greetings): command = number + + global date + date = datetime.now() + + index = 0 + active = True + occurrences = {} + + while active and index < limit: + global warnings + global errors + global sent + + attempt = 0 + current = index + index += 1 + + recipient = recipients[current] + name = names[current] + code = codes[current] + mark = sents[current] + + occurrence = occurrences.setdefault(code, [0]) if code else [0] + + if mark is not None and str(mark).strip(): + print(f'[信息] 已跳过项目 {recipient}') + occurrence[0] += 1 + continue + + if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence: + print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}') + warnings += 1 + continue + + while active: + try: + clean = True + attempt += 1 + print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) + click("button[aria-label='Edit copy']") + + ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) + locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) + + if command > 0 and (entry := greetings[command]): + match datetime.now(timezone).hour if timezone else -1: + case hour if 6 <= hour < 12: registry = entry.get('registry')[0] + case hour if 12 <= hour < 18: registry = entry.get('registry')[1] + case hour if 18 <= hour < 21: registry = entry.get('registry')[2] + case _: registry = None + + iframe = driver.switch_to.active_element + action = ActionChains(driver) + clean = False + hello = registry or entry.get('default') + + if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): + parts = HumanName(name) + parts.capitalize(force=True) + short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) + hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) + + hello += ',' + action.send_keys(hello).perform() + + if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): + target = items[0] + clean = target.text == hello + + driver.switch_to.default_content() + wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") + to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) + + # 填入收件人 + click(wrapper) + keyin(to, recipient) + + if to.get_attribute('value') != recipient: + print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...') + elif not clean: + print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...') + else: + # 发送邮件 + click("div.io-ox-mail-compose-window button[data-action='send']") + # 检测页面警告 + try: + wait = WebDriverWait(driver, timeout=args.interval) + alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) + + sents[current] = '❌' + message = alert.text.replace('\n', ' ') + print(f'[警告] ({attempt}): {message or '未知错误'}') + + # 关闭警告 + click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") + except TimeoutException: + sents[current] = '✔️' + occurrence[0] += 1 + sent += 1 + break + + while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): + try: + # 关闭过期邮件 + click("button[data-action='close']", parent=mails[0]) + # 删除过期邮件 + click("div.modal-footer button[data-action='delete']") + except: + continue + + if attempt < args.retry: + continue + else: + print('[警告] 已超出最大重试上限') + errors += 1 + break + except KeyboardInterrupt: + print('[信息] 程序中断') + active = False + break + except Exception as e: + print(f'[警告] 发生错误:{e}') + + key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ') + if key in ['R', 'r']: continue + elif key in ['S', 's']: break + else: active = False + + progress = index / limit * 100 + print('[信息] 当前进度:%.2f %%' % progress) + + key = input('[????] 继续运行 (C) / 保存并退出 (s) / 不保存退出 (w): ') + if key in ['W', 'w']: return 0 + elif key in ['S', 's']: break + else: continue + + print(f'[信息] 正在写入文件:{filepath}') + try: + pandas.DataFrame.from_dict(data).to_excel(filepath, index=False, sheet_name='Sheet1') + except Exception as e: + print(f'[警告] 写入文件时发生了错误:{e}') + return 6 return 0