rollerverbrauch/app/tools.py

310 lines
9.8 KiB
Python

from sqlalchemy import or_
import requests
import logging
from datetime import date, datetime, timedelta
from .entities import Pitstop, FillingStation
from . import db, app
class RegularCostInstance:
    """One dated occurrence of a recurring ("regular") cost.

    A plain value holder: every constructor argument is stored unchanged as
    an attribute of the same name, except ``regular_id`` which is exposed as
    ``id``.
    """

    def __init__(self, regular_id, date, odometer, costs, name):
        """
        :param regular_id: id of the regular cost this instance belongs to
        :param date: the date on which the cost occurs
        :param odometer: odometer reading attached to the instance (may be None)
        :param costs: the costs of this single occurrence
        :param name: display name of the cost
        """
        self.id, self.name = regular_id, name
        self.date, self.odometer = date, odometer
        self.costs = costs
class ConsumableStats:
    """Aggregated statistics for one consumable of one vehicle.

    Attributes set by the constructor:

    * ``name``, ``id``, ``unit`` -- copied from the consumable.
    * ``overall_amount`` -- sum of the amounts of all matching pitstops.
    * ``average_amount_fuelled`` -- mean amount per matching pitstop.
    * ``average_distance`` -- mean distance between two consecutive matching
      pitstops.
    * ``average_amount_used`` -- amount used per 100 distance units over the
      whole span covered by the matching pitstops.
    * ``amounts`` -- one ``StatsEvent`` (date, amount) per matching pitstop.
    * ``average_amount`` -- one ``StatsEvent`` (date, amount per 100 distance
      units, rounded to 2 digits) per matching pitstop after the first.

    All numeric attributes stay at ``0`` and both lists stay empty when there
    is not enough data to compute them.
    """

    def __init__(self, vehicle, consumable):
        """
        :param vehicle: the vehicle whose pitstops are evaluated
        :param consumable: the consumable to compute the statistics for
        """
        self.name = consumable.name
        self.id = consumable.id
        self.unit = consumable.unit
        self.overall_amount = 0
        self.average_distance = 0
        self.average_amount_fuelled = 0
        self.average_amount_used = 0
        self.average_amount = []
        self.amounts = []

        # Only the pitstops for this consumable are relevant.
        # NOTE(review): the distance maths below assumes vehicle.pitstops is
        # ordered by ascending odometer reading -- confirm against the model.
        pitstops = [
            stop for stop in vehicle.pitstops if stop.consumable_id == consumable.id
        ]
        pitstop_count = len(pitstops)
        if pitstop_count == 0:
            return

        for pitstop in pitstops:
            self.overall_amount += pitstop.amount
            self.amounts.append(StatsEvent(pitstop.date, pitstop.amount))
        self.average_amount_fuelled = self.overall_amount / pitstop_count

        if pitstop_count < 2:
            # Distances need at least two pitstops.
            return

        # The distance has to be measured between the pitstops of THIS
        # consumable: the amount of the first of them is excluded below, so
        # using the first/last pitstop of the whole vehicle would mix in
        # distance covered by other consumables.
        overall_distance = pitstops[-1].odometer - pitstops[0].odometer
        self.average_distance = overall_distance / (pitstop_count - 1)
        if overall_distance != 0:
            # The first fill-up was used before the measured distance started.
            self.average_amount_used = (
                100 * (self.overall_amount - pitstops[0].amount) / overall_distance
            )

        for last_ps, current_ps in zip(pitstops, pitstops[1:]):
            distance = current_ps.odometer - last_ps.odometer
            if distance == 0:
                # Two pitstops at the same odometer reading: no consumption
                # per distance can be derived (and dividing would raise).
                continue
            self.average_amount.append(
                StatsEvent(
                    current_ps.date,
                    round(100 * current_ps.amount / distance, 2),
                )
            )
