218 lines
7.7 KiB
Python
218 lines
7.7 KiB
Python
import argparse
|
|
import time
|
|
import csv
|
|
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
|
|
# Command-line interface of the bulk mail sender.
# NOTE: the optional flags keep nargs='?' for backward compatibility, which
# means a flag given without a value yields None instead of its default.
parser = argparse.ArgumentParser(description="邮件批量发送脚本")

# Path of the CSV datasheet holding one recipient per row.
parser.add_argument('datasheet')

# Header names of the CSV columns holding the recipient address and the
# customer code used for duplicate detection.
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')

# Credentials of the sending mailbox.
parser.add_argument('--address', type=str, required=True)
parser.add_argument('--password', type=str, required=True)

# Text encoding used to read the datasheet.
parser.add_argument('--encoding', type=str, nargs='?', default='utf-8')

# Seconds to wait for a page element before giving up.
parser.add_argument('--timeout', type=int, nargs='?', default=60)

# Seconds to pause after each message is submitted.
parser.add_argument('--interval', type=int, nargs='?', default=5)

# Messages per minute above which a warning is printed.  The default is
# fractional, so the option must parse floats: with type=int a value such
# as "8.33" was rejected on the command line.
parser.add_argument('--rate-limit', type=float, nargs='?', default=8.33)

# Maximum consecutive rows sharing one customer code (0 disables the check).
parser.add_argument('--max-occurrence', type=int, nargs='?', default=0)
|
|
|
|
# Parsed command-line options, read by main() and locate().
args = parser.parse_args()

# Reference timestamp for the elapsed-time report; main() resets it once the
# user has confirmed the run.
date = datetime.now()

# Run counters reported in the final summary: messages submitted, messages
# rejected by the web page, and rows skipped as duplicates.
sent = errors = warnings = 0
|
|
|
|
def main():
    """Send a copy of the first draft in the mailbox to every datasheet row.

    Reads the CSV datasheet named on the command line, starts Chrome, and
    hands over to ``_run_session`` for the browser work.  The browser is
    always shut down afterwards, whatever the outcome.

    Returns:
        None when every row was processed, otherwise an int status:
        1 = datasheet could not be read, 2 = browser start / page load
        failed, 4 = login failed, 5 = the draft could not be opened,
        6 = an error occurred while sending or while cleaning up after a
        rejected message.

    Raises:
        SystemExit: when the user declines the confirmation prompt.
    """
    # Load the recipient list.
    print(f'[信息] 正在读取数据表:{args.datasheet}')
    try:
        # newline='' lets the csv module handle line endings itself, as its
        # documentation requires (quoted fields may contain newlines).
        with open(args.datasheet, 'r', encoding=args.encoding, newline='') as file:
            data = list(csv.DictReader(file))

        # Messages per minute.  The extra 3 seconds are presumably the fixed
        # per-message UI overhead -- TODO confirm.
        rate = 60 / (args.interval + 3)
        limit = len(data)
        print(f'[信息] 已读取联系人信息共 {limit} 条')
    except Exception as e:
        print(f'[!!!!] 读取数据表失败:{e}')
        return 1

    try:
        print('[信息] 正在启动 Chrome 自动化实例')
        driver = webdriver.Chrome()
    except Exception as e:
        print(f'[!!!!] 初始化时发生了错误:{e}')
        return 2

    try:
        return _run_session(driver, data, rate, limit)
    finally:
        # Always release the browser, including on errors, Ctrl-C and the
        # cancel path.  Teardown is best-effort: a failure here must not
        # replace the status being returned.
        try:
            driver.quit()
        except Exception:
            pass


