rollerverbrauch/app/tools.py

215 lines
7.2 KiB
Python

from sqlalchemy import or_
import requests
import logging
from datetime import date, datetime, timedelta
from .entities import Pitstop, FillingStation
from . import db, app
class ConsumableStats:
    """
    Fuelling statistics of a single consumable for a single vehicle.

    Only the pitstops of the vehicle whose ``consumable_id`` matches the given consumable are taken into account.

    Attributes:
    - ``name``, ``id``, ``unit``: copied from the consumable
    - ``overall_amount``: sum of the amounts of all matching pitstops
    - ``average_amount_fuelled``: ``overall_amount`` divided by the number of matching pitstops
    - ``average_distance``: odometer difference between the first and the last matching pitstop divided by the
      number of intervals between them
    - ``average_amount_used``: amount per 100 odometer units, not counting the amount of the first pitstop
    - ``average_amount``: one ``StatsEvent`` per interval holding the amount per 100 odometer units of that interval
    - ``amounts``: one ``StatsEvent`` (date, amount) per matching pitstop
    """

    def __init__(self, vehicle, consumable):
        """
        :param vehicle: the vehicle whose pitstops are evaluated
        :param consumable: the consumable to compute the statistics for
        """
        self.name = consumable.name
        self.id = consumable.id
        self.unit = consumable.unit
        self.overall_amount = 0
        self.average_distance = 0
        self.average_amount_fuelled = 0
        self.average_amount_used = 0
        self.average_amount = []
        self.amounts = []

        pitstops = [stop for stop in vehicle.pitstops if stop.consumable_id == consumable.id]
        pitstop_count = len(pitstops)
        if pitstop_count == 0:
            return

        for pitstop in pitstops:
            self.overall_amount += pitstop.amount
            self.amounts.append(StatsEvent(pitstop.date, pitstop.amount))
        self.average_amount_fuelled = self.overall_amount / pitstop_count

        if pitstop_count == 1:
            # a single pitstop does not span any distance, the defaults stay in place
            return

        # Measure the distance between pitstops of THIS consumable only; the pitstop count and the subtracted first
        # amount below refer to the filtered list as well.
        overall_distance = pitstops[-1].odometer - pitstops[0].odometer
        self.average_distance = overall_distance / (pitstop_count - 1)
        if overall_distance != 0:
            # the amount of the first pitstop is excluded from the usage calculation
            self.average_amount_used = 100 * (self.overall_amount - pitstops[0].amount) / overall_distance

        for last_ps, current_ps in zip(pitstops, pitstops[1:]):
            distance = current_ps.odometer - last_ps.odometer
            if distance == 0:
                # two pitstops at the same odometer reading: no usage per distance can be computed
                continue
            self.average_amount.append(
                StatsEvent(current_ps.date, round(100 * current_ps.amount / distance, 2)))
class VehicleStats:
    """
    Statistics of a vehicle across all of its consumables.

    Attributes:
    - ``name``, ``id``: copied from the vehicle
    - ``consumables``: one ``ConsumableStats`` per consumable of the vehicle
    - ``odometers``: one ``StatsEvent`` (date, odometer) per entry of the vehicle's event line
    - ``overall_costs``: sum of the costs of all events that have costs set
    - ``overall_distance``: odometer difference between the last and the first event, 0 for fewer than two events
    """

    def __init__(self, vehicle):
        """
        :param vehicle: the vehicle to compute the statistics for
        """
        self.name = vehicle.name
        self.id = vehicle.id
        self.overall_distance = 0
        self.overall_costs = 0
        self.consumables = []
        self.odometers = []

        self.consumables.extend(ConsumableStats(vehicle, item) for item in vehicle.consumables)

        timeline = get_event_line_for_vehicle(vehicle)
        for event in timeline:
            self.odometers.append(StatsEvent(event.date, event.odometer))
            # events without costs do not contribute to the sum
            if event.costs is not None:
                self.overall_costs += event.costs

        # a distance needs at least two events
        if len(timeline) > 1:
            first_event, last_event = timeline[0], timeline[-1]
            self.overall_distance = last_event.odometer - first_event.odometer
class StatsEvent:
    """A single data point of a statistic: a value together with the date it belongs to."""

    def __init__(self, date, value):
        """
        :param date: the date of the data point
        :param value: the value of the data point
        """
        self.date, self.value = date, value
def db_log_add(entity):
    """
    Write an info log entry stating that the given entity was added to the database.
    :param entity: the entity that was added; its string representation is logged
    """
    # pass the entity as a logging argument so it is only formatted when the record is actually emitted
    logging.info('db_add: %s', entity)
def db_log_delete(entity):
    """
    Write an info log entry stating that the given entity was deleted from the database.
    :param entity: the entity that was deleted; its string representation is logged
    """
    # pass the entity as a logging argument so it is only formatted when the record is actually emitted
    logging.info('db_delete: %s', entity)
def db_log_update(entity):
    """
    Write an info log entry stating that the given entity was updated in the database.
    :param entity: the entity that was updated; its string representation is logged
    """
    # pass the entity as a logging argument so it is only formatted when the record is actually emitted
    logging.info('db_update: %s', entity)
def check_vehicle_name_is_unique(current_user, name_field):
    """
    Checks whether the vehicle name entered in name_field is not yet used by any vehicle of the current user. If the
    name is already taken, it is set as the default of the field and an error message is appended to the field.
    :param current_user: the user currently logged in
    :param name_field: the form field holding the entered vehicle name
    :return: True if the name is unique, False otherwise.
    """
    requested_name = name_field.data
    name_is_free = all(owned.name != requested_name for owned in current_user.vehicles)
    if name_is_free:
        return True

    name_field.default = requested_name
    name_field.errors.append('Vehicle "%s" already exists.' % requested_name)
    return False
