update: created main loop & pivot to wxPython

This commit is contained in:
2025-07-31 14:33:18 +08:00
parent 4173ff9b5a
commit 6597ced3cb
2 changed files with 232 additions and 214 deletions

Binary file not shown.

View File

@@ -2,6 +2,7 @@ import unicodedata
import argparse import argparse
import pandas import pandas
import time import time
import wx
from selenium import webdriver from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
@@ -11,20 +12,18 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.remote.webelement import WebElement
from tkinter import Tk, filedialog
from datetime import datetime, timedelta from datetime import datetime, timedelta
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
from nameparser import HumanName from nameparser import HumanName
parser = argparse.ArgumentParser(description="邮件批量发送脚本") parser = argparse.ArgumentParser(description="邮件批量发送脚本")
parser.add_argument('input', nargs='?') parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier')
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人')
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')
parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') parser.add_argument('--column-sent', type=str, nargs='?', default='已发送')
parser.add_argument('-u', '--url', type=str, nargs='?', default='https://id.ionos.fr/identifier') parser.add_argument('-a', '--address', type=str, nargs='?', default='')
parser.add_argument('-a', '--address', type=str, required=True) parser.add_argument('-p', '--password', type=str, nargs='?', default='')
parser.add_argument('-p', '--password', type=str, required=True)
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) parser.add_argument('-i', '--interval', type=int, nargs='?', default=10)
parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5) parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=5)
@@ -77,36 +76,18 @@ greetings = [
] ]
def main(): def main():
if not args.input:
root = Tk()
root.withdraw()
args.input = filedialog.askopenfilename(defaultextension='xlsx')
print(f'[信息] 正在读取数据:{args.input}')
try: try:
workbook = pandas.read_excel(args.input) print('[信息] 程序初始化中...')
data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') app = wx.App(None)
recipients = data.get(args.column_address, [])
except Exception as e:
print(f'[!!!!] 读取数据表失败:{e}')
return 1
limit = len(recipients) print('[信息] 正在启动 Chrome 自动化实例')
names = data.get(args.column_name, [None] * limit)
codes = data.get(args.column_code, [None] * limit)
sents = data.setdefault(args.column_sent, [None] * limit)
print(f'[信息] 已读取联系人信息共 {limit}')
if limit == 0: return 0
print('[信息] 正在启动 Chrome 自动化实例')
try:
opts = webdriver.ChromeOptions() opts = webdriver.ChromeOptions()
opts.add_experimental_option("excludeSwitches", ["enable-logging"]) opts.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome(opts) driver = webdriver.Chrome(opts)
driver.set_page_load_timeout(args.timeout)
except Exception as e: except Exception as e:
print(f'[!!!!] 初始化时发生了错误:{e}') print(f'[!!!!] 初始化时发生了错误:{e}')
return 2 return 1
def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement:
for attempt in range(args.retry): for attempt in range(args.retry):
@@ -176,18 +157,21 @@ def main():
try: click("#selectAll") try: click("#selectAll")
except: pass except: pass
try: if args.address and args.password:
print(f'[信息] 正在登陆 {args.address}') try:
username = locate("#username") print(f'[信息] 正在登陆 {args.address}')
username.send_keys(args.address) username = locate("#username")
click("#button--with-loader") username.send_keys(args.address)
click("#button--with-loader")
password = locate("#password") password = locate("#password")
password.send_keys(args.password) password.send_keys(args.password)
click("#button--with-loader") click("#button--with-loader")
except Exception as e: except Exception as e:
print(f'[!!!!] 登录时发生了错误:{e}') print(f'[!!!!] 登录时发生了错误:{e}')
return 4 return 4
else:
print('[信息] 请在页面上输入账户凭据')
while True: while True:
try: try:
@@ -202,183 +186,217 @@ def main():
except: except:
time.sleep(args.interval) time.sleep(args.interval)
try: while True:
# 打开草稿箱
click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
# 打开邮件
click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
subject = locate("h1.subject")
print(f'[信息] 已读取邮件:{subject.text}')
sender = locate("header div.from")
print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}')
except Exception as e:
print(f'[!!!!] 读取邮件时发生了错误:{e}')
return 5
rate = 60 / (args.interval + 3)
length = list.count(sents, None)
command = None
timezone = ZoneInfo(args.timezone) if args.timezone else None
print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟')
print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}')
print(f'[信息] 当前时区:{args.timezone or ''}')
print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n')
for index, item in enumerate(greetings):
print(f'\t[{index}] {item.get('locale')}', end=' ')
print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default')))
while command is None:
match input('\n[????] 请选择 (留空取消操作): ').strip():
case keys if not keys:
command = -1
case keys if keys.isdigit():
number = int(keys)
if number < len(greetings): command = number
if command < 0:
print('[信息] 已取消发送')
exit()
global date
date = datetime.now()
index = 0
active = True
occurrences = {}
while active and index < limit:
global warnings
global errors
global sent
attempt = 0
current = index
index += 1
recipient = recipients[current]
name = names[current]
code = codes[current]
mark = sents[current]
occurrence = occurrences.setdefault(code, [0]) if code else [0]
if mark is not None and str(mark).strip():
print(f'[信息] 已跳过项目 {recipient}')
occurrence[0] += 1
continue
if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence:
print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}')
warnings += 1
continue
while active:
try:
clean = True
attempt += 1
print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
click("button[aria-label='Edit copy']")
ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
if command > 0 and (entry := greetings[command]):
match datetime.now(timezone).hour if timezone else -1:
case hour if 6 <= hour < 12: registry = entry.get('registry')[0]
case hour if 12 <= hour < 18: registry = entry.get('registry')[1]
case hour if 18 <= hour < 21: registry = entry.get('registry')[2]
case _: registry = None
iframe = driver.switch_to.active_element
action = ActionChains(driver)
clean = False
hello = registry or entry.get('default')
if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
parts = HumanName(name)
parts.capitalize(force=True)
short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
hello += ','
action.send_keys(hello).perform()
if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
target = items[0]
clean = target.text == hello
driver.switch_to.default_content()
wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)
# 填入收件人
click(wrapper)
keyin(to, recipient)
if to.get_attribute('value') != recipient:
print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...')
elif not clean:
print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...')
else:
# 发送邮件
click("div.io-ox-mail-compose-window button[data-action='send']")
# 检测页面警告
try:
wait = WebDriverWait(driver, timeout=args.interval)
alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
sents[current] = ''
message = alert.text.replace('\n', ' ')
print(f'[警告] ({attempt}): {message or '未知错误'}')
# 关闭警告
click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
except TimeoutException:
sents[current] = '✔️'
occurrence[0] += 1
sent += 1
break
while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
try:
# 关闭过期邮件
click("button[data-action='close']", parent=mails[0])
# 删除过期邮件
click("div.modal-footer button[data-action='delete']")
except:
continue
if attempt < args.retry:
continue
else:
print('[警告] 已超出最大重试上限')
errors += 1
break
except KeyboardInterrupt:
print('[信息] 程序中断')
active = False
break
except Exception as e:
print(f'[警告] 发生错误:{e}')
key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ')
if key in ['R', 'r']: continue
elif key in ['S', 's']: break
else: active = False
if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']:
print(f'[信息] 正在写入文件:{args.input}')
try: try:
pandas.DataFrame.from_dict(data).to_excel(args.input, index=False, sheet_name='Sheet1') print('[信息] 请选择数据源')
dialog = wx.FileDialog(None, 'Open', wildcard='*.xlsx', style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST)
if dialog.ShowModal() == wx.ID_OK:
filepath = dialog.GetPath()
print(f'[信息] 正在读取数据:{filepath}')
else:
print('[警告] 操作取消')
return 0
workbook = pandas.read_excel(filepath)
data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list')
recipients = data.get(args.column_address, [])
except Exception as e: except Exception as e:
print(f'[警告] 写入文件时发生了错误{e}') print(f'[警告] 读取数据表失败{e}')
return 6 continue
finally:
dialog.Destroy()
limit = len(recipients)
names = data.get(args.column_name, [None] * limit)
codes = data.get(args.column_code, [None] * limit)
sents = data.setdefault(args.column_sent, [None] * limit)
print(f'[信息] 已读取联系人信息共 {limit}')
if limit == 0: continue
try:
# 打开草稿箱
click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
# 打开邮件
click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
subject = locate("h1.subject")
print(f'[信息] 已读取邮件:{subject.text}')
sender = locate("header div.from")
print(f'[信息] 指定发件人:{sender.text[6:].replace('\n', ' ')}')
except Exception as e:
print(f'[!!!!] 读取邮件时发生了错误:{e}')
return 5
rate = 60 / (args.interval + 3)
length = list.count(sents, None)
command = None
timezone = ZoneInfo(args.timezone) if args.timezone else None
print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟')
if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟')
print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}')
print(f'[信息] 当前时区:{args.timezone or ''}')
print(f'[信息] 已读取可用问候语 {len(greetings)} 条:', end='\n\n')
for index, item in enumerate(greetings):
print(f'\t[{index}] {item.get('locale')}', end=' ')
print('- %s' % (', '.join(filter(None, item.get('registry'))) or item.get('default')))
while command is None:
match input('\n[????] 请选择 (留空取消操作): ').strip():
case keys if not keys:
print('[信息] 操作取消')
return 0
case keys if keys.isdigit():
number = int(keys)
if number < len(greetings): command = number
global date
date = datetime.now()
index = 0
active = True
occurrences = {}
while active and index < limit:
global warnings
global errors
global sent
attempt = 0
current = index
index += 1
recipient = recipients[current]
name = names[current]
code = codes[current]
mark = sents[current]
occurrence = occurrences.setdefault(code, [0]) if code else [0]
if mark is not None and str(mark).strip():
print(f'[信息] 已跳过项目 {recipient}')
occurrence[0] += 1
continue
if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence:
print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}')
warnings += 1
continue
while active:
try:
clean = True
attempt += 1
print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
click("button[aria-label='Edit copy']")
ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
if command > 0 and (entry := greetings[command]):
match datetime.now(timezone).hour if timezone else -1:
case hour if 6 <= hour < 12: registry = entry.get('registry')[0]
case hour if 12 <= hour < 18: registry = entry.get('registry')[1]
case hour if 18 <= hour < 21: registry = entry.get('registry')[2]
case _: registry = None
iframe = driver.switch_to.active_element
action = ActionChains(driver)
clean = False
hello = registry or entry.get('default')
if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
parts = HumanName(name)
parts.capitalize(force=True)
short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
hello += ','
action.send_keys(hello).perform()
if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
target = items[0]
clean = target.text == hello
driver.switch_to.default_content()
wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)
# 填入收件人
click(wrapper)
keyin(to, recipient)
if to.get_attribute('value') != recipient:
print(f'[警告] ({attempt}) 检测到收件人地址不正确,正在重试...')
elif not clean:
print(f'[警告] ({attempt}) 检测到邮件内容不正确,正在重试...')
else:
# 发送邮件
click("div.io-ox-mail-compose-window button[data-action='send']")
# 检测页面警告
try:
wait = WebDriverWait(driver, timeout=args.interval)
alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
sents[current] = ''
message = alert.text.replace('\n', ' ')
print(f'[警告] ({attempt}): {message or '未知错误'}')
# 关闭警告
click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
except TimeoutException:
sents[current] = '✔️'
occurrence[0] += 1
sent += 1
break
while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
try:
# 关闭过期邮件
click("button[data-action='close']", parent=mails[0])
# 删除过期邮件
click("div.modal-footer button[data-action='delete']")
except:
continue
if attempt < args.retry:
continue
else:
print('[警告] 已超出最大重试上限')
errors += 1
break
except KeyboardInterrupt:
print('[信息] 程序中断')
active = False
break
except Exception as e:
print(f'[警告] 发生错误:{e}')
key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ')
if key in ['R', 'r']: continue
elif key in ['S', 's']: break
else: active = False
progress = index / limit * 100
print('[信息] 当前进度:%.2f %%' % progress)
key = input('[????] 继续运行 (C) / 保存并退出 (s) / 不保存退出 (w): ')
if key in ['W', 'w']: return 0
elif key in ['S', 's']: break
else: continue
print(f'[信息] 正在写入文件:{filepath}')
try:
pandas.DataFrame.from_dict(data).to_excel(filepath, index=False, sheet_name='Sheet1')
except Exception as e:
print(f'[警告] 写入文件时发生了错误:{e}')
return 6
return 0 return 0