import unicodedata import argparse import pandas import time import wx from selenium import webdriver from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from datetime import datetime, timedelta from zoneinfo import ZoneInfo from nameparser import HumanName parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') parser.add_argument('-a', '--address', type=str, nargs='?', default='') parser.add_argument('-p', '--password', type=str, nargs='?', default='') parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) parser.add_argument('-T', '--timezone', type=str, nargs='?', default='') parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) args = parser.parse_args() date = datetime.now() sent = 0 errors = 0 warnings = 0 greetings = [ { "locale": "**", "default": "不使用问候语", "registry": [] }, { "locale": "en", "default": "Hello", "registry": ["Good morning", "Good afternoon", "Good evening"] }, { "locale": "fr", "default": "Bonjour", "registry": [None, "Bon après-midi", "Bonsoir"] }, { "locale": "de", "default": "Hallo", "registry": ["Guten Morgen", "Guten Tag", "Guten Abend"] }, { "locale": "it", "default": "Ciao", "registry": ["Buongiorno", "Buon pomeriggio", "Buonasera"] }, { "locale": "es", "default": "Hola", "registry": ["Buenos días", "Buenas tardes", None] }, { "locale": "pt", "default": "Olá", "registry": ["Bom dia", "Boa tarde", None] } ] def main(): try: print('[信息] 程序初始化中...') app = wx.App(None) print('[信息] 正在启动 Chrome 自动化实例') opts = webdriver.ChromeOptions() opts.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = webdriver.Chrome(opts) driver.set_page_load_timeout(args.timeout) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') return 1 def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: for attempt in range(args.retry): try: wait = WebDriverWait(parent, timeout=args.timeout) return wait.until(condition((By.CSS_SELECTOR, selector))) except StaleElementReferenceException: # 如果遇到过期元素,重新尝试查找 continue except TimeoutException: # 超时错误 raise Exception('操作超时') raise Exception(f'无法定位元素: {selector}') def click(selector, condition=EC.element_to_be_clickable, parent=driver): element = locate(selector, condition, parent) if isinstance(selector, str) else selector counter = lambda: int(element.get_attribute('taximeter') or 0) error = False value = counter() driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) for attempt in range(args.retry): try: if not error: element.click() else: driver.execute_script("arguments[0].click();", element) except StaleElementReferenceException: break except: error = True continue # 检测点击事件 try: WebDriverWait(driver, args.interval).until(lambda _: counter() > value) break except TimeoutException: continue except: break def keyin(element: WebElement, value): try: element.send_keys(value) except: driver.execute_script(f"arguments[0].value = arguments[1];", element, value) def ready(driver, predicate): try: wait = WebDriverWait(driver, timeout=args.timeout) wait.until(predicate, '操作超时') except: return True wait = WebDriverWait(driver, timeout=args.timeout) wait.until_not(predicate, '操作超时') return True def contains_non_latin_alphabet(string: str): for char in string: if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')): return True return False try: driver.get(args.url) except TimeoutException: # 停止加载 print(f'[警告] 操作超时') driver.execute_script("window.stop();") # 接受 cookie try: click("#selectAll") except: pass if args.address and args.password: try: print(f'[信息] 正在登陆 {args.address}') username = locate("#username") username.send_keys(args.address) click("#button--with-loader") password = locate("#password") password.send_keys(args.password) click("#button--with-loader") except Exception as e: print(f'[!!!!] 登录时发生了错误:{e}') return 4 else: print('[信息] 请在页面上输入账户凭据') while True: try: driver.find_element(By.CSS_SELECTOR, "#io-ox-core") loader = driver.find_element(By.CSS_SELECTOR, "#background-loader") if not loader.is_displayed(): print(f'[信息] 已登录') break else: raise Exception() except: time.sleep(args.interval) while True: try: print('[信息] 请选择数据源') dialog = wx.FileDialog(None, 'Open', wildcard='*.xlsx', style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) if dialog.ShowModal() == wx.ID_OK: filepath = dialog.GetPath() print(f'[信息] 正在读取数据:{filepath}') else: print('[警告] 操作取消') return 0 workbook = pandas.read_excel(filepath) data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') recipients = data.get(args.column_address, []) except Exception as e: print(f'[警告] 读取数据表失败:{e}') continue finally: dialog.Destroy() limit = len(recipients) names = data.get(args.column_name, [None] * limit) codes = data.get(args.column_code, [None] * limit) sents = data.setdefault(args.column_sent, [None] * limit) print(f'[信息] 已读取联系人信息共 {limit} 条') if limit == 0: continue try: # 打开草稿箱 click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) # 打开邮件 click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) subject = locate("h1.subject") print(f'[信息] 已读取邮件:{subject.text}') sender = locate("header div.from") print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') except Exception as e: print(f'[!!!!] 读取邮件时发生了错误:{e}') return 5 rate = 60 / (args.interval + 3) length = list.count(sents, None) command = None timezone = ZoneInfo(args.timezone) if args.timezone else None print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}') print(f'[信息] 当前时区:{args.timezone or '无'}') print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n') for index, item in enumerate(greetings): print(f'\t[{index}] {item.get('locale')}', end=' ') print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default'))) while command is None: match input('\n[????] 请选择 (留空取消操作): ').strip(): case keys if not keys: print('[信息] 操作取消') return 0 case keys if keys.isdigit(): number = int(keys) if number < len(greetings): command = number global date date = datetime.now() index = 0 active = True occurrences = {} while active and index < limit: global warnings global errors global sent attempt = 0 current = index index += 1 recipient = str(recipients[current]).strip() name = names[current] code = codes[current] mark = sents[current] occurrence = occurrences.setdefault(code, [0]) if code else [0] if mark is not None and str(mark).strip(): print(f'[信息] 已跳过项目 {recipient}') occurrence[0] += 1 continue if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence: print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}') warnings += 1 continue while active: try: clean = True attempt += 1 print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) click("button[aria-label='Edit copy']") ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) if command > 0 and (entry := greetings[command]): match datetime.now(timezone).hour if timezone else -1: case hour if 6 <= hour < 12: registry = entry.get('registry')[0] case hour if 12 <= hour < 18: registry = entry.get('registry')[1] case hour if 18 <= hour < 21: registry = entry.get('registry')[2] case _: registry = None iframe = driver.switch_to.active_element action = ActionChains(driver) clean = False hello = registry or entry.get('default') if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): parts = HumanName(name) parts.capitalize(force=True) short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) hello += ',' action.send_keys(hello).perform() if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): target = items[0] clean = target.text == hello driver.switch_to.default_content() wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) # 填入收件人 click(wrapper) keyin(to, recipient) if to.get_attribute('value') != recipient: print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...') elif not clean: print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...') else: # 发送邮件 click("div.io-ox-mail-compose-window button[data-action='send']") # 检测页面警告 try: wait = WebDriverWait(driver, timeout=args.interval) alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) sents[current] = '❌' message = alert.text.replace('\n', ' ') print(f'[警告] ({attempt}): {message or '未知错误'}') # 关闭警告 click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") except TimeoutException: sents[current] = '✔️' occurrence[0] += 1 sent += 1 break while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): try: # 关闭过期邮件 click("button[data-action='close']", parent=mails[0]) # 删除过期邮件 click("div.modal-footer button[data-action='delete']") except: continue if attempt < args.retry: continue else: print('[警告] 已超出最大重试上限') errors += 1 break except KeyboardInterrupt: print('[信息] 程序中断') active = False break except Exception as e: print(f'[警告] 发生错误:{e}') key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ') if key in ['R', 'r']: continue elif key in ['S', 's']: break else: active = False progress = index / limit * 100 print('[信息] 当前进度:%.2f %%' % progress) if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']: try: print(f'[信息] 正在写入文件:{filepath}') pandas.DataFrame.from_dict(data).to_excel(filepath, index=False, sheet_name='Sheet1') except Exception as e: print(f'[警告] 写入文件时发生了错误:{e}') if input('[????] 继续运行 (C) / 退出程序 (w): ') in ['W', 'w']: break else: continue return 0 try: status = main() except KeyboardInterrupt: status = 145 print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') print(f'[信息] 总耗时 {str(datetime.now() - date)}') exit(status)