256 lines
9.3 KiB
Python
256 lines
9.3 KiB
Python
import argparse
|
|
import pandas
|
|
import time
|
|
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
from selenium.webdriver.remote.webelement import WebElement
|
|
|
|
from tkinter import Tk, filedialog
|
|
from datetime import datetime, timedelta
|
|
|
|
# Command-line interface of the bulk mail sender.
parser = argparse.ArgumentParser(description="邮件批量发送脚本")

# Optional spreadsheet path; main() opens a file dialog when it is omitted.
parser.add_argument("input", nargs="?")

# Spreadsheet column headers: recipient address, customer code, sent marker.
parser.add_argument("--column-address", nargs="?", default="邮箱")
parser.add_argument("--column-code", nargs="?", default="客户编号")
parser.add_argument("--column-sent", nargs="?", default="已发送")

# Webmail login page and the (mandatory) account credentials.
parser.add_argument(
    "-u", "--url",
    nargs="?",
    default="https://id.ionos.fr/identifier",
)
parser.add_argument("-a", "--address", required=True)
parser.add_argument("-p", "--password", required=True)

# Numeric tuning knobs:
#   timeout        - seconds to wait for a page element
#   interval       - seconds used for click confirmation, login polling
#                    and the post-send alert check
#   max-occurrence - per customer-code send limit (a value <= 0 disables it)
#   retry          - number of attempts made for a single click
parser.add_argument("-t", "--timeout", nargs="?", type=int, default=60)
parser.add_argument("-i", "--interval", nargs="?", type=int, default=10)
parser.add_argument("-m", "--max-occurrence", nargs="?", type=int, default=5)
parser.add_argument("-r", "--retry", nargs="?", type=int, default=3)

args = parser.parse_args()

# Start timestamp for the final "total time" report; main() resets it once
# the operator has confirmed the send.
date = datetime.now()

# Run-wide tallies reported when the script ends.
sent = errors = warnings = 0
|
|
|
|
def main():
    """Run the bulk-mail workflow end to end and return an exit status.

    Steps: load the contact spreadsheet, start Chrome, sign in to the webmail
    at ``args.url``, open the first message of the drafts folder, then send an
    edited copy of it to every recipient whose "sent" cell is still empty.
    Finally the operator may write the updated spreadsheet back to disk.

    Returns:
        0 on success (or when there is nothing to send), 1 if the spreadsheet
        cannot be read or lacks the customer-code column, 2 if Chrome cannot
        be started, 4 if signing in fails, 5 if the draft cannot be opened,
        6 if the spreadsheet cannot be written back.

    Raises:
        SystemExit: when the operator declines the send confirmation.
    """
    global date

    if not args.input:
        # No path on the command line: ask for one with a file dialog.
        root = Tk()
        root.withdraw()
        args.input = filedialog.askopenfilename(defaultextension='xlsx')
        root.destroy()

    print(f'[信息] 正在读取数据:{args.input}')
    try:
        workbook = pandas.read_excel(args.input)
        data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list')
    except Exception as e:
        print(f'[!!!!] 读取数据表失败:{e}')
        return 1

    recipients = data.get(args.column_address, [])
    codes = data.get(args.column_code, [])
    limit = len(recipients)
    # The "sent" column is created on the fly when the sheet does not have it.
    sents = data.setdefault(args.column_sent, [None] * limit)

    print(f'[信息] 已读取联系人信息共 {limit} 条')
    if limit == 0:
        return 0

    if len(codes) != limit:
        # Without this check the send loop would die with an IndexError on
        # the first row, after the browser session was already set up.
        print(f'[!!!!] 读取数据表失败:缺少列 {args.column_code}')
        return 1

    print('[信息] 正在启动 Chrome 自动化实例')
    try:
        opts = webdriver.ChromeOptions()
        opts.add_experimental_option("excludeSwitches", ["enable-logging"])
        # Keyword form: the first positional parameter of Chrome() has not
        # always been the options object across Selenium 4 releases.
        driver = webdriver.Chrome(options=opts)
    except Exception as e:
        print(f'[!!!!] 初始化时发生了错误:{e}')
        return 2

    try:
        if not _login(driver):
            return 4
        if not _open_template(driver):
            return 5

        _confirm_send(limit)

        # Measure only the sending phase in the final "total time" report.
        date = datetime.now()
        _send_all(driver, recipients, codes, sents)
        return _offer_save(data)
    finally:
        try:
            driver.quit()
        except Exception:
            # Best-effort cleanup: the browser may already be gone, and a
            # failure here must not mask the status being returned.
            pass


# Outcomes of a single send attempt (see _send_one).
_SENT, _SKIPPED, _CANCELLED = 'sent', 'skipped', 'cancelled'


