Files
exportunities/main.py

223 lines
8.0 KiB
Python

import argparse
import openpyxl
import logging
import base64
import csv
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from common.jsonrpc2 import ServiceProvider
from common.utils import *
from io import BytesIO
from pathlib import Path
from urllib3 import PoolManager
from itertools import count
from subprocess import Popen
parser = argparse.ArgumentParser(description="Opportunity Export")
parser.add_argument('account', type=str)
parser.add_argument('password', type=str)
parser.add_argument('-p', '--open', action='store_true')
parser.add_argument('-d', '--directory', type=str, default=str(Path.home().joinpath('Downloads')))
parser.add_argument('-e', '--encoding', type=str, default="utf-8-sig")
parser.add_argument('-t', '--timeout', type=int, default=60)
parser.add_argument('-r', '--attempts', type=int, default=3)
parser.add_argument('-l', '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
args = parser.parse_args()
WEBURL = "https://crm.xiaoman.cn/business/export"
APIURL = "https://crm.xiaoman.cn/api/opportunityRead/export"
def main(driver: WebDriver, logger = logging.getLogger('main')):
parameters = vars(args)
http = PoolManager()
sp = ServiceProvider.default()
try:
sp.set('context', lambda: { 'parameters': parameters })
driver.get(sp.run())
except Exception as e:
logger.critical('Unable to load starup page', exc_info=e)
return 2
try:
driver.switch_to.new_window('tab')
driver.set_page_load_timeout(parameters['timeout'])
driver.get(WEBURL)
except TimeoutException:
logger.warning('Timeout')
driver.execute_script("window.stop();")
try:
setup(driver, parameters)
until(lambda x: 'loginProgress' in x.find_element(By.TAG_NAME, "body").get_attribute('class'), watch=False)
account = str(parameters['account'])
password = str(parameters['password'])
logger.info('Logging in as %s (%s)', account.split('@', 1).pop(0).capitalize(), account)
locate("input.account").send_keys(account)
locate("input#password").send_keys(password)
click("input.agree-checkbox")
click("button.login-btn")
except Exception as e:
logger.critical('Unable to login to %s', parameters['url'], exc_info=e)
return 3
logger.info('Waiting for authentication to complete...')
while True:
try:
locate("#container", wait=False)
sidebar = locate(".new-layout-left")
driver.execute_script("arguments[0].remove();", sidebar)
logger.info('Done')
break
except:
sleep(3)
while True:
try:
if buttons := driver.find_elements(By.CSS_SELECTOR, ".export-actions .okki-btn-primary"):
identity = 'EXPORT'
for button in buttons: driver.execute_script("arguments[0].addEventListener('click', () => arguments[0].parentElement.setAttribute(arguments[1], '1'));", button, identity)
break
finally:
sleep(1)
while True:
try:
for index in count(0):
if (driver.execute_script("return arguments[0].parentElement.getAttribute(arguments[1]);", buttons[index], identity)):
driver.execute_script("arguments[0].parentElement.removeAttribute(arguments[1]);", buttons[index], identity)
raise ValueError()
except StaleElementReferenceException:
logger.critical('Error while listening to button events')
return 4
except ValueError:
pass
except:
sleep(1)
continue
try:
locate(".safe-verify-dialog:not([style*='display: none']) .mm-modal-content .mm-modal-body input", wait=False).send_keys(parameters['password'])
click(".mm-modal-footer button.okki-btn-primary")
except:
pass
while True:
try:
sleep(1)
click(".business-export-wrap section:nth-child(3) h2 svg", wait=False)
cell = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:first-child span", wait=False)
if (filename := cell.text) != '--': break
except:
pass
try:
logger.info('New task: %s', filename)
download = locate(".business-export-wrap section:nth-child(3) table tbody tr:first-child td:last-child a")
href = download.get_attribute('href')
response = http.request("GET", href)
text = response.data.decode(parameters['encoding']).splitlines()
data = csv.reader(text)
logger.info('Read %d line(s) total', len(text))
except Exception as e:
logger.critical('Unable to load input data', exc_info=e)
return 5
try:
file = Path('template.xlsx').resolve()
template = openpyxl.load_workbook(file)
except Exception as e:
logger.critical('Unable to load template excel', exc_info=e)
return 6
header = next(data, None)
source = template.active
output = BytesIO()
def preprocess(data: str):
try: return float(data)
except ValueError: pass
try: return int(data)
except ValueError: pass
return data.lstrip("'")
for row in data:
for index, name in enumerate(header):
names = template.defined_names
if name not in names: continue
for _, coord in names[name].destinations:
value = preprocess(row[index])
source[coord] = value
sheet = template.copy_worksheet(source)
logger.debug('%s', sheet.title)
if not len(template.worksheets) > 1:
logger.error('Invalid input')
continue
template.remove(source)
template.save(output)
data = base64.b64encode(output.getbuffer())
text = data.decode('ascii')
mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
href = f"data:{mime};base64,{text}"
filename = filename.replace('.csv', '.xlsx')
driver.switch_to.new_window('tab')
driver.execute_script(
"""
var link = document.createElement('a');
link.download = arguments[0];
link.href = arguments[1];
link.click();
""",
filename,
href
)
if parameters['open']:
try:
path = Path(parameters['directory']) / filename
until(lambda _: path.exists(), watch=False)
Popen(['start', str(path)], shell=True)
except Exception as e:
logger.warning('Unable to open exported excel file at %s', str(path), exc_info=e)
driver.close()
driver.switch_to.window(driver.window_handles[1])
logger.info('Done')
if __name__ == '__main__':
try:
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M")
logger = logging.getLogger()
level = logging.getLevelNamesMapping().get(args.log_level, 'INFO')
logger.setLevel(level)
opts = Options()
opts.enable_downloads = True
opts.add_argument('--deny-permission-prompts')
opts.add_experimental_option('prefs', { 'download.default_directory': args.directory })
driver = WebDriver(options=opts)
status = main(driver)
except KeyboardInterrupt:
status = 0
except Exception as e:
logger.critical('Fatal error', exc_info=e)
status = 1
finally:
driver.quit()
exit(status)