diff --git a/.gitignore b/.gitignore index 3834556..edab19e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,2 @@ .vscode/ venv/ -resources/ -/*.bat -/*.csv -/*.xlsx \ No newline at end of file diff --git a/index.html b/index.html new file mode 100644 index 0000000..a58fb75 --- /dev/null +++ b/index.html @@ -0,0 +1,566 @@ + + + + + + + + +Python Application + +
+
+ Basic Information +
+
+ File Name + +
+
+ Subject + +
+
+ From + +
+
+ Recipient + +
+
+ Progress + +
+
+ Timezone + +
+
+ Uptime + +
+
+ Remaining + +
+
+ Status + +
+
+
+ + + +
+
+
+ Parameters +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ + + + +
+
+
+ Columns +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+
+
+ Slicing +
+
+ + + + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+
+
+
+
+ + + + diff --git a/main.py b/main.py new file mode 100644 index 0000000..3070dc5 --- /dev/null +++ b/main.py @@ -0,0 +1,548 @@ +import unicodedata +import traceback +import argparse +import logging +import pandas +import json +import time +import re + +import trio +import trio_websocket as ws + +from selenium.common.exceptions import StaleElementReferenceException, TimeoutException +from selenium.webdriver import Chrome, ChromeOptions +from selenium.webdriver.common.action_chains import ActionChains +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.remote.webdriver import WebDriver + +from io import BytesIO +from enum import Enum +from queue import Queue, Empty +from wakepy import keep +from pathlib import Path +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo +from threading import Thread +from nameparser import HumanName + +parser = argparse.ArgumentParser(description="邮件批量发送脚本") +parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier') +parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') +parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') +parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') +parser.add_argument('--column-pays', type=str, nargs='?', default='国家地区') +parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') +parser.add_argument('-a', '--address', type=str, nargs='?') +parser.add_argument('-p', '--password', type=str, nargs='?') +parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) +parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) +parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) + +args = parser.parse_args() +inbox, outbox = Queue(), Queue() +connection = None +socket = None +server = None + +sent = 0 +errors = 0 +warnings = 0 + +class Greetings: + def __init__(self, locale: str, timezone: str, default: str, morning=None, afternoon=None, evening=None, predicate=None): + self.locale = locale + self.timezone = timezone + self.default = default + self.morning = morning + self.afternoon = afternoon + self.evening = evening + self.predicate = predicate + + @staticmethod + def presets(): + return [ + Greetings("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", None), + Greetings("fr", "Europe/Paris" , "Bonjour", None , "Bon après-midi" , "Bonsoir" , ['法国', '比利时']), + Greetings("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']), + Greetings("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']), + Greetings("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']), + Greetings("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']), + ] + +class Command: + def __init__(self, name: str, *args): + self.name = name + self.args = args + + def __str__(self): + pack = vars(self) + return json.dumps(pack) + +def tell(message, exception=None, level=2): + message = ': '.join(map(lambda x: str(x), filter(None, [message, exception]))) + if isinstance(exception, Exception): message += '\n' + ''.join(traceback.format_exception(exception)) + + if not outbox.is_shutdown: + outbox.put(Command('tell', message, level)) + match level: + case 2: logging.info(message) + case 1: logging.warning(message) + case 0: logging.error(message) + +def main(driver: WebDriver): + try: + html = str(Path('index.html').resolve()) + driver.get(html) + host, port = socket.getsockname() + parameters = vars(args) + locales = json.dumps(Greetings.presets(), default=lambda o: o.__dict__) + driver.execute_script(f"main(...arguments);", f'ws://{host}:{port}', parameters, locales) + except Exception as e: + tell('载入初始页面时发生错误', e, level=0) + return 1 + + def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: + for attempt in range(parameters.get('retry')): + try: + wait = WebDriverWait(parent, timeout=parameters.get('timeout')) + return wait.until(condition((By.CSS_SELECTOR, selector))) + except StaleElementReferenceException: + # 如果遇到过期元素,重新尝试查找 + continue + except TimeoutException: + # 超时错误 + raise Exception('操作超时') + raise Exception(f'无法定位元素: {selector}') + + def click(selector, condition=EC.element_to_be_clickable, parent=driver): + element = locate(selector, condition, parent) if isinstance(selector, str) else selector + counter = lambda: int(element.get_attribute('taximeter') or 0) + error = False + + value = counter() + driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) + + for attempt in range(parameters.get("retry")): + try: + if not error: element.click() + else: driver.execute_script("arguments[0].click();", element) + except StaleElementReferenceException: + break + except: + error = True + continue + # 检测点击事件 + try: + WebDriverWait(driver, parameters.get("interval")).until(lambda _: counter() > value) + break + except TimeoutException: continue + except: break + + def ready(driver, predicate): + try: + wait = WebDriverWait(driver, timeout=parameters.get('timeout')) + wait.until(predicate, '操作超时') + except: + return True + wait = WebDriverWait(driver, timeout=parameters.get('timeout')) + wait.until_not(predicate, '操作超时') + return True + + def contains_non_latin_alphabet(string: str): + for char in string: + if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')): + return True + return False + + try: + driver.switch_to.new_window('tab') + driver.set_page_load_timeout(parameters.get('timeout')) + driver.get(parameters.get('url')) + except TimeoutException: + # 停止加载 + tell('操作超时', level=1) + driver.execute_script("window.stop();") + + # 接受 cookie + try: click("#selectAll") + except: pass + + if (address := parameters.get('address')) and (pw := parameters.get('password')): + try: + username = locate("#username") + username.send_keys(address) + click("#button--with-loader") + + password = locate("#password") + password.send_keys(pw) + click("#button--with-loader") + except Exception as e: + tell('登录时发生了错误', e, level=0) + return 4 + + while not time.sleep(1): + try: + if not driver.find_element(By.CSS_SELECTOR, "#background-loader").is_displayed(): + tell('正在登陆') + click("#io-ox-topbar-account-dropdown-icon button.dropdown-toggle") + address = locate("#topbar-account-dropdown .mail").get_attribute('innerText') + parts = HumanName(address.split('@', 2)[0]) + parts.capitalize(force=True) + outbox.put(Command('setAddress', parts.full_name, address)) + tell(f'成功登录 {parts.full_name} ({address})') + break + except: continue + + try: + # 打开草稿箱 + click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) + click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) + # 打开邮件 + click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) + except Exception as e: + tell('打开草稿邮件时发生错误', e, level=1) + pass + + class Faillable(Exception): + def __init__(self, request: str): + self.request = request + super().__init__(self.request) + + class Status(Enum): + ACTIVE = 0 + INACTIVE = 1 + TERMINATED = 3 + + def isalive(self): return self != Status.TERMINATED + def isactive(self): return self == Status.ACTIVE + + def get_subject(): + try: + element = driver.find_element(By.CSS_SELECTOR, "h1.subject") + subject = element.text.strip() + return subject + except: + return None + + def get_address(): + try: + element = driver.find_element(By.CSS_SELECTOR, "header div.from") + address = re.search(r'[^<\s]+@[^>\s]+', element.text[6:])[0] + return address + except: + return None + + def get_request(): + try: request = inbox.get(block=False) + except Empty: return None + + if request == 'ONHOLD': + outbox.put(Command('setStatus', 'ONHOLD')) + request = inbox.get() + if request == 'RESUME': + outbox.put(Command('setStatus', 'RESUME')) + return None + if request in ['BYPASS', 'CANCEL']: + outbox.put(Command('setStatus', request)) + raise Faillable(request) + + return request + + while True: + try: + driver.switch_to.window(driver.window_handles[0]) + filename = str(inbox.get()) + buffer = BytesIO(inbox.get()) + + workbook = pandas.read_excel(buffer, sheet_name=0) + frame = workbook.where(pandas.notnull(workbook), None) + limit = int(frame.last_valid_index()) + + driver.switch_to.window(driver.window_handles[1]) + columns = { column: frame[column].unique() for column in frame.columns } + for k, v in columns.items(): columns[k] = { str(unique): len(frame[frame[k] == unique]) for unique in v if unique is not None } if len(v) <= 200 else {} + outbox.put(Command('setMetadata', limit, columns)) + + while not time.sleep(1): + request = get_request() + subject = get_subject() + + outbox.put(Command('setSubject', subject)) + if request is not None: + parameters = dict(json.loads(request)) + break + except Faillable: + continue + except Exception as e: + tell('读取数据时发生错误', e, level=0) + return 5 + + if parameters.get('slice'): + if subcategory := parameters.get('subcategory'): + column = parameters.get('column_pays') + frame = frame[frame[column].isin(subcategory)] + + offset = parameters.get('offset') + size = parameters.get('chunk_size') + start = size * offset + end = start + size + frame = frame.iloc[start:end] + + data = frame.to_dict(orient='list') + recipients = data.get(parameters.get('column_address'), []) + limit = len(recipients) + + names = data.get(parameters.get('column_name'), [None] * limit) + codes = data.get(parameters.get('column_code'), [None] * limit) + sents = data.setdefault(parameters.get('column_sent'), [None] * limit) + + rate = 60 / (parameters.get('interval') + 3) + length = list.count(sents, None) + + tell(f'已读取邮件:{subject}') + tell(f'指定发件人:{address}') + tell(f'已读取联系人信息共 {limit} 条') + tell(f'预计发送数量 {length}') + + tell(f'当前发送速率 {round(rate, 2)} 封/分钟') + if rate > 8.33: tell('当前发送速率已超出限制 8.33 封/分钟', level=1) + + tell(f'预计使用时间 {timedelta(minutes=length / rate)}') + tell(f'已设定允许重试次数:{parameters.get('retry')}') + tell(f'已设定最大重复次数:{parameters.get('max_occurrence') or '无限制'}') + + locale = parameters.get('locale') + greetings = [item for item in Greetings.presets() if item.locale == locale][0] + timezone = ZoneInfo(greetings.timezone) + tell(f'当前时区:{greetings.timezone}') + tell(f'当前语言:{greetings.locale.upper()}') + + index = 0 + status = Status.ACTIVE + occurrences = {} + + while status.isactive() and index < limit: + global warnings + global errors + global sent + + attempt = 0 + current = index + index += 1 + + recipient = str(recipients[current]).strip() + name = names[current] + code = codes[current] + mark = sents[current] + + occurrence = occurrences.setdefault(code, [0]) if code else [0] + outbox.put(Command('setProgress', index, name, recipient)) + + if mark is not None and str(mark).strip(): + tell(f'已跳过项目 {recipient}') + occurrence[0] += 1 + continue + + if (maximum := parameters.get('max_occurrence')) and occurrence[0] >= maximum: + tell(f'收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}', level=1) + warnings += 1 + continue + + while status.isactive(): + try: + clean = True + attempt += 1 + request = get_request() + print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) + + if (target := get_address()) != address: + raise Exception(f'邮件发件地址与设定不一致\n>> {address}\n>> {target}') + if (target := get_subject()) != subject: + raise Exception(f'邮件主题与设定不一致\n>> {subject}\n>> {target}') + + click("button[aria-label='Edit copy']") + ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) + locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) + + if parameters.get('greet') and greetings and timezone: + match datetime.now(timezone).hour: + case hour if 6 <= hour < 12: hello = greetings.morning + case hour if 12 <= hour < 18: hello = greetings.afternoon + case hour if 18 <= hour < 21: hello = greetings.evening + case _: hello = None + + iframe = driver.switch_to.active_element + action = ActionChains(driver) + clean = False + hello = hello or greetings.default + + if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): + parts = HumanName(name) + parts.capitalize(force=True) + short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) + hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) + + hello += ',' + action.send_keys(hello).perform() + + if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): + target = items[0] + clean = target.text == hello + + driver.switch_to.default_content() + wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") + to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) + + # 填入收件人 + click(wrapper) + to.send_keys(recipient + Keys.ENTER) + + token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token") + target = token.get_attribute('innerText').strip() + + if target != recipient: + tell(f'收件人地址不一致 ({attempt})', target, level=1) + elif not clean: + tell(f'邮件内容不正确 ({attempt})', level=1) + else: + # 发送邮件 + request = get_request() + click("div.io-ox-mail-compose-window button[data-action='send']") + # 检测页面警告 + try: + wait = WebDriverWait(driver, timeout=parameters.get('interval')) + alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) + + sents[current] = '❌' + message = alert.text.replace('\n', ' ') + tell(f'邮件系统错误 ({attempt})', message or None, level=1) + + # 关闭警告 + click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") + except TimeoutException: + sents[current] = '✔️' + occurrence[0] += 1 + sent += 1 + break + + while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): + try: + click("button[data-action='close']", parent=mails[0]) + click("div.modal-footer button[data-action='delete']") + except Exception as e: + tell("关闭邮件时发生了错误", e, level=1) + break + + if attempt < parameters.get('retry'): + continue + else: + tell('已超出最大重试上限', level=1) + errors += 1 + break + except Faillable as o: + request = o.request + except KeyboardInterrupt: + tell('程序中断', level=1) + status = Status.TERMINATED + break + except Exception as e: + tell(f'发生错误 ({attempt})', e, level=0) + outbox.put(Command('setStatus', 'FAILED')) + request = inbox.get() + + if request == 'BYPASS': + outbox.put(Command('setStatus', 'BYPASS')) + break + if request == 'RESUME': + outbox.put(Command('setStatus', 'RESUME')) + continue + if request == 'CANCEL': + outbox.put(Command('setStatus', 'CANCEL')) + status = Status.INACTIVE + + progress = index / limit * 100 + print('[信息] 当前进度:%.2f %%' % progress) + + if parameters.get('save'): + try: + tell(f'正在写入文件:{filename}') + pandas.DataFrame.from_dict(data).to_excel(filename, index=False, sheet_name='Sheet1') + except Exception as e: + tell('写入文件时发生了错误', e, level=0) + + if status.isalive(): outbox.put(Command('setStatus', 'FINISH')) + else: break + + return 0 + +async def handler(request: ws.WebSocketRequest): + global connection + websocket = await request.accept() + + if connection is None: + connection = websocket + else: + await websocket.aclose(code=1000, reason="Server allows only one connection") + return + + async def receiver(): + while True: + try: + message = await connection.get_message() + inbox.put(message) + except Exception as e: + tell('Receiver', e, level=0) + break + + async def sender(): + while True: + try: + message = await trio.to_thread.run_sync(outbox.get) + await connection.send_message(str(message)) + except Exception as e: + tell('Sender', e, level=0) + break + + async with trio.open_nursery() as nursery: + nursery.start_soon(receiver) + nursery.start_soon(sender) + + outbox.shutdown(immediate=True) + inbox.shutdown(immediate=True) + +async def backend(listen='127.0.0.1', port=0): + global server, socket + listeners = await trio.open_tcp_listeners(port, host=listen) + server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000) + socket = listeners[0].socket + await server.run() + +if __name__ == '__main__': + try: + logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M") + tell('正在初始化') + thread = Thread(target=lambda: trio.run(backend), daemon=True) + thread.start() + + tell('正在启动 Chrome 自动化实例') + with keep.presenting(): + opts = ChromeOptions() + opts.add_experimental_option("excludeSwitches", ["enable-logging"]) + driver = Chrome(opts) + status = main(driver) + except KeyboardInterrupt: + tell('程序中断', level=1) + status = 145 + except Exception as e: + tell('致命错误', e, level=0) + status = 1 + finally: + driver.quit() + + tell(f'已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') + exit(status) diff --git a/requirements.txt b/requirements.txt index 11d1833..e91241b 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/scripts/example.bat b/scripts/example.bat new file mode 100644 index 0000000..a6c3185 --- /dev/null +++ b/scripts/example.bat @@ -0,0 +1 @@ +.\..\venv\Scripts\pythonw.exe .\..\main.py --address "user@example.com" --password "example" --max-occurrence 5 --interval 10 \ No newline at end of file diff --git a/scripts/launch.example.bat b/scripts/launch.example.bat deleted file mode 100644 index 1b00bb6..0000000 --- a/scripts/launch.example.bat +++ /dev/null @@ -1,3 +0,0 @@ -chcp 65001 -.\..\venv\Scripts\python.exe .\..\邮件批量发送.py --address "user@example.com" --password "example" --max-occurrence 5 --interval 10 -@pause \ No newline at end of file diff --git a/scripts/setup.bat b/scripts/setup.bat new file mode 100644 index 0000000..0aac503 --- /dev/null +++ b/scripts/setup.bat @@ -0,0 +1,2 @@ +python -m venv .\..\venv +.\..\venv\Scripts\python.exe -m pip install -r .\..\requirements.txt \ No newline at end of file diff --git a/scripts/slice.example.bat b/scripts/slice.example.bat deleted file mode 100644 index 6b2c771..0000000 --- a/scripts/slice.example.bat +++ /dev/null @@ -1,3 +0,0 @@ -chcp 65001 -.\..\venv\Scripts\python.exe .\..\邮件列表切片.py --size 1000 -@pause \ No newline at end of file diff --git a/邮件列表切片.py b/邮件列表切片.py deleted file mode 100644 index 6479671..0000000 --- a/邮件列表切片.py +++ /dev/null @@ -1,143 +0,0 @@ -import argparse -import pathlib -import pandas -import wx - -parser = argparse.ArgumentParser(description="邮件列表切片脚本") -parser.add_argument('--column-pays', type=str, nargs='?', default='国家地区') -parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') -parser.add_argument('-d', '--outdir', type=str, nargs='?') -parser.add_argument('-s', '--size', type=int, nargs='?', default=1000) - -args = parser.parse_args() - -mappings = [ - { - "locale": "fr", - "predicate": lambda it: it in ['法国', 'FR', '比利时', 'BE'] - }, - { - "locale": "de", - "predicate": lambda it: it in ['德国', 'DE', '奥地利', 'AT', '瑞士', 'CH'] - }, - { - "locale": "it", - "predicate": lambda it: it in ['意大利', 'IT'] - }, - { - "locale": "es", - "predicate": lambda it: it in ['西班牙', 'ES'] - }, - { - "locale": "pt", - "predicate": lambda it: it in ['葡萄牙', 'PT'] - }, - { - "locale": "en", - "predicate": lambda it: bool(it) - }, - { - "locale": "un", - "predicate": lambda it: True - }, -] - -def main(): - try: - print('[信息] 请选择数据源') - app = wx.App(None) - dialog = wx.FileDialog(None, 'Open', wildcard='*.xlsx', style=wx.FD_OPEN|wx.FD_FILE_MUST_EXIST) - - if dialog.ShowModal() == wx.ID_OK: - filepath = dialog.GetPath() - else: - print('[警告] 操作取消') - return 0 - dialog.Destroy() - - if args.outdir is None: - print('[信息] 请选择保存位置') - dialog = wx.DirDialog(None, 'Save to', style=wx.DD_DIR_MUST_EXIST) - - if dialog.ShowModal() == wx.ID_OK: - args.outdir = dialog.GetPath() - else: - print('[警告] 操作取消') - return 0 - dialog.Destroy() - - print(f'[信息] 正在读取数据:{filepath}') - sheets = pandas.read_excel(filepath, sheet_name=None) - names = sheets.keys() - - parts = pathlib.Path(filepath) - prefix = parts.stem - except Exception as e: - print(f'[警告] 读取数据表失败:{e}') - return 1 - - if (length := len(sheets)) > 1: - print(f'[信息] 已读取以下 ({length}) 工作表:', end='\n\n') - for name in names: print(f'\t{name}') - - while True: - if (key := input('\n[????] 请输入工作表名称:')) in names: - sheet = sheets.get(key) - break - else: - print(f'[警告] 名称无效,请重试') - else: - print(f'[信息] 已读取工作表:{names[0]}') - sheet = sheets.get(names[0]) - - data = sheet.to_dict(orient='records') - primary = {} - secondary = {} - - for record in data: - pays = record.get(args.column_pays) - - for rule in mappings: - locale = rule.get('locale') - predicate = rule.get('predicate') - - if predicate(pays): - group: list = primary.setdefault(locale, []) - group.append(record) - break - - for key, group in primary.items(): - factor = 1 - offset = 0 - cursor = 0 - - while (limit := args.size * factor + offset) and (slice := group[cursor:limit]): - last: dict = slice[-1:][0] - next: dict = group[limit] if len(group) > limit else None - - if next is not None and (c1 := last.get(args.column_code)) and (c2 := next.get(args.column_code)) and c1 == c2: - offset += 1 - continue - - name = '.'.join([key, str(factor)]) - secondary.setdefault(name, slice) - factor += 1 - cursor = limit - offset = 0 - - for key, group in secondary.items(): - try: - filepath = f'{args.outdir}/{prefix}.{key}.xlsx' - df = pandas.DataFrame.from_records(group) - - print(f'[信息] 正在写入文件:{filepath}') - df.to_excel(filepath, index=False, sheet_name='Sheet1') - except Exception as e: - print(f'[警告] 写入文件时发生了错误:{e}') - continue - - return 0 - -try: status = main() -except KeyboardInterrupt: status = 144 -exit(status) diff --git a/邮件批量发送.py b/邮件批量发送.py deleted file mode 100644 index ed4ada0..0000000 --- a/邮件批量发送.py +++ /dev/null @@ -1,425 +0,0 @@ -import unicodedata -import argparse -import pandas -import time -import wx - -from selenium.common.exceptions import StaleElementReferenceException, TimeoutException -from selenium.webdriver import Chrome, ChromeOptions -from selenium.webdriver.common.action_chains import ActionChains -from selenium.webdriver.common.by import By -from selenium.webdriver.common.keys import Keys -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.wait import WebDriverWait -from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.remote.webdriver import WebDriver - -from enum import Enum -from pathlib import Path -from datetime import datetime, timedelta -from zoneinfo import ZoneInfo -from nameparser import HumanName - -parser = argparse.ArgumentParser(description="邮件批量发送脚本") -parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier') -parser.add_argument('--column-address', type=str, nargs='?', default='邮箱') -parser.add_argument('--column-name', type=str, nargs='?', default='主要联系人') -parser.add_argument('--column-code', type=str, nargs='?', default='客户编号') -parser.add_argument('--column-sent', type=str, nargs='?', default='已发送') -parser.add_argument('-a', '--address', type=str, nargs='?', default='') -parser.add_argument('-p', '--password', type=str, nargs='?', default='') -parser.add_argument('-t', '--timeout', type=int, nargs='?', default=60) -parser.add_argument('-i', '--interval', type=int, nargs='?', default=10) -parser.add_argument('-m', '--max-occurrence', type=int, nargs='?', default=0) -parser.add_argument('-T', '--timezone', type=str, nargs='?', default='') -parser.add_argument('-r', '--retry', type=int, nargs='?', default=3) - -args = parser.parse_args() -date = datetime.now() - -sent = 0 -errors = 0 -warnings = 0 - -greetings = [ - { - "locale": "en", - "default": "Hello", - "registry": ["Good morning", "Good afternoon", "Good evening"] - }, - { - "locale": "fr", - "default": "Bonjour", - "registry": [None, "Bon après-midi", "Bonsoir"] - }, - { - "locale": "de", - "default": "Hallo", - "registry": ["Guten Morgen", "Guten Tag", "Guten Abend"] - }, - { - "locale": "it", - "default": "Ciao", - "registry": ["Buongiorno", "Buon pomeriggio", "Buonasera"] - }, - { - "locale": "es", - "default": "Hola", - "registry": ["Buenos días", "Buenas tardes", None] - }, - { - "locale": "pt", - "default": "Olá", - "registry": ["Bom dia", "Boa tarde", None] - } -] - -def main(driver: WebDriver): - def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement: - for attempt in range(args.retry): - try: - wait = WebDriverWait(parent, timeout=args.timeout) - return wait.until(condition((By.CSS_SELECTOR, selector))) - except StaleElementReferenceException: - # 如果遇到过期元素,重新尝试查找 - continue - except TimeoutException: - # 超时错误 - raise Exception('操作超时') - raise Exception(f'无法定位元素: {selector}') - - def click(selector, condition=EC.element_to_be_clickable, parent=driver): - element = locate(selector, condition, parent) if isinstance(selector, str) else selector - counter = lambda: int(element.get_attribute('taximeter') or 0) - error = False - - value = counter() - driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value) - - for attempt in range(args.retry): - try: - if not error: element.click() - else: driver.execute_script("arguments[0].click();", element) - except StaleElementReferenceException: - break - except: - error = True - continue - # 检测点击事件 - try: - WebDriverWait(driver, args.interval).until(lambda _: counter() > value) - break - except TimeoutException: continue - except: break - - def ready(driver, predicate): - try: - wait = WebDriverWait(driver, timeout=args.timeout) - wait.until(predicate, '操作超时') - except: - return True - wait = WebDriverWait(driver, timeout=args.timeout) - wait.until_not(predicate, '操作超时') - return True - - def contains_non_latin_alphabet(string: str): - for char in string: - if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')): - return True - return False - - try: - driver.get(args.url) - except TimeoutException: - # 停止加载 - print(f'[警告] 操作超时') - driver.execute_script("window.stop();") - - # 接受 cookie - try: click("#selectAll") - except: pass - - if args.address and args.password: - try: - print(f'[信息] 正在登陆 {args.address}') - username = locate("#username") - username.send_keys(args.address) - click("#button--with-loader") - - password = locate("#password") - password.send_keys(args.password) - click("#button--with-loader") - except Exception as e: - print(f'[!!!!] 登录时发生了错误:{e}') - return 4 - else: - print('[信息] 请在页面上输入账户凭据') - - while True: - try: - driver.find_element(By.CSS_SELECTOR, "#io-ox-core") - loader = driver.find_element(By.CSS_SELECTOR, "#background-loader") - - if not loader.is_displayed(): - print(f'[信息] 已登录') - break - else: - raise Exception() - except: - time.sleep(args.interval) - - while True: - try: - print(f'[信息] 正在打开草稿邮件') - # 打开草稿箱 - click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) - click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) - # 打开邮件 - click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) - except Exception as e: - print(f'[警告] 打开草稿邮件时发生了错误:{e}') - pass - - try: - print('[信息] 请选择数据源') - title = 'Open (%s)' % args.address - dialog = wx.FileDialog(None, title, wildcard='*.xlsx', style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) - - if dialog.ShowModal() == wx.ID_OK: - filepath = dialog.GetPath() - print(f'[信息] 正在读取数据:{filepath}') - else: - print('[警告] 操作取消') - return 0 - - workbook = pandas.read_excel(filepath) - data = workbook.where(pandas.notnull(workbook), None).to_dict(orient='list') - recipients = data.get(args.column_address, []) - - parts = Path(filepath).stem.split('.', 2) - suffix = parts[1].lower() if len(parts) > 1 else None - except Exception as e: - print(f'[警告] 读取数据表失败:{e}') - continue - finally: - dialog.Destroy() - - limit = len(recipients) - names = data.get(args.column_name, [None] * limit) - codes = data.get(args.column_code, [None] * limit) - sents = data.setdefault(args.column_sent, [None] * limit) - - rate = 60 / (args.interval + 3) - length = list.count(sents, None) - timezone = ZoneInfo(args.timezone) if args.timezone else None - - print(f'[信息] 已读取联系人信息共 {limit} 条') - print(f'[信息] 预计发送数量 {length}') - - print(f'[信息] 当前发送速率 {round(rate, 2)} 封/分钟') - if rate > 8.33: print('[警告] 当前发送速率已超出限制 8.33 封/分钟') - - print(f'[信息] 预计使用时间 {timedelta(minutes=length / rate)}') - print(f'[信息] 已设定允许重试次数:{args.retry}') - print(f'[信息] 已设定最大重复次数:{args.max_occurrence or '无限制'}') - print(f'[信息] 当前时区:{timezone or '无'}') - - entries = list(filter(lambda it: it.get('locale') == suffix, greetings)) - locale = dict(entries[0]) if len(entries) > 0 else None - print(f'[信息] 当前语言:{str(suffix).upper() if locale else '无'}') - - if input('[????] 开始发送?确定 (y) / 取消 (N): ') not in ['Y', 'y']: - print('[信息] 操作取消') - continue - - subject = locate("h1.subject").text.strip() - sender = locate("header div.from").text[6:].replace('\n', ' ').strip().lstrip('<').rstrip('>') - print(f'[信息] 已读取邮件:{subject}') - print(f'[信息] 指定发件人:{sender}') - - if sender.lower() != str(args.address).lower(): - print(f'[警告] 检测到发件人与设定不一致') - print(f'[信息] 提示:请检查邮件是否正确') - - class Status(Enum): - ACTIVE = 0 - INACTIVE = 1 - TERMINATED = 3 - - def isalive(self): return self != Status.TERMINATED - def isactive(self): return self == Status.ACTIVE - - global date - date = datetime.now() - - index = 0 - status = Status.ACTIVE - occurrences = {} - - while status.isactive() and index < limit: - global warnings - global errors - global sent - - attempt = 0 - current = index - index += 1 - - recipient = str(recipients[current]).strip() - name = names[current] - code = codes[current] - mark = sents[current] - - occurrence = occurrences.setdefault(code, [0]) if code else [0] - - if mark is not None and str(mark).strip(): - print(f'[信息] 已跳过项目 {recipient}') - occurrence[0] += 1 - continue - - if args.max_occurrence > 0 and occurrence[0] >= args.max_occurrence: - print(f'[警告] 收件人 {recipient} 所属组织出现次数已超出限制 {occurrence}') - warnings += 1 - continue - - while status.isactive(): - try: - clean = True - attempt += 1 - print('[信息] 正在发送:%s (%.2f %%)' % (recipient, current / limit * 100)) - click("button[aria-label='Edit copy']") - - if (target := locate("h1.subject").text.strip()) != subject: - print(f'[警告] 邮件主题与设定不一致:{target}') - if input('[????] 是否继续?确定 (y) / 取消 (N): ') in ['Y', 'y']: - subject = target - print(f'[信息] 已更新邮件主题设定') - else: - raise Exception('邮件主题意外变更') - - ready(driver, lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")) - locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) - - if locale and (registry := locale.get('registry')): - match datetime.now(timezone).hour if timezone else -1: - case hour if 6 <= hour < 12: hello = registry[0] - case hour if 12 <= hour < 18: hello = registry[1] - case hour if 18 <= hour < 21: hello = registry[2] - case _: hello = None - - iframe = driver.switch_to.active_element - action = ActionChains(driver) - clean = False - hello = hello or locale.get('default') - - if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): - parts = HumanName(name) - parts.capitalize(force=True) - short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) - hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) - - hello += ',' - action.send_keys(hello).perform() - - if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): - target = items[0] - clean = target.text == hello - - driver.switch_to.default_content() - wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") - to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper) - - # 填入收件人 - click(wrapper) - to.send_keys(recipient + Keys.ENTER) - - token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token") - target = token.get_attribute('innerText').strip() - - if target != recipient: - print(f'[警告] ({attempt}): 收件人地址不一致:{target}') - elif not clean: - print(f'[警告] ({attempt}): 邮件内容不正确') - else: - # 发送邮件 - click("div.io-ox-mail-compose-window button[data-action='send']") - # 检测页面警告 - try: - wait = WebDriverWait(driver, timeout=args.interval) - alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) - - sents[current] = '❌' - message = alert.text.replace('\n', ' ') - print(f'[警告] ({attempt}): {message or '未知错误'}') - - # 关闭警告 - click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") - except TimeoutException: - sents[current] = '✔️' - occurrence[0] += 1 - sent += 1 - break - - while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): - try: - click("button[data-action='close']", parent=mails[0]) - click("div.modal-footer button[data-action='delete']") - except Exception as e: - print(f"[警告] 关闭邮件时发生了错误:{e}") - break - - if attempt < args.retry: - continue - else: - print('[警告] 已超出最大重试上限') - errors += 1 - break - except KeyboardInterrupt: - print('[信息] 程序中断') - status = Status.TERMINATED - break - except Exception as e: - print(f'[警告] ({attempt}): 发生错误:{e}') - - key = input('[????] 重试 (r) / 跳过 (s) / 取消 (C): ') - if key in ['R', 'r']: continue - elif key in ['S', 's']: break - else: status = Status.INACTIVE - - progress = index / limit * 100 - print('[信息] 当前进度:%.2f %%' % progress) - - if input('[????] 是否保存到文件?确定 (y) / 取消 (N): ') in ['Y', 'y']: - try: - print(f'[信息] 正在写入文件:{filepath}') - pandas.DataFrame.from_dict(data).to_excel(filepath, index=False, sheet_name='Sheet1') - except Exception as e: - print(f'[警告] 写入文件时发生了错误:{e}') - - if status.isalive() and input('[????] 继续运行 (c) / 退出程序 (W): ') in ['C', 'c']: continue - else: break - - return 0 - -try: - print('[信息] 程序初始化中...') - app = wx.App(None) - - print('[信息] 正在启动 Chrome 自动化实例') - opts = ChromeOptions() - opts.add_experimental_option("excludeSwitches", ["enable-logging"]) - driver = Chrome(opts) - driver.set_page_load_timeout(args.timeout) - status = main(driver) -except KeyboardInterrupt: - print('[信息] 程序中断') - status = 145 -except Exception as e: - print(f'[!!!!] 致命错误:{e}') - status = 1 -finally: - driver.quit() - -print(f'[信息] 已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个') -print(f'[信息] 总耗时 {str(datetime.now() - date)}') -exit(status) diff --git a/邮件编辑器.html b/邮件编辑器.html deleted file mode 100644 index acd3736..0000000 --- a/邮件编辑器.html +++ /dev/null @@ -1,161 +0,0 @@ - - - - - - 编辑邮件 - - - -
- - - - - - - - - - - - - - - - - - - - - -
- -
- - - \ No newline at end of file