from __future__ import annotations import collections import concurrent.futures import dataclasses import typing import cache def main() -> None: raw_prices: list[RawPrice] = cache.get('https://refined-prun.github.io/refined-prices/all.json') movers: dict[str, list[Mover]] = collections.defaultdict(list) with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures: list[concurrent.futures.Future[Mover | None]] = [] for price in raw_prices: futures.append(executor.submit(analyze_raw_price, price)) for future in futures: if (mover := future.result()) is not None: movers[mover.ticker].append(mover) executor.shutdown() for ticker_movers in movers.values(): ticker_movers.sort(reverse=True) top_movers = sorted(movers.values(), key=lambda m: m[0].score, reverse=True) for commodity in top_movers: print(f'{commodity[0].score:7.1f}', ' '.join(f'{mover.ticker}.{mover.exchange_code}' for mover in commodity)) def analyze_raw_price(price: RawPrice) -> Mover | None: if (traded := price['AverageTraded7D']) is None: return if (vwap7d := price['VWAP7D']) is None or (vwap30d := price['VWAP30D']) is None: return if (bid := price['Bid']) is None or (ask := price['Ask']) is None: return scores = [ (bid - vwap30d) / bid, (vwap30d - ask) / vwap30d, abs(vwap7d - vwap30d) / max(vwap7d, vwap30d), ] score = max(scores) * traded if score > 100: return Mover(price['ExchangeCode'], price['MaterialTicker'], score) class RawPrice(typing.TypedDict): ExchangeCode: str MaterialTicker: str VWAP7D: float | None # volume-weighted average price over last 7 days AverageTraded7D: float | None # averaged daily traded volume over last 7 days VWAP30D: float | None Bid: float | None Ask: float | None HighYesterday: float | None LowYesterday: float | None @dataclasses.dataclass(eq=False, frozen=True, slots=True) class Mover: exchange_code: str ticker: str score: float def __lt__(self, other: Mover) -> bool: return self.score < other.score if __name__ == '__main__': main()