"""Opportunity Export: drive the Xiaoman CRM export page and build an Excel file.

The script logs in through a Chrome WebDriver session, waits for the user to
press an export button, downloads the CSV produced by the CRM, pours each CSV
row into a copy of the active sheet of ``template.xlsx`` (matching CSV column
headers against the workbook's defined names) and finally hands the resulting
workbook to the browser as a download.
"""
import argparse
import base64
import csv
import logging
import math
from io import BytesIO, StringIO
from itertools import count
from pathlib import Path
from subprocess import Popen

import openpyxl
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from urllib3 import PoolManager

from common.jsonrpc2 import ServiceProvider
from common.utils import *

parser = argparse.ArgumentParser(description="Opportunity Export")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
# When set, the exported workbook is opened once it appears in --directory.
parser.add_argument('-p', '--open', action='store_true')
# Also used as Chrome's download directory (see the entry point below).
parser.add_argument('-d', '--directory', type=str, default=str(Path.home().joinpath('Downloads')))
# Encoding used to decode the CSV downloaded from the CRM.
parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig")
# Page-load timeout passed to the WebDriver.
parser.add_argument('-t', '--timeout', type=int, default=60)
# NOTE(review): --attempts is parsed but not read anywhere in this file.
parser.add_argument('-r', '--attempts', type=int, default=3)
parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args()

WEBURL = "https://crm.xiaoman.cn/business/export"
APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export"


def _preprocess(data: str):
    """Convert a CSV cell to ``int`` or ``float`` when it is numeric.

    Integers are tried first so that whole numbers are not silently turned
    into floats. Non-finite results ("nan", "inf", ...) are treated as text,
    because such cells are far more likely to be words than numbers.
    Anything else is returned as text with leading apostrophes removed.
    """
    try:
        return int(data)
    except ValueError:
        pass
    try:
        number = float(data)
    except ValueError:
        return data.lstrip("'")
    if math.isfinite(number):
        return number
    return data.lstrip("'")


def _consume_click(driver: WebDriver, buttons, marker: str) -> bool:
    """Report whether any export button was clicked, clearing the marker.

    The click listeners installed by ``main`` set the ``marker`` attribute on
    the button's parent element; the first marked parent found has the
    attribute removed again and ``True`` is returned.
    """
    for button in buttons:
        if driver.execute_script("return arguments[0].parentElement.getAttribute(arguments[1]);", button, marker):
            driver.execute_script("arguments[0].parentElement.removeAttribute(arguments[1]);", button, marker)
            return True
    return False