class VehicleStats:
    """Aggregated distance and cost statistics for a single vehicle.

    Attributes set by the constructor:

    * ``name``, ``id`` -- copied from the vehicle.
    * ``consumables`` -- one ``ConsumableStats`` per consumable of the vehicle.
    * ``odometers`` -- one ``StatsEvent`` (date, odometer) per event.
    * ``costs`` -- ``StatsEvent`` list sorted by date whose values are the
      running total of all costs up to that date.
    * ``overall_costs`` -- sum of all event costs and regular cost instances.
    * ``overall_distance`` -- odometer difference between last and first event.
    * ``costs_per_distance`` -- overall costs per 100 distance units, or the
      string ``"N/A"`` when the overall distance is not positive.
    """

    def __init__(self, vehicle):
        """
        :param vehicle: the vehicle to compute the statistics for
        """
        self.name = vehicle.name
        self.id = vehicle.id
        self.overall_distance = 0
        self.overall_costs = 0
        self.consumables = []
        self.odometers = []
        self.costs = []
        self.costs_per_distance = "N/A"

        for consumable in vehicle.consumables:
            self.consumables.append(ConsumableStats(vehicle, consumable))

        # Events are the vehicle's pitstops and services in one list, sorted
        # by (date, odometer).
        events = get_event_line_for_vehicle(vehicle)
        pitstop_count = len(events)

        if pitstop_count > 0:
            for pitstop in events:
                self.odometers.append(StatsEvent(pitstop.date, pitstop.odometer))
                # Events without costs contribute to the odometer line only.
                if pitstop.costs is not None:
                    self.overall_costs += pitstop.costs
                    self.costs.append(StatsEvent(pitstop.date, pitstop.costs))

        # add the instances of the recurring costs to the overall costs
        for regular_cost_instance in calculate_regular_cost_instances(vehicle):
            self.overall_costs += regular_cost_instance.costs
            self.costs.append(
                StatsEvent(regular_cost_instance.date, regular_cost_instance.costs)
            )

        # A distance can only be derived from at least two events.
        if pitstop_count > 1:
            self.overall_distance = events[-1].odometer - events[0].odometer

        # Turn the individual costs into a running total over time; this
        # overwrites the value of every StatsEvent in place.
        self.costs.sort(key=lambda x: x.date)
        accumulated_costs = 0
        for c in self.costs:
            accumulated_costs += c.value
            c.value = accumulated_costs

        if self.overall_distance > 0:
            # costs per 100 distance units
            self.costs_per_distance = float(self.overall_costs) / (
                float(self.overall_distance) / 100
            )
class StatsEvent:
    """A single data point of a statistic: a value attached to a date."""

    def __init__(self, date, value):
        """
        :param date: the date the value belongs to
        :param value: the value of the statistic at that date
        """
        self.value = value
        self.date = date

    def __repr__(self):
        # Only the date is shown; the value is not part of the representation.
        return f"{self.date!s}"
def db_log_add(entity):
    """
    Log at INFO level that the given entity has been added to the database.
    :param entity: the entity that was added; its string form is logged
    """
    # Pass the entity as a logging argument so that it is only converted to a
    # string when the record is actually emitted.
    logging.info("db_add: %s", entity)
def db_log_delete(entity):
    """
    Log at INFO level that the given entity has been deleted from the database.
    :param entity: the entity that was deleted; its string form is logged
    """
    # Pass the entity as a logging argument so that it is only converted to a
    # string when the record is actually emitted.
    logging.info("db_delete: %s", entity)
def db_log_update(entity):
    """
    Log at INFO level that the given entity has been updated in the database.
    :param entity: the entity that was updated; its string form is logged
    """
    # Pass the entity as a logging argument so that it is only converted to a
    # string when the record is actually emitted.
    logging.info("db_update: %s", entity)
def check_vehicle_name_is_unique(current_user, name_field):
    """
    Validate that the name entered in name_field is not yet used by any of the
    current user's vehicles. If it is already taken, the name is kept as the
    field's default and an error message is appended to the field's errors.
    :param current_user: the user currently logged in
    :param name_field: the form field holding the entered vehicle name
    :return: True if the name is unique, False otherwise.
    """
    requested_name = name_field.data
    name_taken = any(
        owned.name == requested_name for owned in current_user.vehicles
    )
    if not name_taken:
        return True

    name_field.default = requested_name
    name_field.errors.append('Vehicle "%s" already exists.' % requested_name)
    return False
def get_latest_pitstop_for_vehicle(vehicle_id):
    """
    Fetch the most recently created pit stop of a vehicle, i.e. the pit stop
    with the highest id among those belonging to the vehicle.
    :param vehicle_id: the id of the vehicle
    :return: the latest pitstop or None if no pitstop exists
    """
    vehicle_pitstops = Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id)
    newest_first = vehicle_pitstops.order_by(Pitstop.id.desc())
    return newest_first.first()
def get_latest_pitstop_for_vehicle_and_consumable(vehicle_id, consumable_id):
    """
    Fetch the most recently created pit stop of a vehicle for one consumable,
    i.e. the pit stop with the highest id matching both ids.
    :param vehicle_id: the id of the vehicle
    :param consumable_id: the id of the consumable
    :return: the latest pitstop or None if no pitstop exists
    """
    vehicle_pitstops = Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id)
    matching = vehicle_pitstops.filter(Pitstop.consumable_id == consumable_id)
    newest_first = matching.order_by(Pitstop.id.desc())
    return newest_first.first()
def compute_lower_limits_for_new_pitstop(
    latest_pitstop, last_pitstop_consumable, consumable_id
):
    """
    Build a Pitstop carrying the lower limits / starting values for a new
    pitstop: odometer and date come from the latest pitstop, amount and costs
    from the last pitstop of the consumable.
    :param latest_pitstop: the latest pitstop, or None; without it the
        odometer is 0 and the date is 1970-01-01
    :param last_pitstop_consumable: the last pitstop of the consumable, or
        None; without it amount and costs are 0
    :param consumable_id: the id of the consumable of the new pitstop
    :return: a new Pitstop holding the computed values
    """
    if latest_pitstop is None:
        odometer, date_of_pitstop = 0, date(1970, 1, 1)
    else:
        odometer = latest_pitstop.odometer
        date_of_pitstop = latest_pitstop.date

    if last_pitstop_consumable is None:
        amount = costs = 0
    else:
        amount = last_pitstop_consumable.amount
        costs = last_pitstop_consumable.costs

    return Pitstop(odometer, amount, date_of_pitstop, costs, consumable_id)