def _locate(driver, selector, condition=None, parent=None) -> "WebElement":
    """Wait for the element matching a CSS selector and return it.

    Args:
        driver: the active WebDriver.
        selector: CSS selector to look up.
        condition: expected-condition factory; defaults to
            ``EC.presence_of_element_located``.
        parent: object to search from instead of the driver (for example an
            element returned by an earlier lookup).

    Raises:
        Exception: with the message '操作超时' when nothing matched within
            ``args.timeout`` seconds.
    """
    if condition is None:
        condition = EC.presence_of_element_located
    scope = driver if parent is None else parent
    while True:
        try:
            waiter = WebDriverWait(scope, timeout=args.timeout)
            return waiter.until(condition((By.CSS_SELECTOR, selector)))
        except StaleElementReferenceException:
            # The element was replaced while waiting: look it up again.
            continue
        except TimeoutException as err:
            raise Exception('操作超时') from err


def _click(driver, target, condition=None):
    """Click an element and try to confirm that the click event fired.

    A click listener is attached to the element that stores a counter in its
    ``taximeter`` attribute; after each click attempt the function waits up
    to ``args.interval`` seconds for that counter to grow. At most
    ``args.retry`` attempts are made. The first attempts use a native click;
    once one of them raises, a JavaScript click is used instead. The function
    returns silently even when no click could be confirmed.

    Args:
        driver: the active WebDriver.
        target: a CSS selector string, or an already located element.
        condition: expected-condition factory used when ``target`` is a
            selector; defaults to ``EC.element_to_be_clickable``.
    """
    if condition is None:
        condition = EC.element_to_be_clickable
    element = _locate(driver, target, condition) if isinstance(target, str) else target

    def counter():
        return int(element.get_attribute('taximeter') or 0)

    value = counter()
    driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)

    use_script = False
    for _ in range(args.retry):
        try:
            if use_script:
                driver.execute_script("arguments[0].click();", element)
            else:
                element.click()
        except StaleElementReferenceException:
            # The element no longer exists, so there is nothing to retry.
            break
        except Exception:
            # Native click failed: fall back to a scripted click.
            use_script = True
            continue

        # Detect the click event.
        try:
            WebDriverWait(driver, args.interval).until(lambda _: counter() > value)
            break
        except TimeoutException:
            continue
        except Exception:
            break


def _login(driver):
    """Open the login page, sign in, and block until the mailbox UI appears.

    Returns:
        True once the ``#io-ox-core`` element exists, False when filling in
        the login form failed (the error has already been printed).
    """
    try:
        driver.get(args.url)
    except TimeoutException:
        # Stop loading and carry on with whatever has been rendered.
        print('[警告] 操作超时')
        driver.execute_script("window.stop();")

    # Accept cookies. The banner is optional, so a failure here is ignored.
    try:
        _click(driver, "#selectAll")
    except Exception:
        pass

    try:
        print(f'[信息] 正在登陆 {args.address}')
        _locate(driver, "#username").send_keys(args.address)
        _click(driver, "#button--with-loader")

        _locate(driver, "#password").send_keys(args.password)
        _click(driver, "#button--with-loader")
    except Exception as e:
        print(f'[!!!!] 登录时发生了错误:{e}')
        return False

    # Poll without a deadline until the mailbox UI is present.
    # NOTE(review): presumably unbounded so the operator can complete extra
    # verification steps by hand - confirm.
    while True:
        try:
            driver.find_element(By.CSS_SELECTOR, "#io-ox-core")
        except Exception:
            time.sleep(args.interval)
        else:
            print('[信息] 已登录')
            return True


