549 lines
22 KiB
Python
549 lines
22 KiB
Python
import unicodedata
|
|
import traceback
|
|
import argparse
|
|
import logging
|
|
import pandas
|
|
import json
|
|
import time
|
|
import re
|
|
|
|
import trio
|
|
import trio_websocket as ws
|
|
|
|
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
|
|
from selenium.webdriver import Chrome, ChromeOptions
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
from selenium.webdriver.remote.webelement import WebElement
|
|
from selenium.webdriver.remote.webdriver import WebDriver
|
|
|
|
from io import BytesIO
|
|
from enum import Enum
|
|
from queue import Queue, Empty
|
|
from wakepy import keep
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
from threading import Thread
|
|
from nameparser import HumanName
|
|
|
|
parser = argparse.ArgumentParser(description="邮件批量发送脚本")
|
|
parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier')
|
|
parser.add_argument('--column-address', type=str, nargs='?', default='邮箱')
|
|
parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人')
|
|
parser.add_argument('--column-code', type=str, nargs='?', default='客户编号')
|
|
parser.add_argument('--column-pays', type=str, nargs='?', default='国家地区')
|
|
parser.add_argument('--column-sent', type=str, nargs='?', default='已发送')
|
|
parser.add_argument('-a', '--address', type=str, nargs='?')
|
|
parser.add_argument('-p', '--password', type=str, nargs='?')
|
|
parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60)
|
|
parser.add_argument('-i', '--interval', type=int, nargs='?', default=10)
|
|
parser.add_argument('-r', '--retry', type=int, nargs='?', default=3)
|
|
|
|
args = parser.parse_args()
|
|
inbox, outbox = Queue(), Queue()
|
|
connection = None
|
|
socket = None
|
|
server = None
|
|
|
|
sent = 0
|
|
errors = 0
|
|
warnings = 0
|
|
|
|
class Greetings:
|
|
def __init__(self, locale: str, timezone: str, default: str, morning=None, afternoon=None, evening=None, predicate=None):
|
|
self.locale = locale
|
|
self.timezone = timezone
|
|
self.default = default
|
|
self.morning = morning
|
|
self.afternoon = afternoon
|
|
self.evening = evening
|
|
self.predicate = predicate
|
|
|
|
@staticmethod
|
|
def presets():
|
|
return [
|
|
Greetings("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", None),
|
|
Greetings("fr", "Europe/Paris" , "Bonjour", None , "Bon après-midi" , "Bonsoir" , ['法国', '比利时']),
|
|
Greetings("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']),
|
|
Greetings("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']),
|
|
Greetings("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']),
|
|
Greetings("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']),
|
|
]
|
|
|
|
class Command:
|
|
def __init__(self, name: str, *args):
|
|
self.name = name
|
|
self.args = args
|
|
|
|
def __str__(self):
|
|
pack = vars(self)
|
|
return json.dumps(pack)
|
|
|
|
def tell(message, exception=None, level=2):
|
|
message = ': '.join(map(lambda x: str(x), filter(None, [message, exception])))
|
|
if isinstance(exception, Exception): message += '\n' + ''.join(traceback.format_exception(exception))
|
|
|
|
if not outbox.is_shutdown:
|
|
outbox.put(Command('tell', message, level))
|
|
match level:
|
|
case 2: logging.info(message)
|
|
case 1: logging.warning(message)
|
|
case 0: logging.error(message)
|
|
|
|
def main(driver: WebDriver):
|
|
try:
|
|
html = str(Path('index.html').resolve())
|
|
driver.get(html)
|
|
host, port = socket.getsockname()
|
|
parameters = vars(args)
|
|
locales = json.dumps(Greetings.presets(), default=lambda o: o.__dict__)
|
|
driver.execute_script(f"main(...arguments);", f'ws://{host}:{port}', parameters, locales)
|
|
except Exception as e:
|
|
tell('载入初始页面时发生错误', e, level=0)
|
|
return 1
|
|
|
|
def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement:
|
|
for attempt in range(parameters.get('retry')):
|
|
try:
|
|
wait = WebDriverWait(parent, timeout=parameters.get('timeout'))
|
|
return wait.until(condition((By.CSS_SELECTOR, selector)))
|
|
except StaleElementReferenceException:
|
|
# 如果遇到过期元素,重新尝试查找
|
|
continue
|
|
except TimeoutException:
|
|
# 超时错误
|
|
raise Exception('操作超时')
|
|
raise Exception(f'无法定位元素: {selector}')
|
|
|
|
def click(selector, condition=EC.element_to_be_clickable, parent=driver):
|
|
element = locate(selector, condition, parent) if isinstance(selector, str) else selector
|
|
counter = lambda: int(element.get_attribute('taximeter') or 0)
|
|
error = False
|
|
|
|
value = counter()
|
|
driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
|
|
|
|
for attempt in range(parameters.get("retry")):
|
|
try:
|
|
if not error: element.click()
|
|
else: driver.execute_script("arguments[0].click();", element)
|
|
except StaleElementReferenceException:
|
|
break
|
|
except:
|
|
error = True
|
|
continue
|
|
# 检测点击事件
|
|
try:
|
|
WebDriverWait(driver, parameters.get("interval")).until(lambda _: counter() > value)
|
|
break
|
|
except TimeoutException: continue
|
|
except: break
|
|
|
|
def ready(driver, predicate):
|
|
try:
|
|
wait = WebDriverWait(driver, timeout=parameters.get('timeout'))
|
|
wait.until(predicate, '操作超时')
|
|
except:
|
|
return True
|
|
wait = WebDriverWait(driver, timeout=parameters.get('timeout'))
|
|
wait.until_not(predicate, '操作超时')
|
|
return True
|
|
|
|
def contains_non_latin_alphabet(string: str):
|
|
for char in string:
|
|
if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')):
|
|
return True
|
|
return False
|
|
|
|
try:
|
|
driver.switch_to.new_window('tab')
|
|
driver.set_page_load_timeout(parameters.get('timeout'))
|
|
driver.get(parameters.get('url'))
|
|
except TimeoutException:
|
|
# 停止加载
|
|
tell('操作超时', level=1)
|
|
driver.execute_script("window.stop();")
|
|
|
|
# 接受 cookie
|
|
try: click("#selectAll")
|
|
except: pass
|
|
|
|
if (address := parameters.get('address')) and (pw := parameters.get('password')):
|
|
try:
|
|
username = locate("#username")
|
|
username.send_keys(address)
|
|
click("#button--with-loader")
|
|
|
|
password = locate("#password")
|
|
password.send_keys(pw)
|
|
click("#button--with-loader")
|
|
except Exception as e:
|
|
tell('登录时发生了错误', e, level=0)
|
|
return 4
|
|
|
|
while not time.sleep(1):
|
|
try:
|
|
if not driver.find_element(By.CSS_SELECTOR, "#background-loader").is_displayed():
|
|
tell('正在登陆')
|
|
click("#io-ox-topbar-account-dropdown-icon button.dropdown-toggle")
|
|
address = locate("#topbar-account-dropdown .mail").get_attribute('innerText')
|
|
parts = HumanName(address.split('@', 2)[0])
|
|
parts.capitalize(force=True)
|
|
outbox.put(Command('setAddress', parts.full_name, address))
|
|
tell(f'成功登录 {parts.full_name} ({address})')
|
|
break
|
|
except: continue
|
|
|
|
try:
|
|
# 打开草稿箱
|
|
click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
|
|
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
|
|
# 打开邮件
|
|
click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
|
|
except Exception as e:
|
|
tell('打开草稿邮件时发生错误', e, level=1)
|
|
pass
|
|
|
|
class Faillable(Exception):
|
|
def __init__(self, request: str):
|
|
self.request = request
|
|
super().__init__(self.request)
|
|
|
|
class Status(Enum):
|
|
ACTIVE = 0
|
|
INACTIVE = 1
|
|
TERMINATED = 3
|
|
|
|
def isalive(self): return self != Status.TERMINATED
|
|
def isactive(self): return self == Status.ACTIVE
|
|
|
|
def get_subject():
|
|
try:
|
|
element = driver.find_element(By.CSS_SELECTOR, "h1.subject")
|
|
subject = element.text.strip()
|
|
return subject
|
|
except:
|
|
return None
|
|
|
|
def get_address():
|
|
try:
|
|
element = driver.find_element(By.CSS_SELECTOR, "header div.from")
|
|
address = re.search(r'[^<\s]+@[^>\s]+', element.text[6:])[0]
|
|
return address
|
|
except:
|
|
return None
|
|
|
|
def get_request():
|
|
try: request = inbox.get(block=False)
|
|
except Empty: return None
|
|
|
|
if request == 'ONHOLD':
|
|
outbox.put(Command('setStatus', 'ONHOLD'))
|
|
request = inbox.get()
|
|
if request == 'RESUME':
|
|
outbox.put(Command('setStatus', 'RESUME'))
|
|
return None
|
|
if request in ['BYPASS', 'CANCEL']:
|
|
outbox.put(Command('setStatus', request))
|
|
raise Faillable(request)
|
|
|
|
return request
|
|
|
|
while True:
|
|
try:
|
|
driver.switch_to.window(driver.window_handles[0])
|
|
filename = str(inbox.get())
|
|
buffer = BytesIO(inbox.get())
|
|
|
|
workbook = pandas.read_excel(buffer, sheet_name=0)
|
|
frame = workbook.where(pandas.notnull(workbook), None)
|
|
limit = int(frame.last_valid_index())
|
|
|
|
driver.switch_to.window(driver.window_handles[1])
|
|
columns = { column: frame[column].unique() for column in frame.columns }
|
|
for k, v in columns.items(): columns[k] = { str(unique): len(frame[frame[k] == unique]) for unique in v if unique is not None } if len(v) <= 200 else {}
|
|
outbox.put(Command('setMetadata', limit, columns))
|
|
|
|
while not time.sleep(1):
|
|
request = get_request()
|
|
subject = get_subject()
|
|
|
|
outbox.put(Command('setSubject', subject))
|
|
if request is not None:
|
|
parameters = dict(json.loads(request))
|
|
break
|
|
except Faillable:
|
|
continue
|
|
except Exception as e:
|
|
tell('读取数据时发生错误', e, level=0)
|
|
return 5
|
|
|
|
if parameters.get('slice'):
|
|
if subcategory := parameters.get('subcategory'):
|
|
column = parameters.get('column_pays')
|
|
frame = frame[frame[column].isin(subcategory)]
|
|
|
|
offset = parameters.get('offset')
|
|
size = parameters.get('chunk_size')
|
|
start = size * offset
|
|
end = start + size
|
|
frame = frame.iloc[start:end]
|
|
|
|
data = frame.to_dict(orient='list')
|
|
recipients = data.get(parameters.get('column_address'), [])
|
|
limit = len(recipients)
|
|
|
|
names = data.get(parameters.get('column_name'), [None] * limit)
|
|
codes = data.get(parameters.get('column_code'), [None] * limit)
|
|
sents = data.setdefault(parameters.get('column_sent'), [None] * limit)
|
|
|
|
rate = 60 / (parameters.get('interval') + 3)
|
|
length = list.count(sents, None)
|
|
|
|
tell(f'已读取邮件:{subject}')
|
|
tell(f'指定发件人:{address}')
|
|
tell(f'已读取联系人信息共 {limit} 条')
|
|
tell(f'预计发送数量 {length}')
|
|
|
|
tell(f'当前发送速率 {round(rate, 2)} 封/分钟')
|
|
if rate > 8.33: tell('当前发送速率已超出限制 8.33 封/分钟', level=1)
|
|
|
|
tell(f'预计使用时间 {timedelta(minutes=length / rate)}')
|
|
tell(f'已设定允许重试次数:{parameters.get('retry')}')
|
|
tell(f'已设定最大重复次数:{parameters.get('max_occurrence') or '无限制'}')
|
|
|
|
locale = parameters.get('locale')
|
|
greetings = [item for item in Greetings.presets() if item.locale == locale][0]
|
|
timezone = ZoneInfo(greetings.timezone)
|
|
tell(f'当前时区:{greetings.timezone}')
|
|
tell(f'当前语言:{greetings.locale.upper()}')
|
|
|
|
index = 0
|
|
status = Status.ACTIVE
|
|
occurrences = {}
|
|
|
|
while status.isactive() and index < limit:
|
|
global warnings
|
|
global errors
|
|
global sent
|
|
|
|
attempt = 0
|
|
current = index
|
|
index += 1
|
|
|
|
recipient = str(recipients[current]).strip()
|
|
name = names[current]
|
|
code = codes[current]
|
|
mark = sents[current]
|
|
|
|
occurrence = occurrences.setdefault(code, [0]) if code else [0]
|
|
outbox.put(Command('setProgress', index, name, recipient))
|
|
|
|
if mark is not None and str(mark).strip():
|
|
tell(f'已跳过项目 {recipient}')
|
|
occurrence[0] += 1
|
|
continue
|
|
|
|
if (maximum := parameters.get('max_occurrence')) and occurrence[0] >= maximum:
|
|
tell(f'收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}', level=1)
|
|
warnings += 1
|
|
continue
|
|
|
|
while status.isactive():
|
|
try:
|
|
clean = True
|
|
attempt += 1
|
|
request = get_request()
|
|
print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
|
|
|
|
if (target := get_address()) != address:
|
|
raise Exception(f'邮件发件地址与设定不一致\n>> {address}\n>> {target}')
|
|
if (target := get_subject()) != subject:
|
|
raise Exception(f'邮件主题与设定不一致\n>> {subject}\n>> {target}')
|
|
|
|
click("button[aria-label='Edit copy']")
|
|
ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy"))
|
|
locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
|
|
|
|
if parameters.get('greet') and greetings and timezone:
|
|
match datetime.now(timezone).hour:
|
|
case hour if 6 <= hour < 12: hello = greetings.morning
|
|
case hour if 12 <= hour < 18: hello = greetings.afternoon
|
|
case hour if 18 <= hour < 21: hello = greetings.evening
|
|
case _: hello = None
|
|
|
|
iframe = driver.switch_to.active_element
|
|
action = ActionChains(driver)
|
|
clean = False
|
|
hello = hello or greetings.default
|
|
|
|
if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
|
|
parts = HumanName(name)
|
|
parts.capitalize(force=True)
|
|
short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
|
|
hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
|
|
|
|
hello += ','
|
|
action.send_keys(hello).perform()
|
|
|
|
if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
|
|
target = items[0]
|
|
clean = target.text == hello
|
|
|
|
driver.switch_to.default_content()
|
|
wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
|
|
to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)
|
|
|
|
# 填入收件人
|
|
click(wrapper)
|
|
to.send_keys(recipient + Keys.ENTER)
|
|
|
|
token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token")
|
|
target = token.get_attribute('innerText').strip()
|
|
|
|
if target != recipient:
|
|
tell(f'收件人地址不一致 ({attempt})', target, level=1)
|
|
elif not clean:
|
|
tell(f'邮件内容不正确 ({attempt})', level=1)
|
|
else:
|
|
# 发送邮件
|
|
request = get_request()
|
|
click("div.io-ox-mail-compose-window button[data-action='send']")
|
|
# 检测页面警告
|
|
try:
|
|
wait = WebDriverWait(driver, timeout=parameters.get('interval'))
|
|
alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
|
|
|
|
sents[current] = '❌'
|
|
message = alert.text.replace('\n', ' ')
|
|
tell(f'邮件系统错误 ({attempt})', message or None, level=1)
|
|
|
|
# 关闭警告
|
|
click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
|
|
except TimeoutException:
|
|
sents[current] = '✔️'
|
|
occurrence[0] += 1
|
|
sent += 1
|
|
break
|
|
|
|
while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
|
|
try:
|
|
click("button[data-action='close']", parent=mails[0])
|
|
click("div.modal-footer button[data-action='delete']")
|
|
except Exception as e:
|
|
tell("关闭邮件时发生了错误", e, level=1)
|
|
break
|
|
|
|
if attempt < parameters.get('retry'):
|
|
continue
|
|
else:
|
|
tell('已超出最大重试上限', level=1)
|
|
errors += 1
|
|
break
|
|
except Faillable as o:
|
|
request = o.request
|
|
except KeyboardInterrupt:
|
|
tell('程序中断', level=1)
|
|
status = Status.TERMINATED
|
|
break
|
|
except Exception as e:
|
|
tell(f'发生错误 ({attempt})', e, level=0)
|
|
outbox.put(Command('setStatus', 'FAILED'))
|
|
request = inbox.get()
|
|
|
|
if request == 'BYPASS':
|
|
outbox.put(Command('setStatus', 'BYPASS'))
|
|
break
|
|
if request == 'RESUME':
|
|
outbox.put(Command('setStatus', 'RESUME'))
|
|
continue
|
|
if request == 'CANCEL':
|
|
outbox.put(Command('setStatus', 'CANCEL'))
|
|
status = Status.INACTIVE
|
|
|
|
progress = index / limit * 100
|
|
print('[信息] 当前进度:%.2f %%' % progress)
|
|
|
|
if parameters.get('save'):
|
|
try:
|
|
tell(f'正在写入文件:{filename}')
|
|
pandas.DataFrame.from_dict(data).to_excel(filename, index=False, sheet_name='Sheet1')
|
|
except Exception as e:
|
|
tell('写入文件时发生了错误', e, level=0)
|
|
|
|
if status.isalive(): outbox.put(Command('setStatus', 'FINISH'))
|
|
else: break
|
|
|
|
return 0
|
|
|
|
async def handler(request: ws.WebSocketRequest):
|
|
global connection
|
|
websocket = await request.accept()
|
|
|
|
if connection is None:
|
|
connection = websocket
|
|
else:
|
|
await websocket.aclose(code=1000, reason="Server allows only one connection")
|
|
return
|
|
|
|
async def receiver():
|
|
while True:
|
|
try:
|
|
message = await connection.get_message()
|
|
inbox.put(message)
|
|
except Exception as e:
|
|
tell('Receiver', e, level=0)
|
|
break
|
|
|
|
async def sender():
|
|
while True:
|
|
try:
|
|
message = await trio.to_thread.run_sync(outbox.get)
|
|
await connection.send_message(str(message))
|
|
except Exception as e:
|
|
tell('Sender', e, level=0)
|
|
break
|
|
|
|
async with trio.open_nursery() as nursery:
|
|
nursery.start_soon(receiver)
|
|
nursery.start_soon(sender)
|
|
|
|
outbox.shutdown(immediate=True)
|
|
inbox.shutdown(immediate=True)
|
|
|
|
async def backend(listen='127.0.0.1', port=0):
|
|
global server, socket
|
|
listeners = await trio.open_tcp_listeners(port, host=listen)
|
|
server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000)
|
|
socket = listeners[0].socket
|
|
await server.run()
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
|
|
tell('正在初始化')
|
|
thread = Thread(target=lambda: trio.run(backend), daemon=True)
|
|
thread.start()
|
|
|
|
tell('正在启动 Chrome 自动化实例')
|
|
with keep.presenting():
|
|
opts = ChromeOptions()
|
|
opts.add_experimental_option("excludeSwitches", ["enable-logging"])
|
|
driver = Chrome(opts)
|
|
status = main(driver)
|
|
except KeyboardInterrupt:
|
|
tell('程序中断', level=1)
|
|
status = 145
|
|
except Exception as e:
|
|
tell('致命错误', e, level=0)
|
|
status = 1
|
|
finally:
|
|
driver.quit()
|
|
|
|
tell(f'已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个')
|
|
exit(status)
|