"""Opportunity export automation.

Drives a Chrome session against the CRM export page, waits for the export API
to be called, downloads the newest exported CSV, fills ``template.xlsx`` once
per CSV row (one worksheet per row) and hands the resulting workbook to the
browser as a download.
"""
import argparse
import openpyxl
import logging
import base64
import csv
import math
import sys
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.bidi.network import Request as NetworkRequest
from io import BytesIO, StringIO
from common import jsonrpc2
from typing import Callable
from pathlib import Path
from urllib3 import PoolManager
from subprocess import Popen

parser = argparse.ArgumentParser(description="Opportunity Export")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig")
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-r', '--attempts', type=int, default=3)
# Seconds to wait for a click to be registered before retrying it.
parser.add_argument('-i', '--interval', type=float, default=1.0)
parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args()

WEBURL = "https://crm.xiaoman.cn/business/export"
APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export"


def _coerce_cell(data: str):
    """Convert a raw CSV field into the value written to the worksheet.

    Integers stay integers, other finite numbers become floats, and anything
    else is returned as text with leading apostrophes removed.
    """
    try:
        return int(data)
    except ValueError:
        pass
    try:
        number = float(data)
    except ValueError:
        return data.lstrip("'")
    # float() also accepts "nan" / "inf" / "infinity"; keep those as text.
    if math.isfinite(number):
        return number
    return data.lstrip("'")


