import argparse import logging import re from selenium.common.exceptions import TimeoutException from selenium.webdriver import Chrome, ChromeOptions from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from common.utils import * from common.timer import Timer from common.jsonrpc2 import ServiceProvider from common.actionflow import Action, ActionFlow from io import BytesIO from enum import Enum from wakepy import keep from pathlib import Path from datetime import datetime from openpyxl import Workbook from zoneinfo import ZoneInfo from nameparser import HumanName parser = argparse.ArgumentParser(description="Mailer") parser.add_argument('account', type=str) parser.add_argument('password', type=str) parser.add_argument('-c', '--column', nargs='+', action='extend') parser.add_argument('-t', '--timeout', type=int, default=60) parser.add_argument('-i', '--interval', type=int, default=10) parser.add_argument('-r', '--attempts', type=int, default=3) parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) args = parser.parse_args() LOGIN = "https://id.ionos.fr/identifier" WEBMAIL = "https://email.ionos.fr/appsuite/#app=io.ox/mail&mailto=%s" def main(driver: Chrome, logger = logging.getLogger('main')): timer = Timer() parameters = vars(args) sp = ServiceProvider.default() class Status(Enum): IDLE = 0 READY = 1 RUNNING = 3 STANDBY = 4 CLOSING = 5 class ColumnMapping: def __init__(self, email=None, recipient=None, code=None, region=None, sent=None, variables=None): self.email = email or '邮箱' self.recipient = recipient or '主要联系人' self.code = code or '客户编号' self.region = region or '国家地区' self.sent = sent or '已发送' self.variables = variables or '变量值' status = Status.IDLE options = dict() headers = list() progress = dict() wb: Workbook = None filename: str = None column: ColumnMapping = None limit = 0 def begin(opts: dict, args: dict): nonlocal status if status != Status.READY: raise ValueError(status) options.update(opts) status = Status.RUNNING parameters.update(args) timer.clear() timer.start() def pause(): nonlocal status status = Status.STANDBY timer.pause() def resume(): nonlocal status status = Status.RUNNING driver.switch_to.window(driver.current_window_handle) timer.start() def unique(header: str): result = dict() col = headers.index(header) + 1 for row in range(options.get('start', 2), limit + 1): key = str(wb.active.cell(row, col).value) result[key] = result.get(key, 0) + 1 return result sp.set('begin', begin) sp.set('pause', pause) sp.set('resume', resume) sp.set('unique', unique) sp.set('status', lambda: status.name) sp.set('uptime', lambda: timer.delta()) try: mapping = ColumnMapping(**{ k.lower().strip(): v.strip() for k, v in map(lambda o: str.split(o, '=', 2), parameters['column'] or list()) }) except Exception as e: logger.critical('Unable to load column mappings', exc_info=e) return 1 class Locale: def __init__(self, name, timezone, default, morning=None, afternoon=None, evening=None, keywords=None): self.name: str = name self.timezone: str = timezone self.default: str = default self.morning: str = morning self.afternoon: str = afternoon self.evening: str = evening self.keywords: str = keywords locales = [ Locale("en", "Europe/London", "Hello" , "Good morning", "Good afternoon" , "Good evening", []), Locale("fr", "Europe/Paris" , "Bonjour", None , None , "Bonsoir" , ['法国', '比利时', '留尼汪']), Locale("de", "Europe/Berlin", "Hallo" , "Guten Morgen", "Guten Tag" , "Guten Abend" , ['德国', '奥地利', '瑞士']), Locale("it", "Europe/Rome" , "Ciao" , "Buongiorno" , "Buon pomeriggio", "Buonasera" , ['意大利']), Locale("es", "Europe/Madrid", "Hola" , "Buenos días" , "Buenas tardes" , None , ['西班牙']), Locale("pt", "Europe/Lisbon", "Olá" , "Bom dia" , "Boa tarde" , None , ['葡萄牙']), ] try: sp.set('context', lambda: { 'locales': list(map(vars, locales)), 'mapping': vars(mapping), 'parameters': parameters }) driver.get(sp.run()) except Exception as e: logger.critical('Unable to load starup page', exc_info=e) return 2 def contains_non_latin_alphabet(string: str): from unicodedata import category, name for char in string: if char.isdigit() or (category(char).startswith('L') and not name(char, '').startswith('LATIN')): return True return False def ready(driver: Chrome): busy = driver.find_element(By.CSS_SELECTOR, ".io-ox-busy") return busy def catch(predicate, force=True): try: wait = WebDriverWait(driver, timeout=parameters['interval']) return wait.until(predicate, 'Timeout') except Exception as e: if force: raise e return None try: driver.switch_to.new_window('tab') driver.set_page_load_timeout(parameters['timeout']) driver.get(LOGIN) except TimeoutException: logger.warning('Timeout') driver.execute_script("window.stop();") try: setup(driver, parameters) click("#selectAll") except Exception as e: logger.debug('Can I haz cheez burger? :3', exc_info=e) try: logger.info('Logging in as %s', parameters['account']) username = locate("#username") username.send_keys(parameters['account']) click("#button--with-loader") password = locate("#password") password.send_keys(parameters['password']) click("#button--with-loader") except Exception as e: logger.critical('Error while logging in to %s', LOGIN, exc_info=e) return 3 while True: try: loader = locate("#background-loader", wait=False) if not loader.is_displayed(): break except: sleep(1) def get_subject(): try: element = locate("h1.subject", wait=False) subject = element.text.strip() return subject except: return None def get_address(): try: element = locate("header div.from", wait=False) address = re.search(r'[^<\s]+@[^>\s]+', element.text)[0] return address except: return None def cell(row: int, header: str): result = wb.active.cell(row, headers.index(header) + 1) return result try: click("li[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) click("button[data-id='default0/Brouillons']", condition=EC.presence_of_element_located) click("ul[aria-label='List view'] li[data-index='0']", condition=EC.presence_of_element_located) logger.info('Done') except Exception as e: logger.warning("Could not open drafts; this may cause issues", exc_info=e) finally: driver.switch_to.window(driver.window_handles[0]) class Wait(Action): @classmethod def prepare(cls): return True @classmethod def perform(cls): if status == Status.RUNNING: return False sleep(0.2); return True class Timeout(Action): timer = Timer() @classmethod def prepare(cls): cls.timer.clear() cls.timer.start() return True @classmethod def perform(cls): if cls.timer.delta() > parameters['timeout']: raise cls sleep(0.2); return True class Cancel(Action): @classmethod def prepare(cls): nonlocal status status = Status.RUNNING return True @classmethod def perform(cls): nonlocal status status = Status.READY driver.switch_to.window(driver.window_handles[0]) raise cls class Close(Action): @classmethod def prepare(cls): return status == Status.CLOSING class Skip(Action): @classmethod def prepare(cls): return True @classmethod def perform(cls): driver.switch_to.window(driver.current_window_handle) raise cls flow = ActionFlow() flow.append(Wait) flow.append(Timeout) flow.append(Cancel) flow.append(Close) flow.append(Skip) sp.set('actions', lambda: flow.capabilities()) sp.set('cancel', lambda: flow.do(Cancel, force=True)) sp.set('close', lambda: flow.do(Close, force=True)) sp.set('skip', lambda: flow.do(Skip, force=True)) def loads(name: str, b64: str): from base64 import b64decode from openpyxl import load_workbook nonlocal status, filename, wb, limit if status != Status.IDLE: raise ValueError(status) data = b64decode(b64) logger.info('Received %s byte(s) total', len(data)) logger.info('Loading...') filename = name buffer = BytesIO(data) wb = load_workbook(buffer) limit = wb.active.max_row headers.clear() for col in range(1, wb.active.max_column + 1): value = str(wb.active.cell(1, col).value) headers.append(value) logger.info('Done') flow.allow(Cancel) status = Status.READY return limit def merge(b64: str): from base64 import b64decode, b64encode from openpyxl import load_workbook if status != Status.CLOSING: raise ValueError(status) data = b64decode(b64) index = headers.index(column.sent) buffer = BytesIO(data) target = load_workbook(buffer) for row in range(options.get('start', 2), limit + 1): c1 = wb.active.cell(row, index + 1) c2 = target.active.cell(row, index + 1) if not (c1.value and str(c1.value).strip()): if (c2.value and str(c2.value).strip()): c1.value = c2.value io = BytesIO() wb.save(io) result = b64encode(io.getbuffer()).decode('ascii') return result sp.set('ready', loads) sp.set('merge', merge) sp.set('progress', lambda: progress) class MismatchedEmailSubject(Exception): pass class MismatchedEmailAddress(Exception): pass while True: try: flow.allow(Wait) flow.do(Wait) flow.react() progress.clear() driver.switch_to.window(driver.window_handles[1]) subject = get_subject() column = ColumnMapping(**options.get('mapping')) if column.email not in headers or limit < 2: logger.error("Column '%s' is not found or does not contain valid data", column.email) raise Cancel() if column.sent not in headers: wb.active.cell(1, wb.active.max_column + 1).value = column.sent headers.append(column.sent) logger.info('Read %s line(s) total', limit) logger.info('Subject: %s', subject) logger.info('From: %s', parameters['account']) locale: Locale = next(filter(lambda o: o.name == options.get('locale'), locales)) logger.info('Locale: %s', locale.name.upper()) logger.info('Timezone: %s', locale.timezone) progress['done'] = 0 progress['subject'] = subject logger.info('Done') except Cancel: status = Status.IDLE driver.switch_to.window(driver.window_handles[0]) continue except StopIteration: logger.error('Invalid locale: %s', options.get('locale')) status = Status.IDLE continue except Exception as e: logger.error('Error while loading data', exc_info=e) status = Status.IDLE continue index = 2 attempts = 0 tz = ZoneInfo(locale.timezone) occurrence = dict() while index <= limit: try: flow.do(Wait) flow.react() attempts += 1 name = cell(index, column.recipient).value email = cell(index, column.email).value try: code = str(cell(index, column.code).value) except ValueError: code = None occurrence.setdefault(code, 0) if attempts > parameters['attempts']: logger.warning("[%d/%d] Exhausted all allowed attempts; skipping", index-1, limit-1) raise Skip() if (sent := cell(index, column.sent).value) is not None and str(sent).strip(): logger.info("[%d/%d] Already visited; skipping '%s'", index-1, limit-1, email) occurrence[code] += 1 raise Skip() if index < options.get('start', 2) or progress['done'] >= options.get('limit', limit): logger.info("[%d/%d] Not planned; skipping '%s'", index-1, limit-1, email) raise Skip() match options.get('occurrence'): case o if o is None: pass case o if o > 0 and code is None: logger.error("Column '%s' contains invalid ID; could not enforce spam control", column.code) raise Cancel() case o if o > 0 and occurrence[code] >= o: logger.info("[%d/%d] No Spam! Skipping '%s'", index-1, limit-1, email) raise Skip() if options.get('slice') and (sub := options.get('subcategories')): if (value := str(cell(index, column.region).value)) not in sub: logger.info("[%d/%d] Value '%s' not enlisted; skipping '%s'", index-1, limit-1, value, email) raise Skip() while mails := driver.find_elements(By.CSS_SELECTOR, "div.io-ox-mail-compose-window"): try: click(mails[0].find_element(By.CSS_SELECTOR, "button[data-action='close']"), wait=False) click("div.modal-footer button[data-action='delete']", wait=False) except Exception as e: logger.warning('Unable to close off email compose windows', exc_info=e) break flow.allow(Cancel) flow.allow(Skip) flow.do(Wait) flow.react() progress['email'] = str(email) progress['recipient'] = str(name) logger.info('[%d/%d] Sending to %s', index-1, limit-1, email) clean = True error = None if (target := get_address()) != parameters['account']: error = MismatchedEmailAddress(target) if (target := get_subject()) != subject: error = MismatchedEmailSubject(target) if error is not None and not options.get('force'): raise error flow.do(Wait) flow.react() click("ul.classic-toolbar button[aria-label='Edit copy']") until(ready) locate("div.io-ox-mail-compose-window iframe", condition=EC.frame_to_be_available_and_switch_to_it) if options.get('greet') and not (clean := False): match datetime.now(tz).hour: case h if 6 <= h < 12: hello = locale.morning case h if 12 <= h < 18: hello = locale.afternoon case h if 18 <= h < 21: hello = locale.evening case _: hello = None iframe = driver.switch_to.active_element action = ActionChains(driver) hello = hello or locale.default if name is not None and (name := str(name).strip()) and not contains_non_latin_alphabet(name): parts = HumanName(name) parts.capitalize(force=True) short = len(parts.first) < 3 or (len(parts.first) < 5 and parts.first.endswith('.')) hello = ' '.join(filter(None, [hello, parts.title, parts.first, (parts.middle or parts.last) if short else None])) hello += ',' action.send_keys(hello).perform() if items := iframe.find_elements(By.XPATH, f'//*[contains(text(), "{hello}")]'): target = items[0] clean = target.text == hello driver.switch_to.default_content() flow.do(Wait) flow.react() click("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input") box = locate("div.io-ox-mail-compose-window div[data-extension-id='to'] > div.mail-input input.token-input.tt-input[tabindex='0']") box.send_keys(str(email) + Keys.ENTER) if column.variables in headers and (v := cell(index, column.variables).value) is not None: clean = False target = locate("div.io-ox-mail-compose-window div[data-extension-id='subject'] input") template = target.get_attribute('value') length = len(template) for i, value in enumerate(str(v).split(',')): template = template.replace('$$%s' % i, value.strip()) target.send_keys(Keys.BACKSPACE * length) target.send_keys(template) clean = target.get_attribute('value') == template flow.do(Wait) flow.react() box = locate("div.io-ox-mail-compose-window .mail-input .tokenfield .token") recipient = box.get_attribute('innerText').strip() if recipient != str(email): logger.warning('Malformed email address detected; retrying (%d)...', attempts) continue if not clean: logger.warning('Malformed email content detected; retrying (%d)...', attempts) continue flow.do(Wait) flow.react() click("div.io-ox-mail-compose-window button[data-action='send']") except Skip: index += 1 attempts = 0 continue except Cancel: status = Status.IDLE break except (MismatchedEmailAddress, MismatchedEmailSubject) as e: logger.error('Nuh-Uh', exc_info=e) status = Status.STANDBY continue except Exception as e: logger.error('Unexptected error', exc_info=e) status = Status.STANDBY continue try: alert: WebElement = catch(lambda x: x.find_element(By.CSS_SELECTOR, "div.io-ox-alert.io-ox-alert-error")) message = alert.text.replace('\n', ' ') cell(index, column.sent).value = '❌' click("div.io-ox-alert.io-ox-alert-error button[data-action='close']") logger.warning('Error while sending email (message: %s); retrying (%d)...', message, attempts) except TimeoutException: cell(index, column.sent).value = '✔️' progress['done'] += 1 occurrence[code] += 1 index += 1 attempts = 0 except Exception as e: logger.error('Unexptected error', exc_info=e) status = Status.STANDBY continue if options.get('save'): try: status = Status.CLOSING flow.allow(Wait, False) flow.allow(Timeout) flow.allow(Close) logger.info('Saving document...') flow.do(Timeout, force=True) flow.react() except Close: pass except Exception as e: file = Path('.').joinpath(filename or f'Mailer-{datetime.now().strftime('%Y%m%d-%H%M%S-%f')}.xlsx').resolve() logger.warning('RPC communication failed; saving at %s', str(file), exc_info=e) wb.save(file) flow.allow(Timeout, False) flow.allow(Close, False) flow.allow(Cancel, False) flow.allow(Skip, False) driver.switch_to.window(driver.window_handles[0]) status = Status.IDLE logger.info('Done') if __name__ == '__main__': try: logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M") logger = logging.getLogger() level = logging.getLevelNamesMapping().get(args.log_level, 'INFO') logger.setLevel(level) opts = ChromeOptions() opts.add_experimental_option("prefs", { "custom_handlers.enabled": False, "custom_handlers.ignored_protocol_handlers": [{ "is_confirmed": True, "last_modified": "13423573689616443", "protocol": "mailto", "security_level": 0, "url": WEBMAIL, }], "profile.default_content_setting_values.notifications": 2, }) with keep.presenting(): driver = Chrome(options=opts) status = main(driver) except KeyboardInterrupt: status = 0 except Exception as e: logger.critical('Fatal error', exc_info=e) status = 1 finally: driver.quit() exit(status)