initial commit

This commit is contained in:
2025-04-03 12:00:06 +02:00
commit a7b06ab24c
2 changed files with 187 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.vscode/
*.csv

185
邮件批量发送脚本.py Normal file
View File

@@ -0,0 +1,185 @@
import argparse
import time
import csv
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, ElementNotInteractableException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from datetime import datetime
from datetime import timedelta
parser = argparse.ArgumentParser(description="邮件批量发送脚本")
parser.add_argument('datasheet')
parser.add_argument('--column', type=str, nargs='?', default='邮箱')
parser.add_argument('--address', type=str, required=True)
parser.add_argument('--password', type=str, required=True)
parser.add_argument('--encoding', type=str, nargs='?', default='utf-8')
parser.add_argument('--timeout', type=int, nargs='?', default=60)
parser.add_argument('--interval', type=int, nargs='?', default=4)
parser.add_argument('--rate-limit', type=int, nargs='?', default=8.33)
args = parser.parse_args()
date = datetime.now()
sent = 0
errors = 0
def main():
# 读取收件人列表
print(f'[信息] 正在读取数据表:{args.datasheet}')
try:
with open(args.datasheet, 'r', encoding=args.encoding) as file:
rows = csv.DictReader(file)
recipients = [row[args.column] for row in rows]
print(f'[信息] 已读取联系人信息共 {len(recipients)}')
except Exception as e:
print(f'[!!!!] 读取数据表失败:{e}')
return 1
try:
print('[信息] 正在启动 Chrome 自动化实例')
driver = webdriver.Chrome()
print('[信息] 正在载入网页')
driver.get('https://id.ionos.fr/identifier')
except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}')
return 2
try:
selectAll = locate(driver, EC.element_to_be_clickable, (By.ID, 'selectAll'))
selectAll.click()
except:
pass
try:
print(f'[信息] 正在登陆 {args.address}')
username = locate(driver, EC.element_to_be_clickable, (By.ID, 'username'))
username.send_keys(args.address)
submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader'))
submit.click()
password = locate(driver, EC.element_to_be_clickable, (By.ID, 'password'))
password.send_keys(args.password)
submit = locate(driver, EC.element_to_be_clickable, (By.ID, 'button--with-loader'))
submit.click()
except Exception as e:
print(f'[!!!!] 登录时发生了错误:{e}')
return 4
try:
drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-id='default0/Brouillons']"))
driver.execute_script('arguments[0].click();', drafts)
drafts = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "button[data-id='default0/Brouillons']"))
driver.execute_script('arguments[0].click();', drafts)
items = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "ul[aria-label='List view']"))
mail = locate(items, EC.presence_of_element_located, (By.CSS_SELECTOR, "li[data-index='0']"))
mail.click()
subject = locate(driver, EC.presence_of_element_located, (By.CSS_SELECTOR, "h1.subject"))
print(f'[信息] 已读取邮件:{subject.text}')
header = locate(driver, EC.presence_of_element_located, (By.TAG_NAME, 'header'))
sender = header.find_element(By.CSS_SELECTOR, "div.from")
print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}')
except Exception as e:
print(f'[!!!!] 读取邮件时发生了错误:{e}')
return 5
rate = 60 / (args.interval + 4)
print(f'[信息] 当前发送速率 {rate} 封/分钟')
print(f'[信息] 预计使用时间 {timedelta(seconds=rate * len(recipients))}')
if rate > args.rate_limit:
print('[警告] 当前发送速率已超出速率限制')
key = input('[????] 是否确定发送?确定 (Y) / 取消 (N): ')
if key == 'y' or key == 'Y':
print('[信息] 已确定发送')
else:
print('[信息] 已取消发送')
exit()
global date
global sent
global errors
date = datetime.now()
for recipient in recipients:
try:
print(f'[信息] 正在发送:{recipient}')
edit = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[aria-label='Edit copy']"))
edit.click()
# 等待页面加载
locate(driver, EC.invisibility_of_element_located, (By.CSS_SELECTOR, "div.window-blocker.io-ox-busy"))
mails = driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window")
draft = mails[-1:][0]
# 等待页面加载
locate(driver, EC.invisibility_of_element_located, (By.CSS_SELECTOR, "div.window-blocker.io-ox-busy"))
title = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.floating-header"))
title.click()
to = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "input.token-input.tt-input[tabindex='0']"))
to.send_keys(recipient)
send = locate(draft, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='send']"))
send.click()
sent += 1
time.sleep(args.interval)
except Exception as e:
print(f'[!!!!] 发生错误:{e}')
return 6
try:
stale = driver.find_element(By.CSS_SELECTOR, "div.io-ox-mail-compose-window")
except:
continue
alert = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
message = alert.text.replace('\n', ' ')
print(f'[警告] 来自网页:{message}')
errors += 1
button = locate(stale, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='close']"))
button.click()
modal = locate(driver, EC.element_to_be_clickable, (By.CSS_SELECTOR, "div.modal-footer"))
button = locate(modal, EC.element_to_be_clickable, (By.CSS_SELECTOR, "button[data-action='delete']"))
button.click()
def locate(driver, condition, locator):
while True:
try:
wait = WebDriverWait(driver, timeout=args.timeout)
return wait.until(condition(locator))
except StaleElementReferenceException:
# 如果遇到过期元素,重新尝试查找
continue
except TimeoutException:
raise Exception('操作超时')
try:
status = main()
except KeyboardInterrupt:
print('[信息] 程序中断')
status = 145
print(f'[信息] 已发送 {sent} 封;发送失败 {errors}')
print(f'[信息] 总耗时 {str(datetime.now() - date)}')
exit(status)