import unicodedata import argparse import pandas import time from selenium import webdriver from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from tkinter import Tk, filedialog from datetime import datetime, timedelta from zoneinfo import ZoneInfo from nameparser import HumanName, config parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser.add_argument('input', nargs='?') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('-a', '--address', type=str, required=True) parser.add_argument('-p', '--password', type=str, required=True) parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) parser.add_argument('-T', '--timezone', type=str, nargs='?', default='Europe/Berlin') parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) args = parser.parse_args() date = datetime.now() sent = 0 errors = 0 warnings = 0 greetings = [ { "locale": "**", "default": "不使用问候语", "registry": [] }, { "locale": "en", "default": "Hello", "registry": ["Good morning", "Good afternoon", "Good evening"] }, { "locale": "fr", "default": "Bonjour", "registry": [None, "Bon après-midi", "Bonsoir"] }, { "locale": "de", "default": "Hallo", "registry": ["Guten Morgen", "Guten Tag", "Guten Abend"] }, { "locale": "it", "default": "Ciao", "registry": ["Buongiorno", "Buon pomeriggio", "Buonasera"] }, { "locale": "es", "default": "Hola", "registry": ["Buenos días", "Buenas tardes", None] }, { "locale": "pt", "default": "Olá", "registry": ["Bom dia", "Boa tarde", None] } ] def main(): if not args.input: root = Tk() root.withdraw() args.input = filedialog.askopenfilename(defaultextension='xlsx') print(f'[信息] 正在读取数据:{args.input}') try: workbook = pandas.read_excel(args.input) data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') recipients = data.get(args.column_address, []) except Exception as e: print(f'[!!!!] 读取数据表失败:{e}') return 1 limit = len(recipients) names = data.setdefault(args.column_name, [None] * limit) codes = data.setdefault(args.column_code, [None] * limit) sents = data.setdefault(args.column_sent, [None] * limit) print(f'[信息] 已读取联系人信息共 {limit} 条') if limit == 0: return 0 print('[信息] 正在启动 Chrome 自动化实例') try: opts = webdriver.ChromeOptions() opts.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = webdriver.Chrome(opts) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') return 2 def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: for attempt in range(args.retry): try: wait = WebDriverWait(parent, timeout=args.timeout) return wait.until(condition((By.CSS_SELECTOR, selector))) except StaleElementReferenceException: # 如果遇到过期元素,重新尝试查找 continue except TimeoutException: # 超时错误 raise Exception('操作超时') raise Exception(f'无法定位元素: {selector}') def click(selector, condition=EC.element_to_be_clickable, parent=driver): element = locate(selector, condition, parent) if isinstance(selector, str) else selector counter = lambda: int(element.get_attribute('taximeter') or 0) error = False value = counter() driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) for attempt in range(args.retry): try: if not error: element.click() else: driver.execute_script("arguments[0].click();", element) except StaleElementReferenceException: break except: error = True continue # 检测点击事件 try: WebDriverWait(driver, args.interval).until(lambda _: counter() > value) break except TimeoutException: continue except: break def keyin(element: WebElement, value): try: element.send_keys(value) except: driver.execute_script(f"arguments[0].value = arguments[1];", element, value) def ready(driver, predicate): try: wait = WebDriverWait(driver, timeout=args.timeout) wait.until(predicate, '操作超时') except: return True wait = WebDriverWait(driver, timeout=args.timeout) wait.until_not(predicate, '操作超时') return True def contains_non_latin_alphabet(string: str): for char in string: if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')): return True return False try: driver.get(args.url) except TimeoutException: # 停止加载 print(f'[警告] 操作超时') driver.execute_script("window.stop();") # 接受 cookie try: click("#selectAll") except: pass try: print(f'[信息] 正在登陆 {args.address}') username = locate("#username") username.send_keys(args.address) click("#button--with-loader") password = locate("#password") password.send_keys(args.password) click("#button--with-loader") except Exception as e: print(f'[!!!!] 登录时发生了错误:{e}') return 4 while True: try: driver.find_element(By.CSS_SELECTOR, "#io-ox-core") loader = driver.find_element(By.CSS_SELECTOR, "#background-loader") if not loader.is_displayed(): print(f'[信息] 已登录') break except: time.sleep(args.interval) try: # 打开草稿箱 click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) # 打开邮件 click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) subject = locate("h1.subject") print(f'[信息] 已读取邮件:{subject.text}') sender = locate("header div.from") print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') except Exception as e: print(f'[!!!!] 读取邮件时发生了错误:{e}') return 5 rate = 60 / (args.interval + 3) command = None timezone = ZoneInfo(args.timezone) print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}') if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') print(f'[信息] 当前时区:{args.timezone}') print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n') for index, item in enumerate(greetings): print(f'\t[{index}] {item.get('locale')}', end=' ') print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default'))) while command is None: match input('\n[????] 请选择 (留空取消操作): ').strip(): case keys if not keys: command = -1 case keys if keys.isdigit(): number = int(keys) if number < len(greetings): command = number if command < 0: print('[信息] 已取消发送') exit() global date date = datetime.now() index = 0 active = True occurrences = {} while active and index < limit: global warnings global errors global sent attempt = 0 current = index index += 1 recipient = recipients[current] name = names[current] code = codes[current] mark = sents[current] if not code: print(f'[警告] 最大允许重复次数已设置为 [{args.max_occurrence}], 但未提供有效唯一标识 (如客户编号)') occurrence = [0] warnings += 1 else: occurrence = occurrences.setdefault(code, [0]) if mark is not None and str(mark).strip(): print(f'[信息] 已跳过项目 {recipient}') occurrence[0] += 1 continue if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence: print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}') warnings += 1 continue while active: try: clean = True attempt += 1 print(f'[信息] 正在发送:{recipient}') click("button[aria-label='Edit copy']") ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) if command > 0 and (entry := greetings[command]): match datetime.now(timezone).hour: case hour if 6 <= hour < 12: registry = entry.get('registry')[0] case hour if 12 <= hour < 18: registry = entry.get('registry')[1] case hour if 18 <= hour < 21: registry = entry.get('registry')[2] case _: registry = None clean = False iframe = driver.switch_to.active_element action = ActionChains(driver) hello: str = registry or entry.get('default') if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): const = config.CONSTANTS const.titles.add('M.') parts = HumanName(name, const) parts.capitalize() person = ' '.join(filter(None, [parts.title, parts.first or parts.middle or parts.last])) hello = ' '.join([hello, person]) hello += ',' action.send_keys(hello).perform() if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): target = items[0] clean = target.text == hello driver.switch_to.default_content() wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) # 填入收件人 click(wrapper) keyin(to, recipient) if to.get_attribute('value') != recipient: print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...') elif not clean: print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...') else: # 发送邮件 click("div.io-ox-mail-compose-window button[data-action='send']") # 检测页面警告 try: wait = WebDriverWait(driver, timeout=args.interval) alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) message = alert.text.replace('\n', ' ') if not message: print(f'[警告] 程序异常。请保持页面在前台显示,避免最小化') else: print(f'[警告] ({attempt}): {message}') # 关闭警告 click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") except TimeoutException: sents[current] = '✔' occurrence[0] += 1 sent += 1 break while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): try: # 关闭过期邮件 click("button[data-action='close']", parent=mails[0]) # 删除过期邮件 click("div.modal-footer button[data-action='delete']") except: continue if attempt < args.retry: continue else: print('[警告] 已超出最大重试上限') errors += 1 break except KeyboardInterrupt: print('[信息] 程序中断') active = False break except Exception as e: print(f'[警告] 发生错误:{e}') key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ') if key in ['R', 'r']: continue elif key in ['S', 's']: break else: active = False if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']: print(f'[信息] 正在写入文件:{args.input}') try: pandas.DataFrame.from_dict(data).to_excel(args.input, index=False, sheet_name='Sheet1') except Exception as e: print(f'[警告] 写入文件时发生了错误:{e}') return 6 return 0 try: status = main() except KeyboardInterrupt: status = 145 print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') print(f'[信息] 总耗时 {str(datetime.now() - date)}') exit(status)