Files
mailer/main.py
2026-06-03 17:19:37 +08:00

610 lines
22 KiB
Python

import argparse
import logging
import re
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from common.utils import *
from common.timer import Timer
from common.jsonrpc2 import ServiceProvider
from common.actionflow import Action, ActionFlow
from io import BytesIO
from enum import Enum
from wakepy import keep
from pathlib import Path
from datetime import datetime
from openpyxl import Workbook
from zoneinfo import ZoneInfo
from nameparser import HumanName
parser = argparse.ArgumentParser(description="Mailer")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
parser.add_argument('-c', '--column', nargs='+', action='extend')
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-i', '--interval', type=int, default=10)
parser.add_argument('-r', '--attempts', type=int, default=3)
parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args()
LOGIN = "https://id.ionos.fr/identifier"
WEBMAIL = "https://email.ionos.fr/appsuite/#app=io.ox/mail&mailto=%s"
def main(driver: Chrome, logger = logging.getLogger('main')):
timer = Timer()
parameters = vars(args)
sp = ServiceProvider.default()
class Status(Enum):
IDLE = 0
READY = 1
RUNNING = 3
STANDBY = 4
CLOSING = 5
class ColumnMapping:
def __init__(self, email=None, recipient=None, code=None, region=None, sent=None, variables=None):
self.email = email or '邮箱'
self.recipient = recipient or '主要联系人'
self.code = code or '客户编号'
self.region = region or '国家地区'
self.sent = sent or '已发送'
self.variables = variables or '变量值'
status = Status.IDLE
options = dict()
headers = list()
progress = dict()
wb: Workbook = None
filename: str = None
column: ColumnMapping = None
limit = 0
def begin(opts: dict, args: dict):
nonlocal status
if status != Status.READY: raise ValueError(status)
options.update(opts)
status = Status.RUNNING
parameters.update(args)
timer.clear()
timer.start()
def pause():
nonlocal status
status = Status.STANDBY
timer.pause()
def resume():
nonlocal status
status = Status.RUNNING
driver.switch_to.window(driver.current_window_handle)
timer.start()
def unique(header: str):
result = dict()
col = headers.index(header) + 1
for row in range(options.get('start', 2), limit + 1):
key = str(wb.active.cell(row, col).value)
result[key] = result.get(key, 0) + 1
return result
sp.set('begin', begin)
sp.set('pause', pause)
sp.set('resume', resume)
sp.set('unique', unique)
sp.set('status', lambda: status.name)
sp.set('uptime', lambda: timer.delta())
try:
mapping = ColumnMapping(**{
k.lower().strip(): v.strip() for k, v in map(lambda o: str.split(o, '=', 2), parameters['column'] or list())
})
except Exception as e:
logger.critical('Unable to load column mappings', exc_info=e)
return 1
class Locale:
def __init__(self, name, timezone, default, morning=None, afternoon=None, evening=None, keywords=None):
self.name: str = name
self.timezone: str = timezone
self.default: str = default
self.morning: str = morning
self.afternoon: str = afternoon
self.evening: str = evening
self.keywords: str = keywords
locales = [
Locale("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", []),
Locale("fr", "Europe/Paris" , "Bonjour", None , None , "Bonsoir" , ['法国', '比利时', '留尼汪']),
Locale("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']),
Locale("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']),
Locale("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']),
Locale("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']),
]
try:
sp.set('context', lambda: { 'locales': list(map(vars, locales)), 'mapping': vars(mapping), 'parameters': parameters })
driver.get(sp.run())
except Exception as e:
logger.critical('Unable to load starup page', exc_info=e)
return 2
def contains_non_latin_alphabet(string: str):
from unicodedata import category, name
for char in string:
if char.isdigit() or (category(char).startswith('L') and not name(char, '').startswith('LATIN')):
return True
return False
def ready(driver: Chrome):
busy = driver.find_element(By.CSS_SELECTOR, ".io-ox-busy")
return busy
def catch(predicate, force=True):
try:
wait = WebDriverWait(driver, timeout=parameters['interval'])
return wait.until(predicate, 'Timeout')
except Exception as e:
if force: raise e
return None
try:
driver.switch_to.new_window('tab')
driver.set_page_load_timeout(parameters['timeout'])
driver.get(LOGIN)
except TimeoutException:
logger.warning('Timeout')
driver.execute_script("window.stop();")
try:
setup(driver, parameters)
click("#selectAll")
except Exception as e:
logger.debug('Can I haz cheez burger? :3', exc_info=e)
try:
logger.info('Logging in as %s', parameters['account'])
username = locate("#username")
username.send_keys(parameters['account'])
click("#button--with-loader")
password = locate("#password")
password.send_keys(parameters['password'])
click("#button--with-loader")
except Exception as e:
logger.critical('Error while logging in to %s', LOGIN, exc_info=e)
return 3
while True:
try:
loader = locate("#background-loader", wait=False)
if not loader.is_displayed(): break
except:
sleep(1)
def get_subject():
try:
element = locate("h1.subject", wait=False)
subject = element.text.strip()
return subject
except:
return None
def get_address():
try:
element = locate("header div.from", wait=False)
address = re.search(r'[^<\s]+@[^>\s]+', element.text)[0]
return address
except:
return None
def cell(row: int, header: str):
result = wb.active.cell(row, headers.index(header) + 1)
return result
try:
click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located)
click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located)
logger.info('Done')
except Exception as e:
logger.warning("Could not open drafts; this may cause issues", exc_info=e)
finally:
driver.switch_to.window(driver.window_handles[0])
class Wait(Action):
@classmethod
def prepare(cls):
return True
@classmethod
def perform(cls):
if status == Status.RUNNING: return False
sleep(0.2); return True
class Timeout(Action):
timer = Timer()
@classmethod
def prepare(cls):
cls.timer.clear()
cls.timer.start()
return True
@classmethod
def perform(cls):
if cls.timer.delta() > parameters['timeout']: raise cls
sleep(0.2); return True
class Cancel(Action):
@classmethod
def prepare(cls):
nonlocal status
status = Status.RUNNING
return True
@classmethod
def perform(cls):
nonlocal status
status = Status.READY
driver.switch_to.window(driver.window_handles[0])
raise cls
class Close(Action):
@classmethod
def prepare(cls):
return status == Status.CLOSING
class Skip(Action):
@classmethod
def prepare(cls):
return True
@classmethod
def perform(cls):
driver.switch_to.window(driver.current_window_handle)
raise cls
flow = ActionFlow()
flow.append(Wait)
flow.append(Timeout)
flow.append(Cancel)
flow.append(Close)
flow.append(Skip)
sp.set('actions', lambda: flow.capabilities())
sp.set('cancel', lambda: flow.do(Cancel, force=True))
sp.set('close', lambda: flow.do(Close, force=True))
sp.set('skip', lambda: flow.do(Skip, force=True))
def loads(name: str, b64: str):
from base64 import b64decode
from openpyxl import load_workbook
nonlocal status, filename, wb, limit
if status != Status.IDLE: raise ValueError(status)
data = b64decode(b64)
logger.info('Received %s byte(s) total', len(data))
logger.info('Loading...')
filename = name
buffer = BytesIO(data)
wb = load_workbook(buffer)
limit = wb.active.max_row
headers.clear()
for col in range(1, wb.active.max_column + 1):
value = str(wb.active.cell(1, col).value)
headers.append(value)
logger.info('Done')
flow.allow(Cancel)
status = Status.READY
return limit
def merge(b64: str):
from base64 import b64decode, b64encode
from openpyxl import load_workbook
if status != Status.CLOSING: raise ValueError(status)
data = b64decode(b64)
index = headers.index(column.sent)
buffer = BytesIO(data)
target = load_workbook(buffer)
for row in range(options.get('start', 2), limit + 1):
c1 = wb.active.cell(row, index + 1)
c2 = target.active.cell(row, index + 1)
if not (c1.value and str(c1.value).strip()):
if (c2.value and str(c2.value).strip()):
c1.value = c2.value
io = BytesIO()
wb.save(io)
result = b64encode(io.getbuffer()).decode('ascii')
return result
sp.set('ready', loads)
sp.set('merge', merge)
sp.set('progress', lambda: progress)
class MismatchedEmailSubject(Exception): pass
class MismatchedEmailAddress(Exception): pass
while True:
try:
flow.allow(Wait)
flow.do(Wait)
flow.react()
progress.clear()
driver.switch_to.window(driver.window_handles[1])
subject = get_subject()
column = ColumnMapping(**options.get('mapping'))
if column.email not in headers or limit < 2:
logger.error("Column '%s' is not found or does not contain valid data", column.email)
raise Cancel()
if column.sent not in headers:
wb.active.cell(1, wb.active.max_column + 1).value = column.sent
headers.append(column.sent)
logger.info('Read %s line(s) total', limit)
logger.info('Subject: %s', subject)
logger.info('From: %s', parameters['account'])
locale: Locale = next(filter(lambda o: o.name == options.get('locale'), locales))
logger.info('Locale: %s', locale.name.upper())
logger.info('Timezone: %s', locale.timezone)
progress['done'] = 0
progress['subject'] = subject
logger.info('Done')
except Cancel:
status = Status.IDLE
driver.switch_to.window(driver.window_handles[0])
continue
except StopIteration:
logger.error('Invalid locale: %s', options.get('locale'))
status = Status.IDLE
continue
except Exception as e:
logger.error('Error while loading data', exc_info=e)
status = Status.IDLE
continue
index = 2
attempts = 0
tz = ZoneInfo(locale.timezone)
occurrence = dict()
while index <= limit:
try:
flow.do(Wait)
flow.react()
attempts += 1
name = cell(index, column.recipient).value
email = cell(index, column.email).value
try: code = str(cell(index, column.code).value)
except ValueError: code = None
occurrence.setdefault(code, 0)
if attempts > parameters['attempts']:
logger.warning("[%d/%d] Exhausted all allowed attempts; skipping", index-1, limit-1)
raise Skip()
if (sent := cell(index, column.sent).value) is not None and str(sent).strip():
logger.info("[%d/%d] Already visited; skipping '%s'", index-1, limit-1, email)
occurrence[code] += 1
raise Skip()
if index < options.get('start', 2) or progress['done'] >= options.get('limit', limit):
logger.info("[%d/%d] Not planned; skipping '%s'", index-1, limit-1, email)
raise Skip()
match options.get('occurrence'):
case o if o is None:
pass
case o if o > 0 and code is None:
logger.error("Column '%s' contains invalid ID; could not enforce spam control", column.code)
raise Cancel()
case o if o > 0 and occurrence[code] >= o:
logger.info("[%d/%d] No Spam! Skipping '%s'", index-1, limit-1, email)
raise Skip()
if options.get('slice') and (sub := options.get('subcategories')):
if (value := str(cell(index, column.region).value)) not in sub:
logger.info("[%d/%d] Value '%s' not enlisted; skipping '%s'", index-1, limit-1, value, email)
raise Skip()
while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"):
try:
click(mails[0].find_element(By.CSS_SELECTOR, "button[data-action='close']"), wait=False)
click("div.modal-footer button[data-action='delete']", wait=False)
except Exception as e:
logger.warning('Unable to close off email compose windows', exc_info=e)
break
flow.allow(Cancel)
flow.allow(Skip)
flow.do(Wait)
flow.react()
progress['email'] = str(email)
progress['recipient'] = str(name)
logger.info('[%d/%d] Sending to %s', index-1, limit-1, email)
clean = True
error = None
if (target := get_address()) != parameters['account']:
error = MismatchedEmailAddress(target)
if (target := get_subject()) != subject:
error = MismatchedEmailSubject(target)
if error is not None and not options.get('force'):
raise error
flow.do(Wait)
flow.react()
click("ul.classic-toolbar button[aria-label='Edit copy']")
until(ready)
locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it)
if options.get('greet') and not (clean := False):
match datetime.now(tz).hour:
case h if 6 <= h < 12: hello = locale.morning
case h if 12 <= h < 18: hello = locale.afternoon
case h if 18 <= h < 21: hello = locale.evening
case _: hello = None
iframe = driver.switch_to.active_element
action = ActionChains(driver)
hello = hello or locale.default
if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name):
parts = HumanName(name)
parts.capitalize(force=True)
short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.'))
hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None]))
hello += ','
action.send_keys(hello).perform()
if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'):
target = items[0]
clean = target.text == hello
driver.switch_to.default_content()
flow.do(Wait)
flow.react()
click("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input")
box = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input input.token-input.tt-input[tabindex='0']")
box.send_keys(str(email) + Keys.ENTER)
if column.variables in headers and (v := cell(index, column.variables).value) is not None:
clean = False
target = locate("div.io-ox-mail-compose-window div[data-extension-id='subject'] input")
template = target.get_attribute('value')
length = len(template)
for i, value in enumerate(str(v).split(',')):
template = template.replace('$$%s' % i, value.strip())
target.send_keys(Keys.BACKSPACE * length)
target.send_keys(template)
clean = target.get_attribute('value') == template
flow.do(Wait)
flow.react()
box = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token")
recipient = box.get_attribute('innerText').strip()
if recipient != str(email):
logger.warning('Malformed email address detected; retrying (%d)...', attempts)
continue
if not clean:
logger.warning('Malformed email content detected; retrying (%d)...', attempts)
continue
flow.do(Wait)
flow.react()
click("div.io-ox-mail-compose-window button[data-action='send']")
except Skip:
index += 1
attempts = 0
continue
except Cancel:
status = Status.IDLE
break
except (MismatchedEmailAddress, MismatchedEmailSubject) as e:
logger.error('Nuh-Uh', exc_info=e)
status = Status.STANDBY
continue
except Exception as e:
logger.error('Unexptected error', exc_info=e)
status = Status.STANDBY
continue
try:
alert: WebElement = catch(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error"))
message = alert.text.replace('\n', ' ')
cell(index, column.sent).value = ''
click("div.io-ox-alert.io-ox-alert-error button[data-action='close']")
logger.warning('Error while sending email (message: %s); retrying (%d)...', message, attempts)
except TimeoutException:
cell(index, column.sent).value = '✔️'
progress['done'] += 1
occurrence[code] += 1
index += 1
attempts = 0
except Exception as e:
logger.error('Unexptected error', exc_info=e)
status = Status.STANDBY
continue
if options.get('save'):
try:
status = Status.CLOSING
flow.allow(Wait, False)
flow.allow(Timeout)
flow.allow(Close)
logger.info('Saving document...')
flow.do(Timeout, force=True)
flow.react()
except Close:
pass
except Exception as e:
file = Path('.').joinpath(filename or f'Mailer-{datetime.now().strftime('%Y%m%d-%H%M%S-%f')}.xlsx').resolve()
logger.warning('RPC communication failed; saving at %s', str(file), exc_info=e)
wb.save(file)
flow.allow(Timeout, False)
flow.allow(Close, False)
flow.allow(Cancel, False)
flow.allow(Skip, False)
driver.switch_to.window(driver.window_handles[0])
status = Status.IDLE
logger.info('Done')
if __name__ == '__main__':
try:
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
logger = logging.getLogger()
level = logging.getLevelNamesMapping().get(args.log_level, 'INFO')
logger.setLevel(level)
opts = ChromeOptions()
opts.add_experimental_option("prefs", {
"custom_handlers.enabled": False,
"custom_handlers.ignored_protocol_handlers": [{
"is_confirmed": True,
"last_modified": "13423573689616443",
"protocol": "mailto",
"security_level": 0,
"url": WEBMAIL,
}],
"profile.default_content_setting_values.notifications": 2,
})
with keep.presenting():
driver = Chrome(options=opts)
status = main(driver)
except KeyboardInterrupt:
status = 0
except Exception as e:
logger.critical('Fatal error', exc_info=e)
status = 1
finally:
driver.quit()
exit(status)