import argparse import time import csv from selenium import webdriver from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from tkinter import Tk, filedialog from datetime import datetime, timedelta parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser.add_argument('input', nargs='?') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('-a', '--address', type=str, required=True) parser.add_argument('-p', '--password', type=str, required=True) parser.add_argument('-e', '--encoding', type=str, nargs='?', default='utf-8') parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) args = parser.parse_args() date = datetime.now() sent = 0 errors = 0 warnings = 0 def main(): if not args.input: root = Tk() root.withdraw() args.input = filedialog.askopenfilename(defaultextension='csv') print(f'[信息] 正在读取数据:{args.input}') try: with open(args.input, 'r', encoding=args.encoding) as file: rows = csv.DictReader(file) data = list(rows) except Exception as e: print(f'[!!!!] 读取数据表失败:{e}') return 1 rate = 60 / (args.interval + 3) limit = len(data) print(f'[信息] 已读取联系人信息共 {limit} 条') print('[信息] 正在启动 Chrome 自动化实例') try: opts = webdriver.ChromeOptions() opts.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = webdriver.Chrome(opts) except Exception as e: print(f'[!!!!] 初始化时发生了错误:{e}') return 2 def locate(selector, condition=EC.presence_of_element_located, parent=driver): while True: try: wait = WebDriverWait(parent, timeout=args.timeout) return wait.until(condition((By.CSS_SELECTOR, selector))) except StaleElementReferenceException: # 如果遇到过期元素,重新尝试查找 continue except TimeoutException: # 超时错误 raise Exception('操作超时') def click(selector, condition=EC.element_to_be_clickable): element = locate(selector, condition) if isinstance(selector, str) else selector counter = lambda: int(element.get_attribute('taximeter') or 0) error = False value = counter() driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) for attempt in range(args.retry): try: if not error: element.click() else: driver.execute_script("arguments[0].click();", element) except StaleElementReferenceException: break except: error = True continue # 检测点击事件 try: WebDriverWait(driver, args.interval).until(lambda _: counter() > value) break except TimeoutException: continue except: break try: driver.get(args.url) except TimeoutException: # 停止加载 print(f'[警告] 操作超时') driver.execute_script("window.stop();") # 接受 cookie try: click("#selectAll") except: pass try: print(f'[信息] 正在登陆 {args.address}') username = locate("#username") username.send_keys(args.address) click("#button--with-loader") password = locate("#password") password.send_keys(args.password) click("#button--with-loader") except Exception as e: print(f'[!!!!] 登录时发生了错误:{e}') return 4 while True: try: driver.find_element(By.CSS_SELECTOR, "#io-ox-core") print(f'[信息] 已登录') break except: time.sleep(args.interval) try: # 打开草稿箱 click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) # 打开邮件 click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) subject = locate("h1.subject") print(f'[信息] 已读取邮件:{subject.text}') sender = locate("header div.from") print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}') except Exception as e: print(f'[!!!!] 读取邮件时发生了错误:{e}') return 5 print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}') if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ') if key == 'y' or key == 'Y': print('[信息] 已确定发送') else: print('[信息] 已取消发送') exit() global date date = datetime.now() current = None occurrence = 0 for row in data: recipient = row.get(args.column_address) code = row.get(args.column_code) if code == current: occurrence += 1 else: current = code occurrence = 1 if args.max_occurrence > 0 and occurrence > args.max_occurrence: print(f'[警告] 收件人 {recipient} 所属组织已重复出现 {occurrence} 次') global warnings warnings += 1 continue try: print(f'[信息] 正在发送:{recipient}') click("button[aria-label='Edit copy']") # 等待页面加载 wait = WebDriverWait(driver, timeout=args.timeout) wait.until_not(lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) # 点击活动窗口 click("div.io-ox-mail-compose-window div.floating-header") # 填入收件人 wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) click(wrapper) to.send_keys(recipient) # 发送邮件 click("div.io-ox-mail-compose-window button[data-action='send']") global sent sent += 1 except Exception as e: print(f'[警告] 发生错误:{e}') key = input('[????] 是否继续?确定 (Y) / 取消 (N): ') if key == 'y' or key == 'Y': continue else: return 6 # 检测页面警告 try: wait = WebDriverWait(driver, timeout=args.interval) alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) except: continue message = alert.text.replace('\n', ' ') print(f'[警告] 来自网页:{message}') global errors errors += 1 # 发送失败不计入重复项 if occurrence > 1: occurrence -= 1 # 关闭警告 click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") # 关闭过期邮件 click("div.io-ox-mail-compose-window button[data-action='close']") # 删除过期邮件 click("div.modal-footer button[data-action='delete']") try: status = main() except KeyboardInterrupt: status = 145 print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') print(f'[信息] 总耗时 {str(datetime.now() - date)}') exit(status)