From 37b6027fddc5094859c284ad156b656c192f8b1b Mon Sep 17 00:00:00 2001 From: eneller Date: Wed, 16 Apr 2025 08:27:02 +0200 Subject: [PATCH] feat: request counting, logging --- pyproject.toml | 2 +- src/uulm_utils/main.py | 75 ++++++++++++++++++++++++++---------------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index eaff06a..2bdb115 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uulm-utils" -version = "1.0" +version = "1.1" description = "Collection of helpers for Ulm University" readme = "README.md" requires-python = ">=3.12" diff --git a/src/uulm_utils/main.py b/src/uulm_utils/main.py index 14f7aac..c39d921 100644 --- a/src/uulm_utils/main.py +++ b/src/uulm_utils/main.py @@ -1,6 +1,6 @@ import asyncclick as click import questionary -from playwright.async_api import async_playwright, Playwright, expect +from playwright.async_api import Page, async_playwright import pandas as pd from dotenv import load_dotenv @@ -39,8 +39,9 @@ async def run_playwright(headless: bool): @click.option('--password','-p', envvar='UULM_PASSWORD', prompt='Enter your kiz password:', hide_input=True) @click.option('--headful', is_flag=True, help='Show the browser window') @click.option('--debug', '-d', is_flag=True, help='Set the log level to DEBUG') +@click.option('--log-level', '-l', type=click.Choice(logging.getLevelNamesMapping().keys()),default = 'INFO') @click.pass_context -async def cli(ctx, username, password, headful, debug): +async def cli(ctx, username, password, headful, debug, log_level): ''' Passing username and password is supported through multiple ways as entering your password visibly into your shell history is discouraged for security reasons. @@ -50,7 +51,7 @@ async def cli(ctx, username, password, headful, debug): - using a `.env` file in the current working directory with the same variables - interactive mode, if none of the above was specified ''' - logging.basicConfig(level=logging.WARNING,format='%(asctime)s - %(levelname)s - %(message)s') + logging.basicConfig(level=log_level,format='%(asctime)s - %(levelname)s - %(message)s') if(debug): logger.setLevel(logging.DEBUG) ctx.ensure_object(dict) ctx.obj['HEADLESS'] = not headful @@ -88,10 +89,11 @@ async def coronang(ctx, target_times, before): ''' CORONANG_VERSION='v1.8.00' CORONANG_URL="https://campusonline.uni-ulm.de/CoronaNG/user/mycorona.html" - logger.info('Parsed input times as %s', target_times) + logger.debug('Parsed input times as %s', target_times) before_seconds = timedelta(seconds=before) target_times = sorted(list(target_times)) async for page, browser, context in run_playwright(ctx.obj['HEADLESS']): + loop = asyncio.get_event_loop() await page.goto(CORONANG_URL) server_version = await page.locator("css=#mblock_innen > a:nth-child(1)").inner_text() if(server_version != CORONANG_VERSION): @@ -100,38 +102,55 @@ async def coronang(ctx, target_times, before): # iterate over staggered login for target_time in target_times: - # wait for execution - done = False - while not done: + # waiting loop for execution + while True: server_str = await page.locator("css=#mblock_innen").inner_text() server_time = datetime.strptime(server_str.split().pop(), "%H:%M:%S") dtime = target_time -server_time dtime_before = dtime - before_seconds dtime_after = dtime + before_seconds - logger.debug('Server Time: %s, delta: %s', server_time, dtime_before) - # execute + logger.debug('Server Time: %s, delta: %s', server_time.time(), dtime_before) + # window started? if dtime_before < timedelta(0): - # login necessary? - if (await page.locator("input[name=\"uid\"]").count()) >0: - logger.debug('Logging in') - await page.locator("input[name=\"uid\"]").click() - await page.locator("input[name=\"uid\"]").fill(ctx.obj['USERNAME']) - await page.locator("input[name=\"password\"]").click() - await page.locator("input[name=\"password\"]").fill(ctx.obj['PASSWORD']) - await page.get_by_role("button", name="Anmelden").click() - logger.debug('Loading Overview Page') - await page.goto(CORONANG_URL) - await page.get_by_role("table", name="Ihre Beobachtungen. Sie kö").get_by_role("button").click() - await page.get_by_role("table", name="Ihre Beobachtungen. Sie kö").get_by_role("combobox").select_option("5") - await page.get_by_role("cell", name="An Markierten teilnehmen Ausf").get_by_role("button").click() - if dtime_after < timedelta(0): - logger.debug('Iteration for time %s over', target_time.time()) - break + time_prev = loop.time() + i = 0 + # spamming loop + while True: + time_delta: float = loop.time() - time_prev + if time_delta > 1: + logger.info('%.2f requests per second sent', i / time_delta) + time_prev = loop.time() + i = 0 + # login necessary? + if (await page.locator("input[name=\"uid\"]").count()) >0: + logger.info('Logging in') + await page.locator("input[name=\"uid\"]").click() + await page.locator("input[name=\"uid\"]").fill(ctx.obj['USERNAME']) + await page.locator("input[name=\"password\"]").click() + await page.locator("input[name=\"password\"]").fill(ctx.obj['PASSWORD']) + await page.get_by_role("button", name="Anmelden").click() + logger.info('Loading Overview Page') + await page.goto(CORONANG_URL) + await page.get_by_role("table", name="Ihre Beobachtungen. Sie kö").get_by_role("button").click() + await page.get_by_role("table", name="Ihre Beobachtungen. Sie kö").get_by_role("combobox").select_option("5") + await page.get_by_role("cell", name="An Markierten teilnehmen Ausf").get_by_role("button").click() + await page.reload() + # window ended? + # NOTE replace this with local time when spamming POST requests? + if dtime_after < timedelta(0): + break + # spam reload + # TODO set timeout, tweak + await page.reload() + i +=1 + # after loop completion + logger.info('Iteration for time %s over', target_time.time()) + break # out of waiting loop # not in time window? else: await asyncio.sleep(1) - logger.debug('Resending') - await page.reload() + logger.info('Waiting for event window. Reloading') + await page.reload() return @cli.command() @@ -146,7 +165,7 @@ async def sport(ctx, target_times, target_course, before): ''' print(target_course) # TODO Check Version in HTML Head - logger.info('Parsed input times as %s', target_times) + logger.debug('Parsed input times as %s', target_times) before_seconds = timedelta(seconds=before) target_times = sorted(list(target_times)) async for page, browser, context in run_playwright(ctx.obj['HEADLESS']):