def _open_template(driver):
    """Open the first message of the drafts folder and print its details.

    Returns:
        True on success, False on failure (the error has already been
        printed).
    """
    try:
        # Open the drafts folder.
        _click(driver, "li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
        _click(driver, "button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)

        # Open the message.
        _click(driver, "ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)

        subject = _locate(driver, "h1.subject")
        print(f'[信息] 已读取邮件:{subject.text}')

        sender = _locate(driver, "header div.from")
        # Built outside the f-string: quotes and backslashes inside a
        # replacement field are a syntax error before Python 3.12.
        # NOTE(review): the first six characters are dropped, presumably a
        # fixed label in front of the sender - confirm.
        sender_label = sender.text[6:].replace('\n', ' ')
        print(f'[信息] 指定发件人:{sender_label}')
    except Exception as e:
        print(f'[!!!!] 读取邮件时发生了错误:{e}')
        return False
    return True


def _confirm_send(limit):
    """Print the estimated rate and duration, then ask for confirmation.

    Raises:
        SystemExit: when the operator answers anything other than Y/y.
    """
    # NOTE(review): the extra 3 seconds look like an estimate of the fixed
    # per-mail overhead - confirm.
    rate = 60 / (args.interval + 3)
    print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
    print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}')

    if rate > 8.33:
        print('[警告] 当前发送速率已超出限制 8.33 封/分钟')

    key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ')
    if key in ['Y', 'y']:
        print('[信息] 已确定发送')
        return

    print('[信息] 已取消发送')
    # SystemExit is raised directly because the ``exit`` helper is only
    # injected by the ``site`` module and is not meant for programs.
    raise SystemExit(0)


def _send_all(driver, recipients, codes, sents):
    """Send the draft to every pending recipient and update the tallies.

    Rows whose "sent" cell is filled are skipped. Rows whose customer code
    has exceeded ``args.max_occurrence`` successful sends during this run are
    skipped and counted as warnings. ``sents`` is updated in place.
    """
    global sent, warnings

    occurrences = {}
    for current, (recipient, code) in enumerate(zip(recipients, codes)):
        # isna() covers both None and NaN: an empty cell may reach this point
        # as either, depending on the column dtype and the pandas version.
        if not pandas.isna(sents[current]):
            print(f'[信息] 已跳过项目 {recipient}')
            continue

        count = occurrences.get(code, 0)
        # NOTE(review): '>' lets max_occurrence + 1 mails through per code;
        # kept as in the original - confirm whether '>=' was intended.
        if args.max_occurrence > 0 and count > args.max_occurrence:
            print(f'[警告] 收件人 {recipient} 所属组织已重复出现 {count} 次')
            warnings += 1
            continue

        outcome = _send_one(driver, recipient)
        if outcome == _CANCELLED:
            break
        if outcome == _SENT:
            sents[current] = '✔'
            occurrences[code] = count + 1
            sent += 1


def _send_one(driver, recipient):
    """Send one copy of the draft to ``recipient``.

    The attempt is repeated while the page reports an error alert. On any
    other failure the operator chooses between retry, skip and cancel.

    Returns:
        ``_SENT``, ``_SKIPPED`` or ``_CANCELLED``.
    """
    global errors

    while True:
        try:
            print(f'[信息] 正在发送:{recipient}')
            _click(driver, "button[aria-label='Edit copy']")

            # Wait for the page to finish loading.
            WebDriverWait(driver, timeout=args.timeout).until_not(
                lambda d: d.find_element(By.CSS_SELECTOR, ".io-ox-busy"))

            # Click the active compose window.
            _click(driver, "div.io-ox-mail-compose-window div.floating-header")

            # Fill in the recipient.
            wrapper = _locate(driver, "div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
            to = _locate(driver, "input.token-input.tt-input[tabindex='0']", parent=wrapper)
            _click(driver, wrapper)
            to.send_keys(recipient)

            # Send the mail.
            _click(driver, "div.io-ox-mail-compose-window button[data-action='send']")

            # Check for a page alert. Only a timeout means "no alert, mail
            # sent"; any other failure goes to the operator prompt below
            # instead of being recorded as a success.
            try:
                alert = WebDriverWait(driver, timeout=args.interval).until(
                    lambda d: d.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
            except TimeoutException:
                return _SENT

            message = alert.text.replace('\n', ' ')
            print(f'[警告] 来自网页:{message}')
            errors += 1

            # Close the alert.
            _click(driver, "div.io-ox-alert.io-ox-alert-error button[data-action='close']")
            # Close the stale compose window.
            _click(driver, "div.io-ox-mail-compose-window button[data-action='close']")
            # Delete the stale copy.
            _click(driver, "div.modal-footer button[data-action='delete']")
        except Exception as e:
            print(f'[警告] 发生错误:{e}')
            key = input('[????] 重试 (R) / 跳过 (S) / 取消 (C): ')
            if key in ['R', 'r']:
                continue
            if key in ['S', 's']:
                return _SKIPPED
            return _CANCELLED


def _offer_save(data):
    """Ask whether to write the sheet back; return 0, or 6 if writing fails."""
    if input('[????] 是否保存到文件?确定 (Y) / 取消 (N): ') in ['Y', 'y']:
        print(f'[信息] 正在写入文件:{args.input}')
        try:
            pandas.DataFrame.from_dict(data).to_excel(args.input, index=False, sheet_name='Sheet1')
        except Exception as e:
            print(f'[警告] 写入文件时发生了错误:{e}')
            return 6
    return 0
|
|
|
|
# Script entry: run the workflow, print the tallies, exit with its status.
try:
    status = main()
except KeyboardInterrupt:
    # Ctrl+C still prints the summary below and exits with status 145.
    status = 145

print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个')
print(f'[信息] 总耗时 {datetime.now() - date}')

# SystemExit is raised directly: the ``exit`` helper is injected by the
# ``site`` module for interactive use and is missing when the interpreter
# runs without it (for example ``python -S``).
raise SystemExit(status)
|