"""Bulk e-mail sender that drives a webmail UI through Selenium.

The script logs in (automatically or manually), opens the first draft, reads
a list of recipients from an ``.xlsx`` workbook chosen through a wx file
dialog, and sends one copy of the draft per recipient, optionally prefixing a
localized greeting. Results are written back to the workbook on request.
"""

import argparse
import time
import unicodedata
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from zoneinfo import ZoneInfo

import pandas
import wx
from nameparser import HumanName
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

parser = argparse.ArgumentParser(description="邮件批量发送脚本")
parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier')
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人')
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')
parser.add_argument('--column-sent', type=str, nargs='?', default='已发送')
parser.add_argument('-a', '--address', type=str, nargs='?', default='')
parser.add_argument('-p', '--password', type=str, nargs='?', default='')
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=10)
parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=0)
parser.add_argument('-T', '--timezone', type=str, nargs='?', default='')
parser.add_argument('-r', '--retry', type=int, nargs='?', default=3)
args = parser.parse_args()

# Run-wide counters, reported by the launcher at the bottom of the file.
date = datetime.now()
sent = 0
errors = 0
warnings = 0

# Per-locale greetings. ``registry`` holds the morning / afternoon / evening
# variants; a ``None`` slot falls back to ``default``.
greetings = [
    {
        "locale": "en",
        "default": "Hello",
        "registry": ["Good morning", "Good afternoon", "Good evening"]
    },
    {
        "locale": "fr",
        "default": "Bonjour",
        "registry": [None, "Bon après-midi", "Bonsoir"]
    },
    {
        "locale": "de",
        "default": "Hallo",
        "registry": ["Guten Morgen", "Guten Tag", "Guten Abend"]
    },
    {
        "locale": "it",
        "default": "Ciao",
        "registry": ["Buongiorno", "Buon pomeriggio", "Buonasera"]
    },
    {
        "locale": "es",
        "default": "Hola",
        "registry": ["Buenos días", "Buenas tardes", None]
    },
    {
        "locale": "pt",
        "default": "Olá",
        "registry": ["Bom dia", "Boa tarde", None]
    }
]


