from sqlalchemy import or_ import requests import logging from datetime import date, datetime, timedelta from .entities import Pitstop, FillingStation from . import db, app class RegularCostInstance: def __init__(self, regular_id, date, odometer, costs, name): self.id = regular_id self.date = date self.odometer = odometer self.costs = costs self.name = name class ConsumableStats: def __init__(self, vehicle, consumable): self.name = consumable.name self.id = consumable.id self.unit = consumable.unit self.overall_amount = 0 self.average_distance = 0 self.average_amount_fuelled = 0 self.average_amount_used = 0 self.average_amount = [] self.amounts = [] pitstops = [ stop for stop in vehicle.pitstops if stop.consumable_id == consumable.id ] pitstop_count = len(pitstops) if pitstop_count > 0: for pitstop in pitstops: self.overall_amount += pitstop.amount self.amounts.append(StatsEvent(pitstop.date, pitstop.amount)) self.average_amount_fuelled = self.overall_amount / pitstop_count if pitstop_count > 1: overall_distance = ( vehicle.pitstops[-1].odometer - vehicle.pitstops[0].odometer ) self.average_distance = overall_distance / (pitstop_count - 1) self.average_amount_used = ( 100 * (self.overall_amount - pitstops[0].amount) / overall_distance ) for index in range(1, pitstop_count): last_ps = pitstops[index - 1] current_ps = pitstops[index] self.average_amount.append( StatsEvent( current_ps.date, round( 100 * current_ps.amount / (current_ps.odometer - last_ps.odometer), 2, ), ) ) class VehicleStats: def __init__(self, vehicle): self.name = vehicle.name self.id = vehicle.id self.overall_distance = 0 self.overall_costs = 0 self.consumables = [] self.odometers = [] self.costs = [] self.costs_per_distance = "N/A" for consumable in vehicle.consumables: self.consumables.append(ConsumableStats(vehicle, consumable)) events = get_event_line_for_vehicle(vehicle) pitstop_count = len(events) if pitstop_count > 0: for pitstop in events: self.odometers.append(StatsEvent(pitstop.date, pitstop.odometer)) if pitstop.costs is not None: self.overall_costs += pitstop.costs self.costs.append(StatsEvent(pitstop.date, pitstop.costs)) # add the instances to the overall costs for regular_cost_instance in calculate_regular_cost_instances(vehicle): self.overall_costs += regular_cost_instance.costs self.costs.append( StatsEvent(regular_cost_instance.date, regular_cost_instance.costs) ) if pitstop_count > 1: self.overall_distance = events[-1].odometer - events[0].odometer self.costs.sort(key=lambda x: x.date) accumulated_costs = 0 for c in self.costs: accumulated_costs += c.value c.value = accumulated_costs if self.overall_distance > 0: self.costs_per_distance = float(self.overall_costs) / ( float(self.overall_distance) / 100 ) class StatsEvent: def __init__(self, date, value): self.date = date self.value = value def __repr__(self): return str(self.date) def db_log_add(entity): logging.info("db_add: %s" % str(entity)) def db_log_delete(entity): logging.info("db_delete: %s" % str(entity)) def db_log_update(entity): logging.info("db_update: %s" % str(entity)) def check_vehicle_name_is_unique(current_user, name_field): """ Checks if the vehicle name given in the name_field is unique for the vehicles of the current user. An error is added to the field it the name is not unique. :param current_user: the user currently logged in :param name_field: the form field to enter the name to :return: True if the name is unique, False otherwise. """ vehicle_name = name_field.data for vehicle in current_user.vehicles: if vehicle.name == vehicle_name: name_field.default = vehicle_name name_field.errors.append('Vehicle "%s" already exists.' % vehicle_name) return False return True def get_latest_pitstop_for_vehicle(vehicle_id): """ return the latest pit stop for the vehicle with the given id. :param vehicle_id: the id of the vehicle :return: the latest pitstop or None if no pitstop exists """ latest_pitstop = ( Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id) .order_by(Pitstop.id.desc()) .first() ) return latest_pitstop def get_latest_pitstop_for_vehicle_and_consumable(vehicle_id, consumable_id): """ return the latest pit stop for the vehicle and consumable with the given ids. :param vehicle_id: the id of the vehicle :param consumable_id: the id of the consumable :return: the latest pitstop or None if no pitstop exists """ latest_pitstop_consumable = ( Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id) .filter(Pitstop.consumable_id == consumable_id) .order_by(Pitstop.id.desc()) .first() ) return latest_pitstop_consumable def compute_lower_limits_for_new_pitstop( latest_pitstop, last_pitstop_consumable, consumable_id ): """ This function figures out the lower limits for date and odometer of a new pitstop. :param latest_pitstop: :param last_pitstop_consumable: :param consumable_id: :return: """ odometer = 0 date_of_pitstop = date(1970, 1, 1) amount = 0 costs = 0 if latest_pitstop is not None: odometer = latest_pitstop.odometer date_of_pitstop = latest_pitstop.date if last_pitstop_consumable is not None: amount = last_pitstop_consumable.amount costs = last_pitstop_consumable.costs return Pitstop(odometer, amount, date_of_pitstop, costs, consumable_id) def pitstop_service_key(x): return x.date, x.odometer def get_event_line_for_vehicle(vehicle): data = [] for pitstop in vehicle.pitstops: data.append(pitstop) for service in vehicle.services: data.append(service) data.sort(key=pitstop_service_key) return data def chunks(l, n): """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): yield l[i : i + n] def update_filling_station_prices(ids): max_age = (datetime.now() - timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M") res = ( db.session.query(FillingStation) .filter(FillingStation.id.in_(ids)) .filter( or_( FillingStation.last_update == None, FillingStation.last_update < max_age ) ) .all() ) if len(res) > 0: id_map = {x.id: x for x in res} query_ids = [x.id for x in res] api_key = app.config["TANKERKOENIG_API_KEY"] url = "https://creativecommons.tankerkoenig.de/json/prices.php" # documentation tells us to query max 10 filling stations at a time... for c in chunks(query_ids, 10): params = {"apikey": api_key, "ids": ",".join(c)} response = requests.get(url, params=params) response_json = response.json() if response_json["ok"]: prices = response_json["prices"] for price in prices: id = price station_status = prices[id] id_map[id].open = station_status["status"] == "open" if id_map[id].open: id_map[id].diesel = station_status["diesel"] id_map[id].e10 = station_status["e10"] id_map[id].e5 = station_status["e5"] id_map[id].last_update = datetime.now() else: logging.error( "could not update filling stations because of {r} on URL {u}.".format( r=str(response_json), u=response.url ) ) db.session.commit() def calculate_regular_cost_instances(vehicle): data = [] for regular in vehicle.regulars: if date.today() < regular.start_at: # skip regular costs that are not yet active continue if regular.ends_at: end_date = regular.ends_at else: end_date = date.today() start_year = regular.start_at.year end_year = end_date.year for year in range(start_year, end_year + 1): for day in regular.days.split(","): m, d = day.split("-") d = date(year, int(m), int(d)) if regular.start_at <= d and d <= end_date: r = RegularCostInstance( regular.id, date=d, odometer=None, costs=regular.costs, name=regular.description, ) data.append(r) return data def get_users_active_vehicle(user): def selector(vehicle): if not vehicle.pitstops: return date.today() return vehicle.pitstops[-1].date active_vehicles = [g for g in user.vehicles if g.is_active] active_vehicles.sort(key=selector, reverse=True) return active_vehicles