Files
mailer/邮件批量发送脚本.py
2025-07-25 14:59:52 +08:00

388 lines
15 KiB
Python

import unicodedata
import argparse
import pandas
import time
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from tkinter import Tk, filedialog
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from nameparser import HumanName
parser = argparse.ArgumentParser(description="邮件批量发送脚本")
parser.add_argument('input', nargs='?')
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人')
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')
parser.add_argument('--column-sent', type=str, nargs='?', default='已发送')
parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier')
parser.add_argument('-a', '--address', type=str, required=True)
parser.add_argument('-p', '--password', type=str, required=True)
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=10)
parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5)
parser.add_argument('-T', '--timezone', type=str, nargs='?', default='Europe/Berlin')
parser.add_argument('-r', '--retry', type=int, nargs='?', default=3)
args = parser.parse_args()
date = datetime.now()
sent = 0
errors = 0
warnings = 0
greetings = [
{
"locale": "**",
"default": "不使用问候语",
"registry": []
},
{
"locale": "en",
"default": "Hello",
"registry": ["Good morning", "Good afternoon", "Good evening"]
},
{
"locale": "fr",
"default": "Bonjour",
"registry": [None, "Bon après-midi", "Bonsoir"]
},
{
"locale": "de",
"default": "Hallo",
"registry": ["Guten Morgen", "Guten Tag", "Guten Abend"]
},
{
"locale": "it",
"default": "Ciao",
"registry": ["Buongiorno", "Buon pomeriggio", "Buonasera"]
},
{
"locale": "es",
"default": "Hola",
"registry": ["Buenos días", "Buenas tardes", None]
},
{
"locale": "pt",
"default": "Olá",
"registry": ["Bom dia", "Boa tarde", None]
}
]
def main():
if not args.input:
root = Tk()
root.withdraw()
args.input = filedialog.askopenfilename(defaultextension='xlsx')
print(f'[信息] 正在读取数据:{args.input}')
try:
workbook = pandas.read_excel(args.input)
data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list')
recipients = data.get(args.column_address, [])
except Exception as e:
print(f'[!!!!] 读取数据表失败:{e}')
return 1
limit = len(recipients)
names = data.setdefault(args.column_name, [None] * limit)
codes = data.setdefault(args.column_code, [None] * limit)
sents = data.setdefault(args.column_sent, [None] * limit)
print(f'[信息] 已读取联系人信息共 {limit}')
if limit == 0: return 0
print('[信息] 正在启动 Chrome 自动化实例')
try:
opts = webdriver.ChromeOptions()
opts.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome(opts)
except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}')
return 2
def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement:
for attempt in range(args.retry):
try:
wait = WebDriverWait(parent, timeout=args.timeout)
return wait.until(condition((By.CSS_SELECTOR, selector)))
except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找
continue
except TimeoutException:
# 超时错误
raise Exception('操作超时')
raise Exception(f'无法定位元素: {selector}')
def click(selector, condition=EC.element_to_be_clickable, parent=driver):
element = locate(selector, condition, parent) if isinstance(selector, str) else selector
counter = lambda: int(element.get_attribute('taximeter') or 0)
error = False
value = counter()
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
for attempt in range(args.retry):
try:
if not error: element.click()
else: driver.execute_script("arguments[0].click();", element)
except StaleElementReferenceException:
break
except:
error = True
continue
# 检测点击事件
try:
WebDriverWait(driver, args.interval).until(lambda _: counter() > value)
break
except TimeoutException: continue
except: break
def keyin(element: WebElement, value):
try: element.send_keys(value)
except: driver.execute_script(f"arguments[0].value = arguments[1];", element, value)
def ready(driver, predicate):
try:
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until(predicate, '操作超时')
except:
return True
wait = WebDriverWait(driver, timeout=args.timeout)
wait.until_not(predicate, '操作超时')
return True
def contains_non_latin_alphabet(string: str):
for char in string:
if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')):
return True
return False
try:
driver.get(args.url)
except TimeoutException:
# 停止加载
print(f'[警告] 操作超时')
driver.execute_script("window.stop();")
# 接受 cookie
try: click("#selectAll")
except: pass
try:
print(f'[信息] 正在登陆 {args.address}')
username = locate("#username")
username.send_keys(args.address)
click("#button--with-loader")
password = locate("#password")
password.send_keys(args.password)
click("#button--with-loader")
except Exception as e:
print(f'[!!!!] 登录时发生了错误:{e}')
return 4
while True:
try:
driver.find_element(By.CSS_SELECTOR, "#io-ox-core")
loader = driver.find_element(By.CSS_SELECTOR, "#background-loader")
if not loader.is_displayed():
print(f'[信息] 已登录')
break
except:
time.sleep(args.interval)
try:
# 打开草稿箱
click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
# 打开邮件
click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
subject = locate("h1.subject")
print(f'[信息] 已读取邮件:{subject.text}')
sender = locate("header div.from")
print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}')
except Exception as e:
print(f'[!!!!] 读取邮件时发生了错误:{e}')
return 5
rate = 60 / (args.interval + 3)
length = list.count(sents, None)
command = None
timezone = ZoneInfo(args.timezone)
print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟')
print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}')
print(f'[信息] 当前时区:{args.timezone}')
print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n')
for index, item in enumerate(greetings):
print(f'\t[{index}] {item.get('locale')}', end=' ')
print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default')))
while command is None:
match input('\n[????] 请选择 (留空取消操作): ').strip():
case keys if not keys:
command = -1
case keys if keys.isdigit():
number = int(keys)
if number < len(greetings): command = number
if command < 0:
print('[信息] 已取消发送')
exit()
global date
date = datetime.now()
index = 0
active = True
occurrences = {}
while active and index < limit:
global warnings
global errors
global sent
attempt = 0
current = index
index += 1
recipient = recipients[current]
name = names[current]
code = codes[current]
mark = sents[current]
occurrence = occurrences.setdefault(code, [0]) if code else [0]
if mark is not None and str(mark).strip():
print(f'[信息] 已跳过项目 {recipient}')
occurrence[0] += 1
continue
if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence:
print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}')
warnings += 1
continue
while active:
try:
clean = True
attempt += 1
print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
click("button[aria-label='Edit copy']")
ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
if command > 0 and (entry := greetings[command]):
match datetime.now(timezone).hour:
case hour if 6 <= hour < 12: registry = entry.get('registry')[0]
case hour if 12 <= hour < 18: registry = entry.get('registry')[1]
case hour if 18 <= hour < 21: registry = entry.get('registry')[2]
case _: registry = None
iframe = driver.switch_to.active_element
action = ActionChains(driver)
clean = False
hello = registry or entry.get('default')
if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
parts = HumanName(name)
parts.capitalize(force=True)
short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
hello += ','
action.send_keys(hello).perform()
if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
target = items[0]
clean = target.text == hello
driver.switch_to.default_content()
wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)
# 填入收件人
click(wrapper)
keyin(to, recipient)
if to.get_attribute('value') != recipient:
print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...')
elif not clean:
print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...')
else:
# 发送邮件
click("div.io-ox-mail-compose-window button[data-action='send']")
# 检测页面警告
try:
wait = WebDriverWait(driver, timeout=args.interval)
alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
message = alert.text.replace('\n', ' ')
if not message: print(f'[警告] 程序异常。请保持页面在前台显示,避免遮挡或最小化')
else: print(f'[警告] ({attempt}): {message}')
# 关闭警告
click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
except TimeoutException:
sents[current] = ''
occurrence[0] += 1
sent += 1
break
while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
try:
# 关闭过期邮件
click("button[data-action='close']", parent=mails[0])
# 删除过期邮件
click("div.modal-footer button[data-action='delete']")
except:
continue
if attempt < args.retry:
continue
else:
print('[警告] 已超出最大重试上限')
errors += 1
break
except KeyboardInterrupt:
print('[信息] 程序中断')
active = False
break
except Exception as e:
print(f'[警告] 发生错误:{e}')
key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ')
if key in ['R', 'r']: continue
elif key in ['S', 's']: break
else: active = False
if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']:
print(f'[信息] 正在写入文件:{args.input}')
try:
pandas.DataFrame.from_dict(data).to_excel(args.input, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[警告] 写入文件时发生了错误:{e}')
return 6
return 0
try: status = main()
except KeyboardInterrupt: status = 145
print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings}')
print(f'[信息] 总耗时 {str(datetime.now() - date)}')
exit(status)