def main(driver: WebDriver):
    """Run the interactive send workflow on an already started browser.

    Args:
        driver: Selenium driver used for every page interaction.

    Returns:
        ``0`` when the workflow ends normally or the user cancels the file
        dialog, ``4`` when the automatic login fails.
    """
    # Declared once up front; the counters are updated deep inside the loops.
    global date, sent, errors, warnings

    class Status(Enum):
        """State of the sending loop."""

        ACTIVE = 0
        INACTIVE = 1
        TERMINATED = 3

        def isalive(self):
            return self is not Status.TERMINATED

        def isactive(self):
            return self is Status.ACTIVE

    def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement:
        """Wait until ``condition`` holds for the CSS ``selector`` and return its result.

        Raises:
            Exception: on timeout, or when every retry hit a stale element.
        """
        for attempt in range(args.retry):
            try:
                wait = WebDriverWait(parent, timeout=args.timeout)
                return wait.until(condition((By.CSS_SELECTOR, selector)))
            except StaleElementReferenceException:
                # The element went stale while waiting: look it up again.
                continue
            except TimeoutException:
                # Timed out.
                raise Exception('操作超时')
        raise Exception(f'无法定位元素: {selector}')

    def click(selector, condition=EC.element_to_be_clickable, parent=driver):
        """Click an element and confirm that the click event actually fired.

        ``selector`` is either a CSS selector string or an already located
        element. A JS listener bumps a ``taximeter`` attribute on every click;
        the attribute is polled afterwards to verify the click was received.
        """
        element = locate(selector, condition, parent) if isinstance(selector, str) else selector

        def counter():
            return int(element.get_attribute('taximeter') or 0)

        error = False
        value = counter()
        driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)

        for attempt in range(args.retry):
            try:
                if not error:
                    element.click()
                else:
                    # The native click failed before: fall back to a JS click.
                    driver.execute_script("arguments[0].click();", element)
            except StaleElementReferenceException:
                break
            except Exception:
                # Was a bare ``except``, which also swallowed Ctrl+C.
                error = True
                continue

            # Check that the click event was observed.
            try:
                WebDriverWait(driver, args.interval).until(lambda _: counter() > value)
                break
            except TimeoutException:
                continue
            except Exception:
                break

    def ready(driver, predicate):
        """Wait for ``predicate`` to become truthy and then falsy again.

        Returns ``True`` immediately if the first wait fails (e.g. the
        condition never shows up within the timeout).
        """
        try:
            wait = WebDriverWait(driver, timeout=args.timeout)
            wait.until(predicate, '操作超时')
        except Exception:
            return True
        wait = WebDriverWait(driver, timeout=args.timeout)
        wait.until_not(predicate, '操作超时')
        return True

    def contains_non_latin_alphabet(string: str):
        """Return ``True`` if ``string`` has a digit or a letter outside the Latin script."""
        for char in string:
            if char.isdigit():
                return True
            is_letter = unicodedata.category(char).startswith('L')
            if is_letter and not unicodedata.name(char, '').startswith('LATIN'):
                return True
        return False

    try:
        driver.get(args.url)
    except TimeoutException:
        # Stop loading and carry on with whatever has been rendered.
        print('[警告] 操作超时')
        driver.execute_script("window.stop();")

    # Accept cookies (best effort: the banner may not be present).
    try:
        click("#selectAll")
    except Exception:
        pass

    if args.address and args.password:
        try:
            print(f'[信息] 正在登陆 {args.address}')
            username = locate("#username")
            username.send_keys(args.address)
            click("#button--with-loader")
            password = locate("#password")
            password.send_keys(args.password)
            click("#button--with-loader")
        except Exception as e:
            print(f'[!!!!] 登录时发生了错误:{e}')
            return 4
    else:
        print('[信息] 请在页面上输入账户凭据')

    # Poll until the mail application is present and its loader is hidden.
    while True:
        try:
            driver.find_element(By.CSS_SELECTOR, "#io-ox-core")
            loader = driver.find_element(By.CSS_SELECTOR, "#background-loader")
            if not loader.is_displayed():
                print('[信息] 已登录')
                break
        except Exception:
            # Not logged in yet. ``Exception`` (not bare) keeps Ctrl+C working.
            pass
        time.sleep(args.interval)

    while True:
        try:
            print('[信息] 正在打开草稿邮件')
            # Open the drafts folder.
            click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
            click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
            # Open the first mail in the list.
            click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
        except Exception as e:
            print(f'[警告] 打开草稿邮件时发生了错误:{e}')

        # Bound before the ``try`` so ``finally`` never sees an undefined name
        # when constructing the dialog itself fails.
        dialog = None
        try:
            print('[信息] 请选择数据源')
            title = 'Open (%s)' % args.address
            dialog = wx.FileDialog(None, title, wildcard='*.xlsx', style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST)

            if dialog.ShowModal() == wx.ID_OK:
                filepath = dialog.GetPath()
                print(f'[信息] 正在读取数据:{filepath}')
            else:
                print('[警告] 操作取消')
                return 0

            workbook = pandas.read_excel(filepath)
            # Cast to ``object`` first: on float columns ``where(..., None)``
            # would otherwise keep NaN, so empty cells would never be ``None``.
            data = workbook.astype(object).where(pandas.notnull(workbook), None).to_dict(orient='list')
            recipients = data.get(args.column_address, [])
            # "<name>.<locale>.xlsx": the second stem component picks the greeting locale.
            parts = Path(filepath).stem.split('.', 2)
            suffix = parts[1].lower() if len(parts) > 1 else None
        except Exception as e:
            print(f'[警告] 读取数据表失败:{e}')
            continue
        finally:
            if dialog is not None:
                dialog.Destroy()

        limit = len(recipients)
        names = data.get(args.column_name, [None] * limit)
        codes = data.get(args.column_code, [None] * limit)
        sents = data.setdefault(args.column_sent, [None] * limit)
        rate = 60 / (args.interval + 3)
        length = sents.count(None)
        timezone = ZoneInfo(args.timezone) if args.timezone else None

        print(f'[信息] 已读取联系人信息共 {limit} 条')
        print(f'[信息] 预计发送数量 {length}')
        print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')

        if rate > 8.33:
            print('[警告] 当前发送速率已超出限制 8.33 封/分钟')

        # Inner literals use double quotes so the f-strings also parse before
        # Python 3.12; the printed text is unchanged.
        print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}')
        print(f'[信息] 已设定允许重试次数:{args.retry}')
        print(f'[信息] 已设定最大重复次数:{args.max_occurrence or "无限制"}')
        print(f'[信息] 当前时区:{timezone or "无"}')

        locale = next((dict(entry) for entry in greetings if entry.get('locale') == suffix), None)
        print(f'[信息] 当前语言:{str(suffix).upper() if locale else "无"}')

        if input('[????] 开始发送?确定 (y) / 取消 (N): ') not in ['Y', 'y']:
            print('[信息] 操作取消')
            continue

        subject = locate("h1.subject").text.strip()
        # Drops the first 6 characters and any surrounding angle brackets.
        # NOTE(review): presumably a "From: "-style label - confirm against the UI.
        sender = locate("header div.from").text[6:].replace('\n', ' ').strip().lstrip('<').rstrip('>')

        print(f'[信息] 已读取邮件:{subject}')
        print(f'[信息] 指定发件人:{sender}')

        if sender.lower() != str(args.address).lower():
            print('[警告] 检测到发件人与设定不一致')
            print('[信息] 提示:请检查邮件是否正确')

        date = datetime.now()
        index = 0
        status = Status.ACTIVE
        # Customer code -> one-element list used as a mutable counter.
        occurrences = {}

        while status.isactive() and index < limit:
            attempt = 0
            current = index
            index += 1
            recipient = str(recipients[current]).strip()
            name = names[current]
            code = codes[current]
            mark = sents[current]
            occurrence = occurrences.setdefault(code, [0]) if code else [0]

            # Anything already present in the "sent" column means: skip the row.
            if mark is not None and str(mark).strip():
                print(f'[信息] 已跳过项目 {recipient}')
                occurrence[0] += 1
                continue

            if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence:
                print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}')
                warnings += 1
                continue

            while status.isactive():
                try:
                    clean = True
                    attempt += 1
                    print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
                    click("button[aria-label='Edit copy']")

                    if (target := locate("h1.subject").text.strip()) != subject:
                        print(f'[警告] 邮件主题与设定不一致:{target}')
                        if input('[????] 是否继续?确定 (y) / 取消 (N): ') in ['Y', 'y']:
                            subject = target
                            print('[信息] 已更新邮件主题设定')
                        else:
                            raise Exception('邮件主题意外变更')

                    ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
                    locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)

                    try:
                        if locale and (registry := locale.get('registry')):
                            # Pick the greeting for the recipient's local time of
                            # day; without a time zone the default is used.
                            hour = datetime.now(timezone).hour if timezone else -1
                            if 6 <= hour < 12:
                                hello = registry[0]
                            elif 12 <= hour < 18:
                                hello = registry[1]
                            elif 18 <= hour < 21:
                                hello = registry[2]
                            else:
                                hello = None

                            iframe = driver.switch_to.active_element
                            action = ActionChains(driver)
                            # Stays False until the typed greeting is found in the body.
                            clean = False
                            hello = hello or locale.get('default')

                            if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
                                parts = HumanName(name)
                                parts.capitalize(force=True)
                                # Very short first names (initials and the like)
                                # get the middle or last name appended.
                                short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
                                hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))

                            hello += ','
                            action.send_keys(hello).perform()

                            if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
                                target = items[0]
                                clean = target.text == hello
                    finally:
                        # Always leave the editor iframe, otherwise a failure in
                        # here would break every page-level lookup on retry.
                        driver.switch_to.default_content()

                    wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
                    to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)

                    # Fill in the recipient.
                    click(wrapper)
                    to.send_keys(recipient + Keys.ENTER)
                    token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token")
                    target = token.get_attribute('innerText').strip()

                    if target != recipient:
                        print(f'[警告] ({attempt}): 收件人地址不一致:{target}')
                    elif not clean:
                        print(f'[警告] ({attempt}): 邮件内容不正确')
                    else:
                        # Send the mail.
                        click("div.io-ox-mail-compose-window button[data-action='send']")

                        # Watch for an error alert; no alert within the
                        # interval counts as a successful send. Only the wait is
                        # guarded so a timeout raised while handling the alert
                        # is not mistaken for success.
                        try:
                            wait = WebDriverWait(driver, timeout=args.interval)
                            alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
                        except TimeoutException:
                            sents[current] = '✔️'
                            occurrence[0] += 1
                            sent += 1
                            break

                        sents[current] = '❌'
                        message = alert.text.replace('\n', ' ')
                        print(f'[警告] ({attempt}): {message or "未知错误"}')
                        # Close the alert.
                        click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")

                    # Close and delete every compose window left over from the
                    # failed attempt.
                    while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
                        try:
                            # Close the stale mail.
                            click("button[data-action='close']", parent=mails[0])
                            # Delete the stale mail.
                            click("div.modal-footer button[data-action='delete']")
                        except Exception as e:
                            print(f"[警告] 关闭邮件时发生了错误:{e}")

                    if attempt < args.retry:
                        continue
                    print('[警告] 已超出最大重试上限')
                    errors += 1
                    break
                except KeyboardInterrupt:
                    print('[信息] 程序中断')
                    status = Status.TERMINATED
                    break
                except Exception as e:
                    print(f'[警告] ({attempt}): 发生错误:{e}')
                    key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ')
                    if key in ['R', 'r']:
                        continue
                    elif key in ['S', 's']:
                        break
                    else:
                        status = Status.INACTIVE

        # ``limit`` is 0 when the address column is missing or empty.
        progress = index / limit * 100 if limit else 100.0
        print('[信息] 当前进度:%.2f %%' % progress)

        if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']:
            try:
                print(f'[信息] 正在写入文件:{filepath}')
                pandas.DataFrame.from_dict(data).to_excel(filepath, index=False, sheet_name='Sheet1')
            except Exception as e:
                print(f'[警告] 写入文件时发生了错误:{e}')

        if status.isalive() and input('[????] 继续运行 (c) / 退出程序 (W): ') in ['C', 'c']:
            continue
        else:
            break

    return 0


# Bound before the ``try`` so the cleanup below is safe even when wx or
# Chrome fails to start.
driver = None
try:
    print('[信息] 程序初始化中...')
    app = wx.App(None)
    print('[信息] 正在启动 Chrome 自动化实例')
    opts = ChromeOptions()
    opts.add_experimental_option("excludeSwitches", ["enable-logging"])
    driver = Chrome(opts)
    driver.set_page_load_timeout(args.timeout)
    status = main(driver)
except KeyboardInterrupt:
    print('[信息] 程序中断')
    status = 145
except Exception as e:
    print(f'[!!!!] 致命错误:{e}')
    status = 1
finally:
    if driver is not None:
        driver.quit()
    print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个')
    print(f'[信息] 总耗时 {str(datetime.now() - date)}')

# ``exit()`` is injected by ``site`` and may be missing (``python -S``, frozen
# builds); raising ``SystemExit`` always works.
raise SystemExit(status)