def get_latest_pitstop_for_vehicle(vehicle_id):
    """
    Fetch the latest pitstop of a vehicle, i.e. the pitstop of that vehicle with the highest id.
    :param vehicle_id: the id of the vehicle
    :return: the latest pitstop or None if the vehicle has no pitstop
    """
    pitstops_of_vehicle = Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id)
    newest_first = pitstops_of_vehicle.order_by(Pitstop.id.desc())
    return newest_first.first()
def get_latest_pitstop_for_vehicle_and_consumable(vehicle_id, consumable_id):
    """
    Fetch the latest pitstop of a vehicle for one consumable, i.e. the matching pitstop with the highest id.
    :param vehicle_id: the id of the vehicle
    :param consumable_id: the id of the consumable
    :return: the latest pitstop or None if no matching pitstop exists
    """
    matching = Pitstop.query.filter(Pitstop.vehicle_id == vehicle_id)
    matching = matching.filter(Pitstop.consumable_id == consumable_id)
    newest_first = matching.order_by(Pitstop.id.desc())
    return newest_first.first()
def compute_lower_limits_for_new_pitstop(latest_pitstop, last_pitstop_consumable, consumable_id):
    """
    Builds a pitstop carrying the lower limits and start values for a new pitstop.

    Odometer and date are taken from the latest pitstop, amount and costs from the last pitstop of the consumable.
    Where the respective pitstop is missing, 0 (odometer, amount, costs) and 1970-01-01 (date) are used instead.
    :param latest_pitstop: the latest pitstop to take odometer and date from, or None
    :param last_pitstop_consumable: the last pitstop of the consumable to take amount and costs from, or None
    :param consumable_id: the id of the consumable, handed over to the created pitstop
    :return: a new Pitstop built from the values described above
    """
    if latest_pitstop is None:
        odometer, date_of_pitstop = 0, date(1970, 1, 1)
    else:
        odometer, date_of_pitstop = latest_pitstop.odometer, latest_pitstop.date

    if last_pitstop_consumable is None:
        amount, costs = 0, 0
    else:
        amount, costs = last_pitstop_consumable.amount, last_pitstop_consumable.costs

    return Pitstop(odometer, amount, date_of_pitstop, costs, consumable_id)
def pitstop_service_key(x):
    """
    Sort key for pitstops and services: orders by odometer first and by date second.
    :param x: an object with the attributes odometer and date
    :return: the tuple (odometer, date)
    """
    mileage, day = x.odometer, x.date
    return (mileage, day)
def get_event_line_for_vehicle(vehicle):
    """
    Collects all pitstops and services of a vehicle in one list, ordered by pitstop_service_key.
    :param vehicle: the vehicle to collect pitstops and services from
    :return: a new list containing the pitstops and services in sorted order
    """
    events = [*vehicle.pitstops, *vehicle.services]
    return sorted(events, key=pitstop_service_key)
def chunks(l, n):
    """
    Generator splitting l into consecutive slices of length n; the last slice may be shorter.
    :param l: the sequence to split, must support len() and slicing
    :param n: the size of the slices
    """
    offsets = range(0, len(l), n)
    for offset in offsets:
        end = offset + n
        yield l[offset:end]
def update_filling_station_prices(ids, timeout=10):
    """
    Refreshes the stored prices of the given filling stations with data from the Tankerkoenig price API.

    Only stations that have never been updated or whose last update is older than 15 minutes are requested. For every
    station in a successful response the open state and the update time are stored; the prices for diesel, e10 and e5
    are only stored for open stations. Failed requests are logged and skipped. The changes are committed at the end.
    :param ids: the ids of the filling stations to refresh
    :param timeout: the timeout in seconds for each request to the price API
    :return: None
    """
    # the age limit is handed to the query as a formatted string
    max_age = (datetime.now() - timedelta(minutes=15)).strftime('%Y-%m-%d %H:%M')
    outdated_stations = db.session \
        .query(FillingStation) \
        .filter(FillingStation.id.in_(ids)) \
        .filter(or_(FillingStation.last_update.is_(None), FillingStation.last_update < max_age)) \
        .all()

    if not outdated_stations:
        return

    stations_by_id = {station.id: station for station in outdated_stations}
    query_ids = [station.id for station in outdated_stations]
    api_key = app.config['TANKERKOENIG_API_KEY']
    url = 'https://creativecommons.tankerkoenig.de/json/prices.php'

    # documentation tells us to query max 10 filling stations at a time...
    for id_chunk in chunks(query_ids, 10):
        params = {'apikey': api_key, 'ids': ','.join(id_chunk)}
        try:
            # without a timeout an unresponsive API would block the caller indefinitely
            response = requests.get(url, params=params, timeout=timeout)
            response_json = response.json()
        except (requests.RequestException, ValueError) as error:
            # transport problems and unparsable bodies are handled like an unsuccessful API answer: log and go on
            logging.error('could not update filling stations because of %s on URL %s.', error, url)
            continue

        if not response_json['ok']:
            logging.error('could not update filling stations because of %s on URL %s.', response_json, response.url)
            continue

        logging.debug('filling station prices received: %s', response_json)
        for station_id, station_status in response_json['prices'].items():
            station = stations_by_id[station_id]
            station.open = station_status['status'] == 'open'
            if station.open:
                # prices are only taken over for open stations
                station.diesel = station_status['diesel']
                station.e10 = station_status['e10']
                station.e5 = station_status['e5']
            station.last_update = datetime.now()

    db.session.commit()