prepare.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. from __future__ import annotations
  2. import collections
  3. import dataclasses
  4. import csv
  5. import json
  6. import sys
  7. import typing
  8. def main() -> None:
  9. (month,) = sys.argv[1:]
  10. with open(f'rawData/{month}.csv', 'r', newline='') as f:
  11. data = read_data(f)
  12. bases_data: dict[str, dict[str, int]] = {r.company_id: {'bases': r.num, 'rank': r.rank} for r in data['BASES']}
  13. with open(f'www/data/base-data-{month}.json', 'w') as f:
  14. json.dump(bases_data, f)
  15. with open(f'rawData/{month}-prices.json', 'r') as f:
  16. prices = get_prices(f)
  17. prod_data, company_data = get_prod_and_company_data(data, prices)
  18. with open(f'www/data/prod-data-{month}.json', 'w') as f:
  19. json.dump(prod_data, f)
  20. with open(f'www/data/company-data-{month}.json', 'w') as f:
  21. json.dump(company_data, f)
  22. def read_data(f: typing.TextIO) -> dict[str, list[Row]]:
  23. data: dict[str, list[Row]] = collections.defaultdict(list)
  24. reader = csv.reader(f)
  25. for row in reader:
  26. data[row[0]].append(Row(int(row[1]), int(row[2]), row[3]))
  27. return data
  28. def get_prices(f: typing.TextIO) -> typing.Mapping[str, float]:
  29. raw_prices: typing.Sequence[Price] = json.load(f)
  30. volumes: dict[str, float] = collections.defaultdict(float)
  31. traded: dict[str, int] = collections.defaultdict(int)
  32. for price in raw_prices:
  33. if price['Traded30D'] is None:
  34. continue
  35. assert price['VWAP30D'] is not None
  36. volumes[price['MaterialTicker']] += price['VWAP30D'] * price['Traded30D']
  37. traded[price['MaterialTicker']] += price['Traded30D']
  38. prices = {ticker: volume / traded[ticker] for ticker, volume in volumes.items()}
  39. hardcoded_prices = {
  40. 'AFP': 116868,
  41. 'ANZ': 70601,
  42. 'BFP': 23408,
  43. 'BND': 230,
  44. 'BID': 47011,
  45. 'CRU': 169623,
  46. 'CQT': 378452,
  47. 'FUN': 124010,
  48. 'GCH': 18303,
  49. 'GNZ': 30361,
  50. 'HNZ': 93580,
  51. 'PFG': 2677222,
  52. 'PK': 869,
  53. 'RDS': 598170,
  54. 'SDM': 1721027,
  55. 'SST': 5863587,
  56. 'SU': 157860,
  57. 'TOR': 540169,
  58. 'VCB': 673713,
  59. 'WOR': 202000,
  60. }
  61. assert frozenset(prices).isdisjoint(hardcoded_prices)
  62. prices.update(hardcoded_prices)
  63. return prices
  64. def get_prod_and_company_data(data: dict[str, list[Row]], prices: typing.Mapping[str, float]
  65. ) -> tuple[typing.Mapping[str, ProdData], typing.Mapping[str, typing.Any]]:
  66. prod: dict[str, ProdData] = {}
  67. individual: dict[str, dict[str, CompanyTickerData]] = collections.defaultdict(dict)
  68. totals: dict[str, CompanyTotals] = collections.defaultdict(lambda: {'volume': 0.0})
  69. for section, rows in data.items():
  70. if (ticker := get_production_ticker(section)) is None:
  71. continue
  72. price = prices.get(ticker)
  73. if price is None:
  74. continue
  75. prod_amount = sum(row.num for row in rows) / 30
  76. prod[ticker] = {'amount': prod_amount, 'volume': prod_amount * price}
  77. for row in rows:
  78. amount = row.num / 30
  79. volume = amount * price
  80. individual[row.company_id][ticker] = {
  81. 'amount': amount,
  82. 'volume': volume,
  83. 'rank': row.rank,
  84. }
  85. totals[row.company_id]['volume'] += volume
  86. company_data = {'totals': add_company_ranks(totals), 'individual': dict(individual)}
  87. return prod, company_data
  88. def get_production_ticker(section: str) -> str | None:
  89. prefix = 'PRODUCTION_'
  90. suffix = '_DAYS_30'
  91. if not section.startswith(prefix) or not section.endswith(suffix):
  92. return None
  93. return section[len(prefix):-len(suffix)]
  94. def add_company_ranks(totals: dict[str, CompanyTotals]) -> dict[str, CompanyTotals]:
  95. ranked = sorted(totals.items(), key=lambda item: item[1]['volume'], reverse=True)
  96. for rank, (company_id, company_totals) in enumerate(ranked, start=1):
  97. company_totals['volumeRank'] = rank
  98. return totals
  99. @dataclasses.dataclass(frozen=True, slots=True, eq=False)
  100. class Row:
  101. rank: int
  102. num: int
  103. company_id: str
  104. class Price(typing.TypedDict):
  105. MaterialTicker: str
  106. VWAP30D: float | None
  107. Traded30D: int | None
  108. class ProdData(typing.TypedDict):
  109. amount: float
  110. volume: float
  111. class CompanyTickerData(typing.TypedDict):
  112. amount: float
  113. volume: float
  114. rank: int
  115. class CompanyTotals(typing.TypedDict, total=False):
  116. volume: float
  117. volumeRank: int
  118. if __name__ == '__main__':
  119. main()