-
-
-
-
+
+
@@ -193,178 +188,157 @@
-
@@ -570,15 +490,33 @@ form {
padding: 0 1em 0 1em;
}
+select, option, input[type='checkbox'] {
+ cursor: pointer;
+}
+
label {
user-select: none;
width: fit-content;
}
-.rate:not(:first-child)::before {
- padding: 0em 0.4em;
- content: '–';
- color: #666;
+#actions {
+ max-height: 2.5em;
+ column-gap: 0.4em;
+}
+
+#actions > button {
+ height: stretch;
+}
+
+#messages {
+ resize: none;
+ white-space: pre-wrap;
+ margin-top: 24px;
+ width: 100%;
+}
+
+#mesages:focus {
+ outline: none;
}
.inline-flex {
@@ -592,10 +530,6 @@ label {
white-space: nowrap;
}
-.warning {
- color: orangered;
-}
-
.gaps {
row-gap: 0.4em;
}
@@ -613,9 +547,19 @@ label {
animation: spin 1s linear infinite;
}
+.pulse {
+ animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite;
+}
+
@keyframes spin {
to {
transform: rotate(360deg);
}
}
-
\ No newline at end of file
+
+@keyframes pulse {
+ 50% {
+ opacity: 0.6;
+ }
+}
+
diff --git a/main.py b/main.py
index aa59379..a0e22b3 100644
--- a/main.py
+++ b/main.py
@@ -1,593 +1,615 @@
-import unicodedata
-import traceback
import argparse
import logging
-import pandas
-import json
-import time
import re
-import trio
-import trio_websocket as ws
-
-from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, InvalidSessionIdException
+from selenium.common.exceptions import TimeoutException
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
-from selenium.webdriver.remote.webelement import WebElement
-from selenium.webdriver.remote.webdriver import WebDriver
+
+from common.utils import *
+from common.timer import Timer
+from common.jsonrpc2 import ServiceProvider
+from common.actionflow import Action, ActionFlow
from io import BytesIO
from enum import Enum
-from queue import Queue, Empty, ShutDown
from wakepy import keep
from pathlib import Path
-from datetime import datetime, timedelta
+from datetime import datetime
+from openpyxl import Workbook
from zoneinfo import ZoneInfo
-from threading import Thread
from nameparser import HumanName
-parser = argparse.ArgumentParser(description="Bulk Email Sending")
-parser.add_argument('url', nargs='?', default='https://id.ionos.fr/identifier')
-parser.add_argument('--column-address', type=str, default='邮箱')
-parser.add_argument('--column-name', type=str, default='主要联系人')
-parser.add_argument('--column-code', type=str, default='客户编号')
-parser.add_argument('--column-pays', type=str, default='国家地区')
-parser.add_argument('--column-sent', type=str, default='已发送')
-parser.add_argument('--column-vars', type=str, default='变量值')
-parser.add_argument('-a', '--address', type=str)
-parser.add_argument('-p', '--password', type=str)
+parser = argparse.ArgumentParser(description="Mailer")
+parser.add_argument('account', type=str)
+parser.add_argument('password', type=str)
+parser.add_argument('-c', '--column', nargs='+', action='extend')
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-i', '--interval', type=int, default=10)
parser.add_argument('-r', '--attempts', type=int, default=3)
-
+parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args()
-inbox, outbox = Queue(), Queue()
-connection, server = None, None
-class Greetings:
- def __init__(self, locale: str, timezone: str, default: str, morning=None, afternoon=None, evening=None, predicate=None):
- self.locale = locale
- self.timezone = timezone
- self.default = default
- self.morning = morning
- self.afternoon = afternoon
- self.evening = evening
- self.predicate = predicate
+LOGIN = "https://id.ionos.fr/identifier"
+WEBMAIL = "https://email.ionos.fr/appsuite/#app=io.ox/mail&mailto=%s"
- @staticmethod
- def presets():
- return [
- Greetings("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", []),
- Greetings("fr", "Europe/Paris" , "Bonjour", None , "Bon après-midi" , "Bonsoir" , ['法国', '比利时']),
- Greetings("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']),
- Greetings("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']),
- Greetings("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']),
- Greetings("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']),
- ]
+def main(driver: Chrome, logger = logging.getLogger('main')):
+ timer = Timer()
+ parameters = vars(args)
+ sp = ServiceProvider.default()
-class Command:
- def __init__(self, name: str, *args):
- self.name = name
- self.args = args
+ class Status(Enum):
+ IDLE = 0
+ READY = 1
+ RUNNING = 3
+ STANDBY = 4
+ CLOSING = 5
+
+ class ColumnMapping:
+ def __init__(self, email=None, recipient=None, code=None, region=None, sent=None, variables=None):
+ self.email = email or '邮箱'
+ self.recipient = recipient or '主要联系人'
+ self.code = code or '客户编号'
+ self.region = region or '国家地区'
+ self.sent = sent or '已发送'
+ self.variables = variables or '变量值'
+
+ status = Status.IDLE
+ options = dict()
+ headers = list()
+ progress = dict()
+ wb: Workbook = None
+ filename: str = None
+ column: ColumnMapping = None
+ limit = 0
- def __str__(self):
- pack = vars(self)
- return json.dumps(pack)
+ def begin(opts: dict, args: dict):
+ nonlocal status
+ if status != Status.READY: raise ValueError(status)
+ options.update(opts)
+ status = Status.RUNNING
+ parameters.update(args)
+ timer.clear()
+ timer.start()
-def tell(message, exception=None, level=2):
- message = ': '.join(filter(None, [message, exception.__class__.__name__ if isinstance(exception, Exception) else exception]))
- if isinstance(exception, Exception): message += '\n' + ''.join(traceback.format_exception(exception))
+ def pause():
+ nonlocal status
+ status = Status.STANDBY
+ timer.pause()
- if not outbox.is_shutdown:
- outbox.put(Command('tell', message, level))
- match level:
- case 2: logging.info(message)
- case 1: logging.warning(message)
- case 0: logging.error(message)
+ def resume():
+ nonlocal status
+ status = Status.RUNNING
+ driver.switch_to.window(driver.current_window_handle)
+ timer.start()
+
+ def unique(header: str):
+ result = dict()
+ col = headers.index(header) + 1
+
+ for row in range(options.get('start', 2), limit + 1):
+ key = str(wb.active.cell(row, col).value)
+ result[key] = result.get(key, 0) + 1
+
+ return result
+
+ sp.set('begin', begin)
+ sp.set('pause', pause)
+ sp.set('resume', resume)
+ sp.set('unique', unique)
+ sp.set('status', lambda: status.name)
+ sp.set('uptime', lambda: timer.delta())
-def main(driver: WebDriver):
try:
- html = str(Path('index.html').resolve())
- driver.get(html)
- endpoint = server.listeners[0]
- parameters = vars(args)
- locales = json.dumps(Greetings.presets(), default=lambda o: o.__dict__)
- driver.execute_script(f"main(...arguments);", f'ws://{endpoint.address}:{endpoint.port}', parameters, locales)
+ mapping = ColumnMapping(**{
+ k.lower().strip(): v.strip() for k, v in map(lambda o: str.split(o, '=', 2), parameters['column'] or list())
+ })
except Exception as e:
- tell('载入初始页面时发生错误', e, level=0)
+ logger.critical('Unable to load column mappings', exc_info=e)
return 1
- def locate(selector, condition=EC.presence_of_element_located, parent=driver) -> WebElement:
- for attempt in range(parameters.get('attempts')):
- try:
- wait = WebDriverWait(parent, timeout=parameters.get('timeout'))
- return wait.until(condition((By.CSS_SELECTOR, selector)))
- except StaleElementReferenceException:
- # 如果遇到过期元素,重新尝试查找
- continue
- except TimeoutException:
- # 超时错误
- raise Exception('操作超时')
- raise Exception(f'无法定位元素: {selector}')
+ class Locale:
+ def __init__(self, name, timezone, default, morning=None, afternoon=None, evening=None, keywords=None):
+ self.name: str = name
+ self.timezone: str = timezone
+ self.default: str = default
+ self.morning: str = morning
+ self.afternoon: str = afternoon
+ self.evening: str = evening
+ self.keywords: str = keywords
- def click(selector, condition=EC.element_to_be_clickable, parent=driver):
- element = locate(selector, condition, parent) if isinstance(selector, str) else selector
- counter = lambda: int(element.get_attribute('taximeter') or 0)
- error = False
+ locales = [
+ Locale("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", []),
+ Locale("fr", "Europe/Paris" , "Bonjour", None , None , "Bonsoir" , ['法国', '比利时']),
+ Locale("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']),
+ Locale("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']),
+ Locale("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']),
+ Locale("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']),
+ ]
- value = counter()
- driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));", element, value)
-
- for attempt in range(parameters.get('attempts')):
- try:
- if not error: element.click()
- else: driver.execute_script("arguments[0].click();", element)
- except StaleElementReferenceException:
- break
- except:
- error = True
- continue
- # 检测点击事件
- try:
- WebDriverWait(driver, parameters.get("interval")).until(lambda _: counter() > value)
- break
- except TimeoutException: continue
- except: break
-
- def ready(driver, predicate=lambda x: x.find_element(By.CSS_SELECTOR, ".io-ox-busy")):
- try:
- wait = WebDriverWait(driver, timeout=parameters.get('interval'))
- wait.until(predicate, '操作超时')
- except:
- return True
- wait = WebDriverWait(driver, timeout=parameters.get('timeout'))
- wait.until_not(predicate, '操作超时')
- return True
+ try:
+ sp.set('context', lambda: { 'locales': list(map(vars, locales)), 'mapping': vars(mapping), 'parameters': parameters })
+ driver.get(sp.run())
+ except Exception as e:
+ logger.critical('Unable to load starup page', exc_info=e)
+ return 2
def contains_non_latin_alphabet(string: str):
+ from unicodedata import category, name
for char in string:
- if char.isdigit() or (unicodedata.category(char).startswith('L') and not unicodedata.name(char, '').startswith('LATIN')):
+ if char.isdigit() or (category(char).startswith('L') and not name(char, '').startswith('LATIN')):
return True
return False
+
+ def ready(driver: Chrome):
+ busy = driver.find_element(By.CSS_SELECTOR, ".io-ox-busy")
+ return busy
try:
driver.switch_to.new_window('tab')
- driver.set_page_load_timeout(parameters.get('timeout'))
- driver.get(parameters.get('url'))
+ driver.set_page_load_timeout(parameters['timeout'])
+ driver.get(LOGIN)
except TimeoutException:
- # 停止加载
- tell('操作超时', level=1)
+ logger.warning('Timeout')
driver.execute_script("window.stop();")
- # 接受 cookie
- try: click("#selectAll")
- except: pass
-
- if (address := parameters.get('address')) and (pw := parameters.get('password')):
- try:
- username = locate("#username")
- username.send_keys(address)
- click("#button--with-loader")
-
- password = locate("#password")
- password.send_keys(pw)
- click("#button--with-loader")
- except Exception as e:
- tell('登录时发生了错误', e, level=0)
- return 4
-
- while not time.sleep(1):
- try:
- if not driver.find_element(By.CSS_SELECTOR, "#background-loader").is_displayed():
- tell('正在登陆')
- click("#io-ox-topbar-account-dropdown-icon button.dropdown-toggle", condition=EC.presence_of_element_located)
- address = locate("#topbar-account-dropdown .mail").get_attribute('innerText')
- person = HumanName(address.split('@', 2)[0])
- person.capitalize(force=True)
- tell(f'成功登录 {person.first} ({address})')
- break
- except: continue
+ try:
+ setup(driver, parameters)
+ click("#selectAll")
+ except Exception as e:
+ logger.debug('Can I haz cheez burger? :3', exc_info=e)
try:
- condition = EC.presence_of_element_located
- # 打开草稿箱
- click("li[data-id='default0/Brouillons']", condition)
- click("button[data-id='default0/Brouillons']", condition)
- # 打开邮件
- click("ul[aria-label='List view'] li[data-index='0']", condition)
+ logger.info('Logging in as %s', parameters['account'])
+ username = locate("#username")
+ username.send_keys(parameters['account'])
+ click("#button--with-loader")
+
+ password = locate("#password")
+ password.send_keys(parameters['password'])
+ click("#button--with-loader")
except Exception as e:
- tell('打开草稿邮件时发生错误', e, level=1)
- pass
+ logger.critical('Error while logging in to %s', LOGIN, exc_info=e)
+ return 3
- class Faillable(Exception):
- def __init__(self, request: str):
- self.request = request
- super().__init__(self.request)
-
- class Status(Enum):
- ACTIVE = 0
- INACTIVE = 1
- TERMINATED = 3
-
- def isalive(self): return self != Status.TERMINATED
- def isactive(self): return self == Status.ACTIVE
+ while True:
+ try:
+ loader = locate("#background-loader", wait=False)
+ if not loader.is_displayed(): break
+ except:
+ sleep(1)
def get_subject():
try:
- element = driver.find_element(By.CSS_SELECTOR, "h1.subject")
+ element = locate("h1.subject", wait=False)
subject = element.text.strip()
return subject
except:
return None
-
+
def get_address():
try:
- element = driver.find_element(By.CSS_SELECTOR, "header div.from")
+ element = locate("header div.from", wait=False)
address = re.search(r'[^<\s]+@[^>\s]+', element.text)[0]
return address
except:
return None
+
+ def cell(row: int, header: str):
+ result = wb.active.cell(row, headers.index(header) + 1)
+ return result
- def get_request():
- try: request = inbox.get(block=False)
- except Empty: return None
+ try:
+ click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
+ click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
+ click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
+ logger.info('Done')
+ except Exception as e:
+ logger.warning("Could not open drafts; this may cause issues", exc_info=e)
+ finally:
+ driver.switch_to.window(driver.window_handles[0])
- if request == 'ONHOLD':
- outbox.put(Command('setStatus', 'ONHOLD'))
- request = inbox.get()
- if request == 'RESUME':
- outbox.put(Command('setStatus', 'RESUME'))
- return None
- if request in ['BYPASS', 'CANCEL']:
- outbox.put(Command('setStatus', request))
- raise Faillable(request)
+ class Wait(Action):
+ @classmethod
+ def prepare(cls):
+ return True
- return request
+ @classmethod
+ def perform(cls):
+ if status == Status.RUNNING: return False
+ sleep(0.2); return True
+
+ class Timeout(Action):
+ timer = Timer()
+
+ @classmethod
+ def prepare(cls):
+ cls.timer.clear()
+ cls.timer.start()
+ return True
+
+ @classmethod
+ def perform(cls):
+ if cls.timer.delta() > parameters['timeout']: raise cls
+ sleep(0.2); return True
+
+ class Cancel(Action):
+ @classmethod
+ def prepare(cls):
+ nonlocal status
+ status = Status.RUNNING
+ return True
+
+ @classmethod
+ def perform(cls):
+ nonlocal status
+ status = Status.READY
+ driver.switch_to.window(driver.window_handles[0])
+ raise cls
+
+ class Close(Action):
+ @classmethod
+ def prepare(cls):
+ return status == Status.CLOSING
+
+ class Skip(Action):
+ @classmethod
+ def prepare(cls):
+ return True
+
+ @classmethod
+ def perform(cls):
+ driver.switch_to.window(driver.current_window_handle)
+ raise cls
+
+ flow = ActionFlow()
+ flow.append(Wait)
+ flow.append(Timeout)
+ flow.append(Cancel)
+ flow.append(Close)
+ flow.append(Skip)
+
+ sp.set('actions', lambda: flow.capabilities())
+ sp.set('cancel', lambda: flow.do(Cancel))
+ sp.set('close', lambda: flow.do(Close))
+ sp.set('skip', lambda: flow.do(Skip))
+
+ def loads(name: str, b64: str):
+ from base64 import b64decode
+ from openpyxl import load_workbook
+ nonlocal status, filename, wb, limit
+ if status != Status.IDLE: raise ValueError(status)
+
+ data = b64decode(b64)
+ logger.info('Received %s byte(s) total', len(data))
+ logger.info('Loading...')
+ filename = name
+ buffer = BytesIO(data)
+ wb = load_workbook(buffer)
+ limit = wb.active.max_row
+ headers.clear()
+
+ for col in range(1, wb.active.max_column + 1):
+ value = str(wb.active.cell(1, col).value)
+ headers.append(value)
+
+ logger.info('Done')
+ flow.allow(Cancel)
+ status = Status.READY
+ return limit
+
+ def merge(b64: str):
+ from base64 import b64decode, b64encode
+ from openpyxl import load_workbook
+ if status != Status.CLOSING: raise ValueError(status)
+
+ data = b64decode(b64)
+ index = headers.index(column.sent)
+ buffer = BytesIO(data)
+ target = load_workbook(buffer)
+
+ for row in range(options.get('start', 2), limit + 1):
+ c1 = wb.active.cell(row, index + 1)
+ c2 = target.active.cell(row, index + 1)
+ if not (c1.value and str(c1.value).strip()):
+ if (c2.value and str(c2.value).strip()):
+ c1.value = c2.value
+
+ io = BytesIO()
+ wb.save(io)
+ result = b64encode(io.getbuffer()).decode('ascii')
+ return result
+
+ sp.set('ready', loads)
+ sp.set('merge', merge)
+ sp.set('progress', lambda: progress)
+
+ class MismatchedEmailSubject(Exception): pass
+ class MismatchedEmailAddress(Exception): pass
while True:
try:
- outbox.put(Command('setAddress', person.first, address))
- driver.switch_to.window(driver.window_handles[0])
- filename = str(inbox.get())
- buffer = BytesIO(inbox.get())
-
- spreadsheet = pandas.read_excel(buffer, sheet_name=0)
- frame = spreadsheet.where(pandas.notnull(spreadsheet), None)
- index = frame.index
- limit = len(index)
-
+ flow.allow(Wait)
+ flow.do(Wait)
+ flow.react()
+ progress.clear()
driver.switch_to.window(driver.window_handles[1])
- columns = { column: frame[column].unique() for column in frame.columns }
- for k, v in columns.items(): columns[k] = { str(unique): len(frame[frame[k] == unique]) for unique in v if unique is not None } if len(v) <= 200 else {}
- outbox.put(Command('setMetadata', limit, columns))
+ subject = get_subject()
+ column = ColumnMapping(**options.get('mapping'))
- while not time.sleep(1):
- request = get_request()
- subject = get_subject()
+ if column.email not in headers or limit < 2:
+ logger.error("Column '%s' is not found or does not contain valid data", column.email)
+ raise Cancel()
- outbox.put(Command('setSubject', subject))
- if request is not None:
- parameters = dict(json.loads(request))
- break
+ if column.sent not in headers:
+ wb.active.cell(1, wb.active.max_column + 1).value = column.sent
+ headers.append(column.sent)
- ca = parameters.get('column_address')
- cn = parameters.get('column_name')
- cc = parameters.get('column_code')
- cs = parameters.get('column_sent')
- cp = parameters.get('column_pays')
- cv = parameters.get('column_vars')
+ logger.info('Read %s line(s) total', limit)
+ logger.info('Subject: %s', subject)
+ logger.info('From: %s', parameters['account'])
+ locale: Locale = next(filter(lambda o: o.name == options.get('locale'), locales))
+ logger.info('Locale: %s', locale.name.upper())
+ logger.info('Timezone: %s', locale.timezone)
- if parameters.get('slice'):
- if subcategory := parameters.get('subcategory'):
- data = frame[frame[cp].isin(subcategory)]
- index = data.index
-
- offset = parameters.get('offset')
- size = parameters.get('chunk_size')
- start = offset * size
- end = start + size
- index = index[start:end]
- limit = len(index)
-
- if ca not in frame or not limit:
- tell('输入无效', filename, level=1)
- outbox.put(Command('setStatus', 'FINISH'))
- continue
- if cs not in frame:
- frame[cs] = None
-
- rate = 60 / parameters.get('interval')
- length = limit - frame.loc[index, cs].count()
-
- tell(f'已读取邮件:{subject}')
- tell(f'指定发件人:{address}')
- tell(f'已读取联系人信息共 {limit} 条')
- tell(f'预计发送数量 {length}')
-
- tell(f'当前发送速率 {round(rate, 2)} 封/分钟')
- if rate > 8.33: tell('当前发送速率已超出限制 8.33 封/分钟', level=1)
-
- tell(f'预计使用时间 {timedelta(minutes=length / rate)}')
- tell(f'已设定允许重试次数:{parameters.get('attempts')}')
- tell(f'已设定最大重复次数:{parameters.get('max_occurrence') or '无限制'}')
-
- locale = parameters.get('locale')
- greetings = [item for item in Greetings.presets() if item.locale == locale][0]
- timezone = ZoneInfo(greetings.timezone)
- tell(f'当前时区:{greetings.timezone}')
- tell(f'当前语言:{greetings.locale.upper()}')
- except Faillable:
+ progress['done'] = 0
+ progress['subject'] = subject
+ logger.info('Done')
+ except Cancel:
+ status = Status.IDLE
+ driver.switch_to.window(driver.window_handles[0])
+ continue
+ except StopIteration:
+ logger.error('Invalid locale: %s', options.get('locale'))
+ status = Status.IDLE
continue
except Exception as e:
- tell('读取数据时发生错误', e, level=0)
- return 5
+ logger.error('Error while loading data', exc_info=e)
+ status = Status.IDLE
+ continue
+
+ index = 2
+ attempts = 0
+ tz = ZoneInfo(locale.timezone)
+ occurrence = dict()
- cursor = 0
- status = Status.ACTIVE
- sent = 0
- errors = 0
- warnings = 0
+ while index <= limit:
+ try:
+ flow.do(Wait)
+ flow.react()
+ attempts += 1
+ name = cell(index, column.recipient).value
+ email = cell(index, column.email).value
- while status.isactive() and cursor < limit:
- attempt = 0
- current = cursor
- cursor += 1
+ try: code = str(cell(index, column.code).value)
+ except ValueError: code = None
+ occurrence.setdefault(code, 0)
- axis = index[current]
- name = dict(frame.iloc[axis]).get(cn)
- code = dict(frame.iloc[axis]).get(cc)
- recipient = str(frame.loc[axis, ca]).strip()
- occurrence = pandas.Series.count(frame[frame[cc] == code][cs]) if code is not None else 0
+ if attempts > parameters['attempts']:
+ logger.warning("[%d/%d] Exhausted all allowed attempts; skipping", index-1, limit-1)
+ raise Skip()
- if (remarks := frame.loc[axis, cs]) is not None and str(remarks).strip():
- tell(f'已跳过项目 {recipient}')
- continue
+ if (sent := cell(index, column.sent).value) is not None and str(sent).strip():
+ logger.info("[%d/%d] Already visited; skipping '%s'", index-1, limit-1, email)
+ occurrence[code] += 1
+ raise Skip()
- if (maximum := parameters.get('max_occurrence')) and occurrence >= maximum:
- tell(f'已跳过项目 {recipient}')
- warnings += 1
- continue
+ if index < options.get('start', 2) or progress['done'] >= options.get('limit', limit):
+ logger.info("[%d/%d] Not planned; skipping '%s'", index-1, limit-1, email)
+ raise Skip()
- while status.isactive():
- try:
- outbox.put(Command('setProgress', cursor, name, recipient, sent, warnings, errors, attempt))
- tell('正在发送:%s (%.2f %%)' % (recipient, current / limit * 100))
-
- clean = True
- attempt += 1
- request = get_request()
-
- if (target := get_address()) != address:
- exception = Exception(f'邮件发件地址与设定不一致\n>> {address}\n>> {target}')
- tell(None, exception, level=1)
-
- if (target := get_subject()) != subject:
- exception = Exception(f'邮件主题与设定不一致\n>> {subject}\n>> {target}')
- condition = EC.presence_of_element_located
- if not parameters.get('recovery'): raise exception
- tell(None, exception, level=1)
- # 打开存档
- click("li[data-id='virtual/standard'] > .subfolders li:nth-child(6) li.folder:nth-child(1)", condition)
- ready(driver)
- click("ul[aria-label='List view'] li[data-index='0']", condition)
- ready(driver)
- click("ul.classic-toolbar button[aria-label='More actions']", condition)
- # 复制邮件
- click(".abs + ul.dropdown-menu a[data-action='io.ox/mail/actions/copy']", condition)
- click(".modal-dialog ul.subfolders li[data-id='default0/Brouillons']", condition)
- click(".modal-dialog .modal-footer button[data-action='ok']", condition)
- # 返回草稿箱
- click("li[data-id='default0/Brouillons']")
-
- for attempt in range(parameters.get('attempts')):
- ready(driver)
- click("ul[aria-label='List view'] li[data-index='0']", condition)
- if get_subject() == subject: break
- if not get_subject() == subject: raise exception
-
- click("ul.classic-toolbar button[aria-label='Edit copy']")
- ready(driver)
- locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
-
- if parameters.get('greet') and greetings and timezone:
- match datetime.now(timezone).hour:
- case hour if 6 <= hour < 12: hello = greetings.morning
- case hour if 12 <= hour < 18: hello = greetings.afternoon
- case hour if 18 <= hour < 21: hello = greetings.evening
- case _: hello = None
-
- iframe = driver.switch_to.active_element
- action = ActionChains(driver)
- clean = False
- hello = hello or greetings.default
-
- if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
- parts = HumanName(name)
- parts.capitalize(force=True)
- short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
- hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
-
- hello += ','
- action.send_keys(hello).perform()
-
- if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
- target = items[0]
- clean = target.text == hello
-
- driver.switch_to.default_content()
- wrapper = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
- to = locate("input.token-input.tt-input[tabindex='0']", parent=wrapper)
-
- # 填入收件人
- click(wrapper)
- to.send_keys(recipient + Keys.ENTER)
-
- # 引入变量值
- if cv in frame and (variables := frame.loc[axis, cv]):
- clean = False
- target = locate("div.io-ox-mail-compose-window div[data-extension-id='subject'] input")
- attribute = target.get_attribute('value')
-
- for i, value in enumerate(str(variables).split(',')):
- attribute = attribute.replace(f'$${i}', value.strip())
-
- target.send_keys(Keys.BACKSPACE * len(attribute))
- target.send_keys(attribute)
- clean = target.get_attribute('value') == attribute
-
- token = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token")
- target = token.get_attribute('innerText').strip()
-
- if target != recipient:
- tell(f'收件人地址不一致 ({attempt})', target, level=1)
- elif not clean:
- tell(f'邮件内容不正确 ({attempt})', level=1)
- else:
- # 发送邮件
- request = get_request()
- click("div.io-ox-mail-compose-window button[data-action='send']")
- # 检测页面警告
- try:
- wait = WebDriverWait(driver, timeout=parameters.get('interval'))
- alert = wait.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
-
- frame.loc[axis, cs] = '❌'
- message = alert.text.replace('\n', ' ')
- tell(f'邮件系统错误 ({attempt})', message or None, level=1)
- # 关闭警告
- click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
- except TimeoutException:
- frame.loc[axis, cs] = '✔️'
- sent += 1
- break
-
- while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
- try:
- click("button[data-action='close']", parent=mails[0])
- click("div.modal-footer button[data-action='delete']")
- except Exception as e:
- tell("关闭邮件时发生了错误", e, level=1)
- break
-
- if attempt < parameters.get('attempts'):
- continue
- else:
- tell('已超出最大重试上限', level=1)
- errors += 1
- break
- except (KeyboardInterrupt, InvalidSessionIdException, ShutDown):
- request = None
- exception = None
- except Faillable as o:
- request = o.request
- exception = None
- except Exception as e:
- request = None
- exception = e
-
- if exception is not None:
- try:
- tell(f'发生错误 ({attempt})', exception, level=0)
- outbox.put(Command('setStatus', 'FAILED'))
- request = inbox.get()
- except ShutDown:
+ match options.get('occurrence'):
+ case o if o is None:
pass
+ case o if o > 0 and code is None:
+ logger.error("Column '%s' contains invalid ID; could not enforce spam control", column.code)
+ raise Cancel()
+ case o if o > 0 and occurrence[code] >= o:
+ logger.info("[%d/%d] No Spam! Skipping '%s'", index-1, limit-1, email)
+ raise Skip()
- if request == 'CANCEL':
- outbox.put(Command('setStatus', 'CANCEL'))
- status = Status.INACTIVE
- break
- if request == 'BYPASS':
- outbox.put(Command('setStatus', 'BYPASS'))
- break
- if request == 'RESUME':
- outbox.put(Command('setStatus', 'RESUME'))
- continue
+ if options.get('slice') and (sub := options.get('subcategories')):
+ if (value := str(cell(index, column.region).value)) not in sub:
+ logger.info("[%d/%d] Value '%s' not enlisted; skipping '%s'", index-1, limit-1, value, email)
+ raise Skip()
- tell('程序中断', level=1)
- status = Status.TERMINATED
-
- outbox.put(Command('setProgress', cursor, name, recipient, sent, warnings, errors))
- progress = cursor / limit * 100
- tell('当前进度:%.2f %%' % progress)
- tell(f'已发送 {sent} 封;发送失败 {errors} 封;跳过重复项 {warnings} 个')
-
- if parameters.get('save'):
- for attempt in range(parameters.get('attempts')):
- try:
- path = Path(filename)
- tell(f'正在写入文件:{path}')
- if path.exists():
- spreadsheet = pandas.read_excel(path, sheet_name=0)
- frame = frame.combine_first(spreadsheet)
- with pandas.ExcelWriter(path, mode='w') as writer:
- frame.to_excel(writer, index=False)
+ while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
+ try:
+ click(mails[0].find_element(By.CSS_SELECTOR, "button[data-action='close']"), wait=False)
+ click("div.modal-footer button[data-action='delete']", wait=False)
+ except Exception as e:
+ logger.warning('Unable to close off email compose windows', exc_info=e)
break
- except Exception as e:
- tell(f'写入文件时发生错误 ({attempt})', e, level=1)
- time.sleep(parameters.get('interval'))
+
+ flow.allow(Cancel)
+ flow.allow(Skip)
+ flow.do(Wait)
+ flow.react()
+ progress['email'] = str(email)
+ progress['recipient'] = str(name)
+ logger.info('[%d/%d] Sending to %s', index-1, limit-1, email)
+ clean = True
+
+ if (target := get_address()) != parameters['account']:
+ e = MismatchedEmailAddress(target)
+ raise e
+
+ if (target := get_subject()) != subject:
+ e = MismatchedEmailSubject(target)
+ c = EC.presence_of_element_located
+ if not options.get('recover'): raise e
+
+ click("li[data-id='virtual/standard'] > .subfolders li:nth-child(6) li.folder:nth-child(1)", condition=c)
+ until(ready)
+ click("ul[aria-label='List view'] li[data-index='0']", condition=c)
+ until(ready)
+ click("ul.classic-toolbar button[aria-label='More actions']", condition=c)
+ click(".abs + ul.dropdown-menu a[data-action='io.ox/mail/actions/copy']", condition=c)
+ click(".modal-dialog ul.subfolders li[data-id='default0/Brouillons']", condition=c)
+ click(".modal-dialog .modal-footer button[data-action='ok']", condition=c)
+ click("li[data-id='default0/Brouillons']")
+
+ for _ in range(parameters['attempts']):
+ until(ready)
+ click("ul[aria-label='List view'] li[data-index='0']", condition=c)
+ if get_subject() == subject: break
+ if not get_subject() == subject: raise e
+
+ flow.do(Wait)
+ flow.react()
+ click("ul.classic-toolbar button[aria-label='Edit copy']")
+ until(ready)
+ locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
+
+ if options.get('greet') and not (clean := False):
+ match datetime.now(tz).hour:
+ case h if 6 <= h < 12: hello = locale.morning
+ case h if 12 <= h < 18: hello = locale.afternoon
+ case h if 18 <= h < 21: hello = locale.evening
+ case _: hello = None
+
+ iframe = driver.switch_to.active_element
+ action = ActionChains(driver)
+ hello = hello or locale.default
+
+ if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
+ parts = HumanName(name)
+ parts.capitalize(force=True)
+ short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
+ hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
+
+ hello += ','
+ action.send_keys(hello).perform()
+
+ if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
+ target = items[0]
+ clean = target.text == hello
+
+ driver.switch_to.default_content()
+ flow.do(Wait)
+ flow.react()
+ click("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
+ box = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input input.token-input.tt-input[tabindex='0']")
+ box.send_keys(str(email) + Keys.ENTER)
+
+ if column.variables in headers and (v := cell(index, column.variables).value) is not None:
+ clean = False
+ target = locate("div.io-ox-mail-compose-window div[data-extension-id='subject'] input")
+ template = target.get_attribute('value')
+ length = len(template)
+
+ for i, value in enumerate(str(v).split(',')):
+ template = template.replace('$$%s' % i, value.strip())
+
+ target.send_keys(Keys.BACKSPACE * length)
+ target.send_keys(template)
+ clean = target.get_attribute('value') == template
+
+ flow.do(Wait)
+ flow.react()
+ box = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token")
+ recipient = box.get_attribute('innerText').strip()
+
+ if recipient != str(email):
+ logger.warning('Malformed email address detected; retrying (%d)...', attempts)
continue
- if status.isalive(): outbox.put(Command('setStatus', 'FINISH'))
- else: return 0
+ if not clean:
+ logger.warning('Malformed email content detected; retrying (%d)...', attempts)
+ continue
-async def handler(request: ws.WebSocketRequest):
- global connection
- websocket = await request.accept()
-
- if connection is None:
- connection = websocket
- else:
- await websocket.aclose(code=1000, reason="Server allows only one connection")
- return
-
- async def receiver():
- while True:
- try:
- message = await connection.get_message()
- inbox.put(message)
- except Exception as e:
- inbox.shutdown(immediate=True)
+ flow.do(Wait)
+ flow.react()
+ click("div.io-ox-mail-compose-window button[data-action='send']")
+ except Skip:
+ index += 1
+ attempts = 0
+ continue
+ except Cancel:
+ status = Status.IDLE
break
-
- async def sender():
- while True:
- try:
- message = await trio.to_thread.run_sync(outbox.get)
- await connection.send_message(str(message))
+ except (MismatchedEmailAddress, MismatchedEmailSubject) as e:
+ logger.error('Nuh-Uh', exc_info=e)
+ status = Status.STANDBY
+ continue
except Exception as e:
- outbox.shutdown(immediate=True)
- break
+ logger.error('Unexptected error', exc_info=e)
+ status = Status.STANDBY
+ continue
- async with trio.open_nursery() as nursery:
- nursery.start_soon(receiver)
- nursery.start_soon(sender)
+ try:
+ w = WebDriverWait(driver, timeout=parameters['interval'])
+ alert = w.until(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
+ message = alert.text.replace('\n', ' ')
+
+ cell(index, column.sent).value = '❌'
+ click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
+ logger.warning('Error while sending email (message: %s); retrying (%d)...', message, attempts)
+ except TimeoutException:
+ cell(index, column.sent).value = '✔️'
+ progress['done'] += 1
+ occurrence[code] += 1
+ index += 1
+ attempts = 0
+ except Exception as e:
+ logger.error('Unexptected error', exc_info=e)
+ status = Status.STANDBY
+ continue
-async def backend(listen='127.0.0.1', port=0):
- global server
- listeners = await trio.open_tcp_listeners(port, host=listen)
- server = ws.WebSocketServer(handler, listeners, max_message_size=125_000_000)
- await server.run()
+ if options.get('save'):
+ try:
+ status = Status.CLOSING
+ flow.allow(Wait, False)
+ flow.allow(Timeout)
+ flow.allow(Close)
+ logger.info('Saving document...')
+ flow.do(Timeout)
+ flow.react()
+ except Close:
+ pass
+ except Exception as e:
+ file = Path('.').joinpath(filename or f'Mailer-{datetime.now().strftime('%Y%m%d-%H%M%S-%f')}.xlsx').resolve()
+ logger.warning('RPC communication failed; saving at %s', str(file), exc_info=e)
+ wb.save(file)
+
+ flow.allow(Timeout, False)
+ flow.allow(Close, False)
+ flow.allow(Cancel, False)
+ flow.allow(Skip, False)
+ driver.switch_to.window(driver.window_handles[0])
+ status = Status.IDLE
+ logger.info('Done')
if __name__ == '__main__':
try:
- logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
- tell('正在初始化')
- thread = Thread(target=lambda: trio.run(backend), daemon=True)
- thread.start()
+ logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
+ logger = logging.getLogger()
+ level = logging.getLevelNamesMapping().get(args.log_level, 'INFO')
+ logger.setLevel(level)
+ opts = ChromeOptions()
+ opts.add_experimental_option("prefs", {
+ "custom_handlers.enabled": False,
+ "custom_handlers.ignored_protocol_handlers": [{
+ "is_confirmed": True,
+ "last_modified": "13423573689616443",
+ "protocol": "mailto",
+ "security_level": 0,
+ "url": WEBMAIL,
+ }],
+ "profile.default_content_setting_values.notifications": 2,
+ })
- tell('正在启动 Chrome 自动化实例')
with keep.presenting():
- opts = ChromeOptions()
- opts.add_experimental_option("excludeSwitches", ["enable-logging"])
- driver = Chrome(opts)
+ driver = Chrome(options=opts)
status = main(driver)
+ except KeyboardInterrupt:
+ status = 0
except Exception as e:
- tell('致命错误', e, level=0)
+ logger.critical('Fatal error', exc_info=e)
status = 1
finally:
driver.quit()
diff --git a/requirements.txt b/requirements.txt
index e91241b..0db59f7 100644
Binary files a/requirements.txt and b/requirements.txt differ