def _run_session(driver, data, rate, limit):
    """Log in, open the template draft and send one copy per row of *data*.

    Args:
        driver: A started Chrome WebDriver.
        data: Rows of the datasheet as dicts keyed by column header.
        rate: Estimated sending rate in messages per minute.
        limit: Number of rows in *data*.

    Returns:
        None on success or an int status (see ``main``).
    """
    global date, sent, errors, warnings

    try:
        print('[信息] 正在载入网页')
        driver.get('https://id.ionos.fr/identifier')
    except Exception as e:
        print(f'[!!!!] 初始化时发生了错误:{e}')
        return 2

    # Click the 'selectAll' element if it shows up (presumably a consent
    # banner -- TODO confirm).  Its absence is not an error, so failures are
    # deliberately ignored; KeyboardInterrupt is no longer swallowed.
    try:
        select_all = locate(driver, EC.element_to_be_clickable, (By.ID, 'selectAll'))
        select_all.click()
    except Exception:
        pass

    # Two-step login: user name first, then password.
    try:
        print(f'[信息] 正在登陆 {args.address}')
        username = locate(driver, EC.element_to_be_clickable, (By.ID, 'username'))
        username.send_keys(args.address)

        submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader'))
        submit.click()

        password = locate(driver, EC.element_to_be_clickable, (By.ID, 'password'))
        password.send_keys(args.password)

        submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader'))
        submit.click()
    except Exception as e:
        print(f'[!!!!] 登录时发生了错误:{e}')
        return 4

    # Open the drafts folder and select its first message as the template.
    try:
        drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-id='default0/Brouillons']"))
        # Clicked through JavaScript rather than WebElement.click().
        driver.execute_script('arguments[0].click();', drafts)

        drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "button[data-id='default0/Brouillons']"))
        driver.execute_script('arguments[0].click();', drafts)

        items = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "ul[aria-label='List view']"))
        mail = locate(items, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-index='0']"))
        mail.click()

        subject = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "h1.subject"))
        print(f'[信息] 已读取邮件:{subject.text}')

        header = locate(driver, EC.presence_of_element_located, (By.TAG_NAME, 'header'))
        sender = header.find_element(By.CSS_SELECTOR, "div.from")
        # Drop the first six characters (presumably a label prefix) and
        # flatten line breaks.  Computed outside the f-string because quote
        # reuse and backslashes inside a replacement field are a syntax
        # error before Python 3.12.
        sender_text = sender.text[6:].replace('\n', ' ')
        print(f'[信息] 指定发件人:{sender_text}')
    except Exception as e:
        print(f'[!!!!] 读取邮件时发生了错误:{e}')
        return 5

    print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
    print(f'[信息] 预计使用时间 {timedelta(minutes=limit / rate)}')

    if rate > args.rate_limit:
        print(f'[警告] 已设置速率限制 {round(args.rate_limit, 2)}')
        print('[警告] 当前发送速率已超出限制')

    key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ')

    if key in ('y', 'Y'):
        print('[信息] 已确定发送')
    else:
        print('[信息] 已取消发送')
        # Leave immediately with a success status, skipping the summary.
        # Raised directly because the exit() builtin is only installed by
        # the site module.
        raise SystemExit(0)

    # Restart the clock so the reported duration covers sending only.
    date = datetime.now()

    current = None
    occurrence = 0

    for row in data:
        recipient = row.get(args.column_address)
        code = row.get(args.column_code)

        # Count how many consecutive rows share the same customer code.
        if code == current:
            occurrence += 1
        else:
            current = code
            occurrence = 1

        if args.max_occurrence > 0 and occurrence > args.max_occurrence:
            print(f'[警告] 收件人 {recipient} 所属组织已重复出现 {occurrence} 次')
            warnings += 1
            continue

        try:
            print(f'[信息] 正在发送:{recipient}')

            edit = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[aria-label='Edit copy']"))
            edit.click()

            draft = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.io-ox-mail-compose-window"))
            title = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.floating-header"))

            # Wait for the page to finish loading.
            locate(driver, EC.invisibility_of_element_located, (By.CSS_SELECTOR, ".io-ox-busy"))
            time.sleep(1)
            title.click()

            to = draft.find_element(By.CSS_SELECTOR, "input.token-input.tt-input[tabindex='0']")
            to.send_keys(recipient)

            send = draft.find_element(By.CSS_SELECTOR, "button[data-action='send']")
            send.click()

            sent += 1
        except Exception as e:
            print(f'[!!!!] 发生错误:{e}')
            return 6

        # Give the message time to go out.
        time.sleep(args.interval)

        # Look for an error alert on the page; none means the send went
        # through.  Any lookup failure is treated as "no alert", but
        # KeyboardInterrupt is no longer swallowed here.
        try:
            alert = driver.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")
        except Exception:
            continue

        # The page rejected the message: report it and discard the leftover
        # compose window.  Guarded so that a failure during cleanup ends the
        # run with a status code instead of an unhandled traceback.
        try:
            message = alert.text.replace('\n', ' ')
            print(f'[警告] 来自网页:{message}')

            errors += 1

            # A failed send does not count towards the duplicate limit.
            if occurrence > 1:
                occurrence -= 1

            # Dismiss the alert.
            button = locate(alert, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']"))
            button.click()

            # Find the compose window left over from the failed send.
            stale = driver.find_element(By.CSS_SELECTOR, "div.io-ox-mail-compose-window")

            # Close it.
            button = locate(stale, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']"))
            button.click()

            # Confirm deletion of the leftover message.
            modal = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.modal-footer"))
            button = locate(modal, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='delete']"))
            button.click()
        except Exception as e:
            print(f'[!!!!] 发生错误:{e}')
            return 6
|
|
|
|
def locate(driver, condition, locator):
    """Wait until ``condition(locator)`` is satisfied and return its result.

    Args:
        driver: Search root handed to WebDriverWait.  Callers in this file
            pass either the WebDriver or a previously located element.
        condition: Expected-condition factory such as
            ``EC.element_to_be_clickable``.
        locator: ``(By, value)`` tuple identifying the element.

    Returns:
        Whatever the condition evaluates to once it is satisfied.

    Raises:
        Exception: with the message '操作超时' when the condition is not
            satisfied within ``args.timeout`` seconds.
    """
    # Stale-element errors are retried by the wait itself, so all retries
    # share one timeout budget.  Re-creating the wait after every stale
    # reference restarted the clock and could loop without bound.
    wait = WebDriverWait(
        driver,
        timeout=args.timeout,
        ignored_exceptions=(StaleElementReferenceException,),
    )
    try:
        return wait.until(condition(locator))
    except TimeoutException as err:
        # Timed out: callers only print the message, but keep the cause.
        raise Exception('操作超时') from err
|
|
|
|
# Script entry: run the job, turning Ctrl-C into an orderly stop with its own
# exit status so the summary below is still printed.
try:
    status = main()
except KeyboardInterrupt:
    status = 145
    print('[信息] 程序中断')

# Final report: counters first, then the time elapsed since `date`.
summary = f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个'
print(summary)
elapsed = datetime.now() - date
print(f'[信息] 总耗时 {elapsed}')
exit(status)
|