def pitstop_service_key(x):
    """
    Sort key for pitstops and services: orders by date first, then by
    odometer reading.
    :param x: an object with the attributes date and odometer
    :return: the tuple (date, odometer)
    """
    sort_key = (x.date, x.odometer)
    return sort_key
def get_event_line_for_vehicle(vehicle):
    """
    Merge the pitstops and services of a vehicle into one chronological list.
    :param vehicle: the vehicle providing pitstops and services
    :return: a new list of all pitstops and services, sorted by
        (date, odometer)
    """
    return sorted(
        [*vehicle.pitstops, *vehicle.services],
        key=pitstop_service_key,
    )
def chunks(l, n):
    """Yield consecutive slices of l, each at most n items long."""
    yield from (l[start : start + n] for start in range(0, len(l), n))
def update_filling_station_prices(ids, timeout=10):
    """
    Refresh the stored prices of the given filling stations from the
    Tankerkoenig price API.

    Only stations that have never been updated or whose last update is older
    than 15 minutes are queried. The refresh is best effort: a failing request
    is logged and skipped, the remaining stations are still processed.
    :param ids: the ids of the filling stations to refresh
    :param timeout: seconds to wait for each API request before giving up
    :return: None
    """
    max_age = (datetime.now() - timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M")
    stale_stations = (
        db.session.query(FillingStation)
        .filter(FillingStation.id.in_(ids))
        .filter(
            or_(
                FillingStation.last_update.is_(None),
                FillingStation.last_update < max_age,
            )
        )
        .all()
    )
    if not stale_stations:
        return

    id_map = {station.id: station for station in stale_stations}
    api_key = app.config["TANKERKOENIG_API_KEY"]
    url = "https://creativecommons.tankerkoenig.de/json/prices.php"

    # documentation tells us to query max 10 filling stations at a time...
    for station_ids in chunks(list(id_map), 10):
        params = {"apikey": api_key, "ids": ",".join(station_ids)}
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, params=params, timeout=timeout)
            response_json = response.json()
        except (requests.RequestException, ValueError) as error:
            # Network failure or a body that is not valid JSON.
            logging.error(
                "could not update filling stations because of %s on URL %s.",
                error,
                url,
            )
            continue

        if not response_json.get("ok"):
            # Log the base URL only: the full request URL carries the API key.
            logging.error(
                "could not update filling stations because of {r} on URL {u}.".format(
                    r=str(response_json), u=url
                )
            )
            continue

        for station_id, station_status in response_json["prices"].items():
            station = id_map.get(station_id)
            if station is None:
                # The API answered for a station that was not asked for.
                continue
            station.open = station_status["status"] == "open"
            if station.open:
                # Prices are only taken over for open stations.
                station.diesel = station_status["diesel"]
                station.e10 = station_status["e10"]
                station.e5 = station_status["e5"]
            station.last_update = datetime.now()

    db.session.commit()
def calculate_regular_cost_instances(vehicle):
    """
    Expand the regular (recurring) costs of a vehicle into the individual
    occurrences that have fallen due so far.

    Each regular cost lists its due days as comma separated "month-day"
    entries. One instance is created for every due day between the start date
    and the end date of the regular cost; the end date is today when no end
    is set and never later than today otherwise.
    :param vehicle: the vehicle whose regular costs are expanded
    :return: a list of RegularCostInstance objects (odometer is always None)
    """
    today = date.today()
    data = []
    for regular in vehicle.regulars:
        if today < regular.start_at:
            # skip regular costs that are not yet active
            continue

        # Costs that are due in the future have not been incurred yet, so an
        # end date in the future must not extend the range beyond today.
        end_date = min(regular.ends_at, today) if regular.ends_at else today

        for year in range(regular.start_at.year, end_date.year + 1):
            for day in regular.days.split(","):
                month, day_of_month = day.split("-")
                month, day_of_month = int(month), int(day_of_month)
                try:
                    due_date = date(year, month, day_of_month)
                except ValueError:
                    # The day does not exist in this year (e.g. 02-29 outside
                    # of leap years): there is nothing due.
                    continue
                if regular.start_at <= due_date <= end_date:
                    data.append(
                        RegularCostInstance(
                            regular.id,
                            date=due_date,
                            odometer=None,
                            costs=regular.costs,
                            name=regular.description,
                        )
                    )
    return data
def get_users_active_vehicle(user):
    """
    List the active vehicles of a user, ordered by the date of their last
    pitstop, newest first. Vehicles without any pitstop are ranked as if
    their last pitstop was today.
    :param user: the user whose vehicles are listed
    :return: a new list containing the user's active vehicles
    """

    def last_pitstop_date(vehicle):
        return vehicle.pitstops[-1].date if vehicle.pitstops else date.today()

    return sorted(
        (vehicle for vehicle in user.vehicles if vehicle.is_active),
        key=last_pitstop_date,
        reverse=True,
    )