def main(driver: WebDriver, logger = logging.getLogger('main')):
    """Log in to the CRM and convert every requested export into a workbook.

    After a successful login the function loops indefinitely: each click on
    an export button triggers one download/convert cycle.

    Args:
        driver: Chrome WebDriver session to drive.
        logger: Logger used for progress and error reporting.

    Returns:
        An exit code describing the failure that ended the loop:
        2 startup page could not be loaded, 3 login failed, 4 the watched
        buttons went stale, 5 the CSV could not be loaded, 6 the Excel
        template could not be loaded.
    """
    parameters = vars(args)
    http = PoolManager()
    sp = ServiceProvider.default()

    try:
        sp.set('context', lambda: { 'parameters': parameters })
        driver.get(sp.run())
    except Exception as e:
        logger.critical('Unable to load starup page', exc_info=e)
        return 2

    try:
        driver.switch_to.new_window('tab')
        driver.set_page_load_timeout(parameters['timeout'])
        driver.get(WEBURL)
    except TimeoutException:
        # Keep whatever has loaded so far instead of giving up.
        logger.warning('Timeout')
        driver.execute_script("window.stop();")

    try:
        setup(driver, parameters)
        until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False)
        account = str(parameters['account'])
        password = str(parameters['password'])
        logger.info('Logging in as %s (%s)', account.split('@', 1).pop(0).capitalize(), account)
        locate("input.account").send_keys(account)
        locate("input#password").send_keys(password)
        click("input.agree-checkbox")
        click("button.login-btn")
    except Exception as e:
        # The parsed arguments contain no 'url' entry; report the page we tried to open.
        logger.critical('Unable to login to %s', WEBURL, exc_info=e)
        return 3

    logger.info('Waiting for authentication to complete...')
    while True:
        try:
            locate("#container", wait=False)
            sidebar = locate(".new-layout-left")
            driver.execute_script("arguments[0].remove();", sidebar)
            logger.info('Done')
            break
        except Exception:
            # Not ready yet. Ctrl+C is deliberately not swallowed here.
            sleep(3)

    # Install click listeners that flag the parent element of each export button.
    while True:
        try:
            if buttons := driver.find_elements(By.CSS_SELECTOR, ".export-actions .okki-btn-primary"):
                identity = 'EXPORT'
                for button in buttons:
                    driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].parentElement.setAttribute(arguments[1], '1'));", button, identity)
                break
        finally:
            sleep(1)

    while True:
        # Poll until one of the export buttons has been clicked.
        try:
            clicked = _consume_click(driver, buttons, identity)
        except StaleElementReferenceException:
            logger.critical('Error while listening to button events')
            return 4
        except Exception:
            clicked = False
        if not clicked:
            sleep(1)
            continue

        # Best effort: answer the password confirmation dialog if it is shown.
        try:
            locate(".safe-verify-dialog:not([style*='display: none']) .mm-modal-content .mm-modal-body input", wait=False).send_keys(parameters['password'])
            click(".mm-modal-footer button.okki-btn-primary")
        except Exception:
            pass

        # Poll the first row of the task table until it shows a real file name.
        while True:
            try:
                sleep(1)
                click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
                cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False)
                if (filename := cell.text) != '--':
                    break
            except Exception:
                pass

        try:
            logger.info('New task: %s', filename)
            download = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:last-child a")
            href = download.get_attribute('href')
            response = http.request("GET", href)
            text = response.data.decode(parameters['encoding'])
            # newline='' lets the csv module keep line breaks inside quoted fields.
            data = csv.reader(StringIO(text, newline=''))
            logger.info('Read %d line(s) total', len(text.splitlines()))
        except Exception as e:
            logger.critical('Unable to load input data', exc_info=e)
            return 5

        try:
            file = Path('template.xlsx').resolve()
            template = openpyxl.load_workbook(file)
        except Exception as e:
            logger.critical('Unable to load template excel', exc_info=e)
            return 6

        header = next(data, None)
        source = template.active
        output = BytesIO()
        names = template.defined_names

        # Fill the template sheet from each CSV row, then snapshot it as a new sheet.
        for row in data:
            if not row:
                # Blank CSV line: nothing to export.
                continue
            for index, name in enumerate(header):
                if name not in names:
                    continue
                # A short row clears the cell rather than keeping the previous row's value.
                value = _preprocess(row[index]) if index < len(row) else None
                for _, coord in names[name].destinations:
                    source[coord] = value
            sheet = template.copy_worksheet(source)
            sheet.title = 'Copy'
            logger.debug('%s', sheet.title)

        # No copies means the CSV had no data rows; wait for the next click.
        if not len(template.worksheets) > 1:
            logger.error('Invalid input')
            continue

        template.remove(source)
        template.save(output)

        # Hand the workbook to the browser as a data: URL download.
        data = base64.b64encode(output.getbuffer())
        text = data.decode('ascii')
        mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        href = f"data:{mime};base64,{text}"
        filename = filename.replace('.csv', '.xlsx')

        driver.switch_to.new_window('tab')
        driver.execute_script(
            """
            var link = document.createElement('a');
            link.download = arguments[0];
            link.href = arguments[1];
            link.click();
            """,
            filename, href
        )

        if parameters['open']:
            path = Path(parameters['directory']) / filename
            try:
                until(lambda _: path.exists(), watch=False)
                Popen(['start', str(path)], shell=True)
            except Exception as e:
                logger.warning('Unable to open exported excel file at %s', str(path), exc_info=e)

        # Drop the temporary download tab and return to the CRM tab.
        driver.close()
        driver.switch_to.window(driver.window_handles[1])
        logger.info('Done')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
    logger = logging.getLogger()
    # Bound up front so the handlers below never hit an undefined name.
    driver = None
    status = 1
    try:
        level = logging.getLevelNamesMapping().get(args.log_level, 'INFO')
        logger.setLevel(level)
        opts = Options()
        opts.enable_downloads = True
        opts.add_argument('--deny-permission-prompts')
        opts.add_experimental_option('prefs', { 'download.default_directory': args.directory })
        driver = WebDriver(options=opts)
        status = main(driver)
    except KeyboardInterrupt:
        status = 0
    except Exception as e:
        logger.critical('Fatal error', exc_info=e)
        status = 1
    finally:
        # The driver may never have been created if start-up failed.
        if driver is not None:
            driver.quit()
    exit(status)