def main(driver: WebDriver, logger = logging.getLogger('main')):
    """Run the export loop on ``driver``.

    Returns an exit status when a step fails: 2 if the startup page cannot be
    loaded, 3 if the login form cannot be filled in, 4 if the exported CSV
    cannot be downloaded or decoded, 5 if ``template.xlsx`` cannot be loaded.
    Otherwise it keeps polling for new exports and does not return.
    """
    try:
        http = PoolManager()
        context = jsonrpc2.ConnectionContext()
        parameters = vars(args)
        driver.get(str(Path('index.html').resolve()))
        driver.execute_script(jsonrpc2.prelude(), parameters)
    except Exception as e:
        logger.critical('Unable to load starup page', exc_info=e)
        return 2

    try:
        driver.switch_to.new_window('tab')
        driver.set_page_load_timeout(parameters.get('timeout'))
        driver.get(WEBURL)
    except TimeoutException:
        # A slow page is not fatal: stop loading and work with what is there.
        logger.warning('Timeout')
        driver.execute_script("window.stop();")

    def until(condition: Callable[[WebDriver], bool], watch=True):
        """Wait for ``condition`` to hold and, if ``watch``, to stop holding again.

        A timeout or stale element during the first wait is ignored.
        """
        try:
            WebDriverWait(driver, parameters.get('timeout')).until(condition)
        except (TimeoutException, StaleElementReferenceException):
            pass
        if watch:
            WebDriverWait(driver, parameters.get('timeout')).until_not(condition)
        return True

    def sleep(seconds: float):
        """Block for roughly ``seconds`` using a wait that can never succeed."""
        with context:
            try:
                WebDriverWait(driver, seconds, seconds).until(lambda _: False)
            except Exception:
                # The expected outcome is a TimeoutException; the pause is best-effort.
                pass
        return True

    def locate(selector, wait=True, condition=EC.visibility_of_element_located) -> WebElement:
        """Find the element matching CSS ``selector``.

        With ``wait=False`` the element is looked up once and a failed lookup
        raises immediately. Otherwise the element is awaited, scrolled into
        view and, when ``condition`` is given, awaited again under it.
        Stale-element errors restart the lookup.
        """
        locator = (By.CSS_SELECTOR, selector)
        while True:
            try:
                if not wait:
                    return driver.find_element(*locator)
                presence = EC.presence_of_element_located(locator)
                element = WebDriverWait(driver, parameters.get('timeout')).until(presence, 'Timeout')
                driver.execute_script(
                    "arguments[0].scrollIntoView({ block: 'center', inline: 'nearest' });",
                    element,
                )
                if condition is not None:
                    element = WebDriverWait(driver, parameters.get('timeout')).until(condition(locator), 'Timeout')
                return element
            except StaleElementReferenceException:
                continue

    def click(selector: str|WebElement, wait=True, condition=EC.element_to_be_clickable):
        """Click an element, retrying up to ``attempts`` times until the click registers.

        A listener bumps a ``taximeter`` attribute on every click event; the
        click counts as delivered once that counter exceeds its starting value.
        After a failed native click, later attempts use a JavaScript click.
        """
        element = locate(selector, wait, condition) if isinstance(selector, str) else selector

        def counter():
            return int(element.get_attribute('taximeter') or 0)

        error = False
        value = counter()
        driver.execute_script(
            "arguments[0].addEventListener('click', () => arguments[0].setAttribute('taximeter', arguments[1] + 1));",
            element, value,
        )
        for _ in range(parameters.get('attempts')):
            try:
                if not error:
                    element.click()
                else:
                    driver.execute_script("arguments[0].click();", element)
            except StaleElementReferenceException:
                break
            except Exception:
                error = True
                continue
            try:
                WebDriverWait(driver, parameters.get('interval')).until(lambda _: counter() > value)
                break
            except TimeoutException:
                continue
            except Exception:
                # e.g. the element went stale because the click navigated away.
                break

    try:
        until(
            lambda x: 'loginProgress' in (x.find_element(By.TAG_NAME, "body").get_attribute('class') or ''),
            watch=False,
        )
        account = str(parameters.get('account'))
        password = str(parameters.get('password'))
        logger.info('Logging in as %s (%s)', account.split('@', 1).pop(0).capitalize(), account)
        locate("input.account").send_keys(account)
        locate("input#password").send_keys(password)
        click("input.agree-checkbox")
        click("button.login-btn")
    except Exception as e:
        logger.critical('Unable to login to %s', WEBURL, exc_info=e)
        return 3

    logger.info('Waiting for authentication to complete...')
    while True:
        try:
            locate("#container", wait=False)
            logger.info('Done')
            break
        except Exception:
            sleep(3)

    ready = False
    sidebar = locate(".new-layout-left")
    driver.execute_script("arguments[0].remove();", sidebar)

    def handle(request: NetworkRequest):
        """Flag that an export was requested, then let the request continue."""
        nonlocal ready
        if request.url == APIURL:
            ready = True
        try:
            request.continue_request()
        except Exception as e:
            logger.debug('Network request error', exc_info=e)

    event = 'before_request'
    driver.network.add_request_handler(event, handle)

    while True:
        if not ready:
            sleep(1)
            continue

        # If a verification dialog is showing, answer it with the password.
        try:
            locate(
                ".safe-verify-dialog:not([style*='display: none']) .mm-modal-content .mm-modal-body input",
                wait=False,
            ).send_keys(parameters.get('password'))
            click(".mm-modal-footer button.okki-btn-primary", wait=False)
        except Exception:
            pass

        # Click the section header icon and read the first table row's name.
        try:
            sleep(1)
            click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
            cell = locate(
                ".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span",
                wait=False,
            )
            filename = cell.text
        except Exception:
            continue
        if filename == '--':
            # Placeholder name: poll again.
            sleep(1)
            continue

        try:
            logger.info('New task: %s', filename)
            download = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:last-child a")
            href = download.get_attribute('href')
            response = http.request("GET", href)
            decoded = response.data.decode(parameters.get('encoding'))
            # newline='' lets the csv module handle line endings itself, so
            # quoted fields containing line breaks stay intact.
            rows = csv.reader(StringIO(decoded, newline=''))
            logger.info('Read %d line(s) total', len(decoded.splitlines()))
        except Exception as e:
            logger.critical('Unable to load input data', exc_info=e)
            return 4

        try:
            file = Path('template.xlsx').resolve()
            template = openpyxl.load_workbook(file)
        except Exception as e:
            logger.critical('Unable to load template excel', exc_info=e)
            return 5

        header = next(rows, None) or []
        source = template.active
        names = template.defined_names
        output = BytesIO()

        for row in rows:
            if not row:
                continue  # blank line in the CSV
            for index, name in enumerate(header):
                if name not in names:
                    continue
                # A short row clears the cell instead of keeping the previous row's value.
                value = _coerce_cell(row[index]) if index < len(row) else None
                for _, coord in names[name].destinations:
                    source[coord] = value
            sheet = template.copy_worksheet(source)
            logger.info('New sheet: %s', sheet.title)

        if not len(template.worksheets) > 1:
            # No data row produced a sheet.
            logger.error('Invalid input')
            ready = False
            continue

        # Drop the working sheet so only the per-row copies remain.
        template.remove(source)
        template.save(output)
        payload = base64.b64encode(output.getbuffer()).decode('ascii')
        mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        href = f"data:{mime};base64,{payload}"
        filename = filename.replace('.csv', '.xlsx')

        driver.switch_to.new_window('tab')
        driver.execute_script(
            """
            var link = document.createElement('a');
            link.download = arguments[0];
            link.href = arguments[1];
            link.click();
            """,
            filename, href
        )
        exported = Path.home() / 'Downloads' / filename
        try:
            # The empty string is the window title; without it `start` would
            # take a quoted path (one containing spaces) as the title.
            Popen(['start', '', exported.as_posix()], shell=True)
        except Exception as e:
            logger.warning('Unable to open exported excel file at %s', exported, exc_info=e)
        driver.close()
        driver.switch_to.window(driver.window_handles[1])
        logger.info('Done')
        ready = False


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M",
    )
    logger = logging.getLogger()
    driver = None
    status = 1
    try:
        logger.info('Initializing...')
        logger.setLevel(args.log_level or 'INFO')
        jsonrpc2.run(logger)
        logger.info('Creating automation instance')
        opts = Options()
        opts.enable_bidi = True
        opts.enable_downloads = True
        driver = WebDriver(options=opts)
        status = main(driver)
    except Exception as e:
        logger.critical('Fatal error', exc_info=e)
        status = 1
    finally:
        # The driver may never have been created if start-up failed.
        if driver is not None:
            driver.quit()
    sys.exit(status)