import unicodedata import traceback import argparse import logging import pandas import json import time import re import trio import trio_websocket as ws from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver import Chrome, ChromeOptions from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.remote.webdriver import WebDriver from io import BytesIO from enum import Enum from queue import Queue, Empty from wakepy import keep from pathlib import Path from datetime import datetime, timedelta from zoneinfo import ZoneInfo from threading import Thread from nameparser import HumanName parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-pays', type=str, nargs='?', default='国家地区') parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') parser.add_argument('-a', '--address', type=str, nargs='?') parser.add_argument('-p', '--password', type=str, nargs='?') parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) args = parser.parse_args() inbox, outbox = Queue(), Queue() connection = None socket = None sent = 0 errors = 0 warnings = 0 class Greetings: def __init__(self, locale: str, timezone: str, default: str, morning=None, afternoon=None, evening=None, predicate=None): self.locale = locale self.timezone = timezone self.default = default self.morning = morning self.afternoon = afternoon self.evening = evening self.predicate = predicate @staticmethod def presets(): return [ Greetings("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", None), Greetings("fr", "Europe/Paris" , "Bonjour", None , "Bon après-midi" , "Bonsoir" , ['法国', '比利时']), Greetings("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']), Greetings("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']), Greetings("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']), Greetings("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']), ] class Command: def __init__(self, name: str, *args): self.name = name self.args = args def __str__(self): pack = vars(self) return json.dumps(pack) def tell(message, exception=None, level=2): message = ': '.join(filter(None, [message, exception.__class__.__name__ if isinstance(exception, Exception) else exception])) if isinstance(exception, Exception): message += '\n' + ''.join(traceback.format_exception(exception)) if not outbox.is_shutdown: outbox.put(Command('tell', message, level)) match level: case 2: logging.info(message) case 1: logging.warning(message) case 0: logging.error(message) def main(driver: WebDriver): try: html = str(Path('index.html').resolve()) driver.get(html) host, port = socket.getsockname() parameters = vars(args) locales = json.dumps(Greetings.presets(), default=lambda o: o.__dict__) driver.execute_script(f"main(...arguments);", f'ws://{host}:{port}', parameters, locales) except Exception as e: tell('载入初始页面时发生错误', e, level=0) return 1 def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: for attempt in range(parameters.get('retry')): try: wait = WebDriverWait(parent, timeout=parameters.get('timeout')) return wait.until(condition((By.CSS_SELECTOR, selector))) except StaleElementReferenceException: # 如果遇到过期元素,重新尝试查找 continue except TimeoutException: # 超时错误 raise Exception('操作超时') raise Exception(f'无法定位元素: {selector}') def click(selector, condition=EC.element_to_be_clickable, parent=driver): element = locate(selector, condition, parent) if isinstance(selector, str) else selector counter = lambda: int(element.get_attribute('taximeter') or 0) error = False value = counter() driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) for attempt in range(parameters.get("retry")): try: if not error: element.click() else: driver.execute_script("arguments[0].click();", element) except StaleElementReferenceException: break except: error = True continue # 检测点击事件 try: WebDriverWait(driver, parameters.get("interval")).until(lambda _: counter() > value) break except TimeoutException: continue except: break def ready(driver, predicate=lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")): try: wait = WebDriverWait(driver, timeout=parameters.get('interval')) wait.until(predicate, '操作超时') except: return True wait = WebDriverWait(driver, timeout=parameters.get('timeout')) wait.until_not(predicate, '操作超时') return True def contains_non_latin_alphabet(string: str): for char in string: if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')): return True return False try: driver.switch_to.new_window('tab') driver.set_page_load_timeout(parameters.get('timeout')) driver.get(parameters.get('url')) except TimeoutException: # 停止加载 tell('操作超时', level=1) driver.execute_script("window.stop();") # 接受 cookie try: click("#selectAll") except: pass if (address := parameters.get('address')) and (pw := parameters.get('password')): try: username = locate("#username") username.send_keys(address) click("#button--with-loader") password = locate("#password") password.send_keys(pw) click("#button--with-loader") except Exception as e: tell('登录时发生了错误', e, level=0) return 4 while not time.sleep(1): try: if not driver.find_element(By.CSS_SELECTOR, "#background-loader").is_displayed(): tell('正在登陆') click("#io-ox-topbar-account-dropdown-icon button.dropdown-toggle", condition=EC.presence_of_element_located) address = locate("#topbar-account-dropdown .mail").get_attribute('innerText') person = HumanName(address.split('@', 2)[0]) person.capitalize(force=True) tell(f'成功登录 {person.first} ({address})') break except: continue try: condition = EC.presence_of_element_located # 打开草稿箱 click("li[data-id='default0/Brouillons']", condition) click("button[data-id='default0/Brouillons']", condition) # 打开邮件 click("ul[aria-label='List view'] li[data-index='0']", condition) except Exception as e: tell('打开草稿邮件时发生错误', e, level=1) pass class Faillable(Exception): def __init__(self, request: str): self.request = request super().__init__(self.request) class Status(Enum): ACTIVE = 0 INACTIVE = 1 TERMINATED = 3 def isalive(self): return self != Status.TERMINATED def isactive(self): return self == Status.ACTIVE def get_subject(): try: element = driver.find_element(By.CSS_SELECTOR, "h1.subject") subject = element.text.strip() return subject except: return None def get_address(): try: element = driver.find_element(By.CSS_SELECTOR, "header div.from") address = re.search(r'[^<\s]+@[^>\s]+', element.text[6:])[0] return address except: return None def get_request(): try: request = inbox.get(block=False) except Empty: return None if request == 'ONHOLD': outbox.put(Command('setStatus', 'ONHOLD')) request = inbox.get() if request == 'RESUME': outbox.put(Command('setStatus', 'RESUME')) return None if request in ['BYPASS', 'CANCEL']: outbox.put(Command('setStatus', request)) raise Faillable(request) return request while True: try: outbox.put(Command('setAddress', person.first, address)) driver.switch_to.window(driver.window_handles[0]) filename = str(inbox.get()) buffer = BytesIO(inbox.get()) workbook = pandas.read_excel(buffer, sheet_name=0) frame = workbook.where(pandas.notnull(workbook), None) limit = int(frame.last_valid_index()) driver.switch_to.window(driver.window_handles[1]) columns = { column: frame[column].unique() for column in frame.columns } for k, v in columns.items(): columns[k] = { str(unique): len(frame[frame[k] == unique]) for unique in v if unique is not None } if len(v) <= 200 else {} outbox.put(Command('setMetadata', limit, columns)) while not time.sleep(1): request = get_request() subject = get_subject() outbox.put(Command('setSubject', subject)) if request is not None: parameters = dict(json.loads(request)) break except Faillable: continue except Exception as e: tell('读取数据时发生错误', e, level=0) return 5 if parameters.get('slice'): if subcategory := parameters.get('subcategory'): column = parameters.get('column_pays') frame = frame[frame[column].isin(subcategory)] offset = parameters.get('offset') size = parameters.get('chunk_size') start = size * offset end = start + size frame = frame.iloc[start:end] data = frame.to_dict(orient='list') recipients = data.get(parameters.get('column_address'), []) limit = len(recipients) names = data.get(parameters.get('column_name'), [None] * limit) codes = data.get(parameters.get('column_code'), [None] * limit) sents = data.setdefault(parameters.get('column_sent'), [None] * limit) rate = 60 / (parameters.get('interval') + 3) length = list.count(sents, None) tell(f'已读取邮件:{subject}') tell(f'指定发件人:{address}') tell(f'已读取联系人信息共 {limit} 条') tell(f'预计发送数量 {length}') tell(f'当前发送速率 {round(rate, 2)} 封/分钟') if rate > 8.33: tell('当前发送速率已超出限制 8.33 封/分钟', level=1) tell(f'预计使用时间 {timedelta(minutes=length / rate)}') tell(f'已设定允许重试次数:{parameters.get('retry')}') tell(f'已设定最大重复次数:{parameters.get('max_occurrence') or '无限制'}') locale = parameters.get('locale') greetings = [item for item in Greetings.presets() if item.locale == locale][0] timezone = ZoneInfo(greetings.timezone) tell(f'当前时区:{greetings.timezone}') tell(f'当前语言:{greetings.locale.upper()}') index = 0 status = Status.ACTIVE occurrences = {} while status.isactive() and index < limit: global warnings global errors global sent attempt = 0 current = index index += 1 recipient = str(recipients[current]).strip() name = names[current] code = codes[current] mark = sents[current] occurrence = occurrences.setdefault(code, [0]) if code else [0] outbox.put(Command('setProgress', index, name, recipient, sent, warnings, errors)) if mark is not None and str(mark).strip(): tell(f'已跳过项目 {recipient}') occurrence[0] += 1 continue if (maximum := parameters.get('max_occurrence')) and occurrence[0] >= maximum: tell(f'收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}', level=1) warnings += 1 continue while status.isactive(): try: clean = True attempt += 1 request = get_request() print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) if (target := get_address()) != address: exception = Exception(f'邮件发件地址与设定不一致\n>> {address}\n>> {target}') if not parameters.get('recovery'): raise exception tell(None, exception, level=1) if (target := get_subject()) != subject: exception = Exception(f'邮件主题与设定不一致\n>> {subject}\n>> {target}') condition = EC.presence_of_element_located if not parameters.get('recovery'): raise exception tell(None, exception, level=1) # 打开存档 click("li[data-id='default0/Archives'] li.folder:nth-child(1)", condition) ready(driver) click("ul[aria-label='List view'] li[data-index='0']", condition) click("ul.classic-toolbar button[aria-label='More actions']", condition) # 复制邮件 click(".abs + ul.dropdown-menu a[data-action='io.ox/mail/actions/copy']", condition) click(".modal-dialog ul.subfolders li[data-id='default0/Brouillons']", condition) click(".modal-dialog .modal-footer button[data-action='ok']", condition) # 返回草稿箱 click("li[data-id='default0/Brouillons']") ready(driver) for attempt in range(parameters.get('retry')): click("ul[aria-label='List view'] li[data-index='0']", condition) if get_subject() == subject: break time.sleep(parameters.get('interval')) if not get_subject() == subject: raise exception click("ul.classic-toolbar button[aria-label='Edit copy']") ready(driver) locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) if parameters.get('greet') and greetings and timezone: match datetime.now(timezone).hour: case hour if 6 <= hour < 12: hello = greetings.morning case hour if 12 <= hour < 18: hello = greetings.afternoon case hour if 18 <= hour < 21: hello = greetings.evening case _: hello = None iframe = driver.switch_to.active_element action = ActionChains(driver) clean = False hello = hello or greetings.default if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): parts = HumanName(name) parts.capitalize(force=True) short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) hello += ',' action.send_keys(hello).perform() if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): target = items[0] clean = target.text == hello driver.switch_to.default_content() wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) # 填入收件人 click(wrapper) to.send_keys(recipient + Keys.ENTER) token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token") target = token.get_attribute('innerText').strip() if target != recipient: tell(f'收件人地址不一致 ({attempt})', target, level=1) elif not clean: tell(f'邮件内容不正确 ({attempt})', level=1) else: # 发送邮件 request = get_request() click("div.io-ox-mail-compose-window button[data-action='send']") # 检测页面警告 try: wait = WebDriverWait(driver, timeout=parameters.get('interval')) alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) sents[current] = '❌' message = alert.text.replace('\n', ' ') tell(f'邮件系统错误 ({attempt})', message or None, level=1) # 关闭警告 click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") except TimeoutException: sents[current] = '✔️' occurrence[0] += 1 sent += 1 break while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): try: click("button[data-action='close']", parent=mails[0]) click("div.modal-footer button[data-action='delete']") except Exception as e: tell("关闭邮件时发生了错误", e, level=1) break if attempt < parameters.get('retry'): continue else: tell('已超出最大重试上限', level=1) errors += 1 break except Faillable as o: request = o.request except KeyboardInterrupt: tell('程序中断', level=1) status = Status.TERMINATED break except Exception as e: tell(f'发生错误 ({attempt})', e, level=0) outbox.put(Command('setStatus', 'FAILED')) request = inbox.get() if request == 'BYPASS': outbox.put(Command('setStatus', 'BYPASS')) break if request == 'RESUME': outbox.put(Command('setStatus', 'RESUME')) continue if request == 'CANCEL': outbox.put(Command('setStatus', 'CANCEL')) status = Status.INACTIVE progress = index / limit * 100 print('[信息] 当前进度:%.2f %%' % progress) if parameters.get('save'): try: tell(f'正在写入文件:{filename}') pandas.DataFrame.from_dict(data).to_excel(filename, index=False, sheet_name='Sheet1') except Exception as e: tell('写入文件时发生了错误', e, level=0) if status.isalive(): outbox.put(Command('setStatus', 'FINISH')) else: break return 0 async def handler(request: ws.WebSocketRequest): global connection websocket = await request.accept() if connection is None: connection = websocket else: await websocket.aclose(code=1000, reason="Server allows only one connection") return async def receiver(): while True: try: message = await connection.get_message() inbox.put(message) except Exception as e: tell('Receiver', e, level=0) break async def sender(): while True: try: message = await trio.to_thread.run_sync(outbox.get) await connection.send_message(str(message)) except Exception as e: tell('Sender', e, level=0) break async with trio.open_nursery() as nursery: nursery.start_soon(receiver) nursery.start_soon(sender) outbox.shutdown(immediate=True) inbox.shutdown(immediate=True) async def backend(listen='127.0.0.1', port=0): global socket listeners = await trio.open_tcp_listeners(port, host=listen) server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000) socket = listeners[0].socket await server.run() if __name__ == '__main__': try: logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M") tell('正在初始化') thread = Thread(target=lambda: trio.run(backend), daemon=True) thread.start() tell('正在启动 Chrome 自动化实例') with keep.presenting(): opts = ChromeOptions() opts.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = Chrome(opts) status = main(driver) except KeyboardInterrupt: tell('程序中断', level=1) status = 145 except Exception as e: tell('致命错误', e, level=0) status = 1 finally: driver.quit() tell(f'已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') exit(status)