rollerverbrauch/app/db/__init__.py

93 lines
2.9 KiB
Python

'''
@author: shing19m
'''
from datetime import datetime
import sqlite3
class Db(object):
    '''
    Small data-access wrapper around one SQLite connection.

    Every read helper returns rows as plain dictionaries keyed by the column
    names reported by the cursor. The queries touch three tables:
    ``pitstops``, ``services`` and ``users``.
    '''

    def __init__(self, db_file):
        '''
        Open the database.

        :param db_file: database location, handed unchanged to
            ``sqlite3.connect``
        '''
        self.db = sqlite3.connect(db_file)

    def __del__(self):
        '''
        Close the connection when the wrapper is garbage collected.
        '''
        # ``self.db`` is missing when sqlite3.connect() raised inside
        # __init__ (or the instance was built without __init__); an
        # unconditional attribute access would raise AttributeError here.
        connection = getattr(self, 'db', None)
        if connection is not None:
            connection.close()

    def getAllPitStops(self):
        '''
        Return every row of ``pitstops`` ordered by ascending ``id``.

        :return: list of row dictionaries, empty when the table is empty
        '''
        return self._perform_query('select * from pitstops order by id asc')

    def getAllServices(self):
        '''
        Return every row of ``services``.

        :return: list of row dictionaries, empty when the table is empty
        '''
        return self._perform_query('select * from services')

    def getLastPitStop(self):
        '''
        Return the pit stop with the highest ``id``.

        :return: the row dictionary, or a placeholder with today's date
            (``YYYY-MM-DD``) and zero ``odometer``/``litres`` when no pit
            stop has been stored yet
        '''
        pitstops = self._perform_query('select * from pitstops order by id desc limit 1')
        if not pitstops:
            return {'date': datetime.now().strftime('%Y-%m-%d'), 'odometer': 0, 'litres': 0}
        return pitstops[0]

    def get_service_warning_info(self):
        '''
        Describe the open service with the lowest planned odometer value.

        A service counts as open while its ``date`` column is null.

        :return: dictionary with ``km_left`` (planned odometer minus the
            odometer of the latest pit stop) and ``tasks``, or ``None`` when
            no open service exists. ``km_left`` is null/``None`` when there
            is no pit stop to subtract.
        '''
        info = self._perform_query('select (odometer_planned - (select odometer from pitstops order by id desc limit 1)) km_left, tasks from services where date is null order by odometer_planned asc limit 1;')
        if not info:
            return None
        return info[0]

    def get_next_undone_service(self):
        '''
        Return one service whose ``date`` is still null.

        :return: the row dictionary, or ``None`` when every service is done
        '''
        # NOTE(review): unlike get_service_warning_info() this query has no
        # ORDER BY, so which open service is picked is left to SQLite's row
        # order - confirm whether "order by odometer_planned" is intended.
        services = self._perform_query('select * from services where date is null limit 1')
        if not services:
            return None
        return services[0]

    def get_salt_for_user(self, user):
        '''
        Look up the ``salt`` column for a user name.

        :param user: value matched against ``users.name``
        :return: the stored salt, or ``None`` for an unknown user
        '''
        rows = self._perform_query_param('select salt from users where name = ?', [user])
        if not rows:
            return None
        return rows[0]['salt']

    def check_password_for_user(self, user, password):
        '''
        Check whether a ``users`` row matches both name and password.

        The given value is compared directly with the ``password`` column;
        presumably callers pass the already salted/hashed form (see
        get_salt_for_user) - confirm against the caller.

        :param user: value matched against ``users.name``
        :param password: value matched against ``users.password``
        :return: ``True`` when a matching row exists, otherwise ``False``
        '''
        rows = self._perform_query_param('select * from users where name = ? and password = ?', [user, password])
        return bool(rows)

    def _perform_query_param(self, query, data):
        '''
        Execute a parameterised statement and fetch all result rows.

        :param query: SQL text using ``?`` placeholders
        :param data: sequence of values bound to the placeholders
        :return: list with one ``{column name: value}`` dictionary per row
        '''
        cursor = self.db.execute(query, data)
        # description is None for statements that produce no result set;
        # fall back to "no columns" instead of failing on the iteration.
        names = [column[0] for column in cursor.description or ()]
        return [dict(zip(names, row)) for row in cursor.fetchall()]

    def _perform_query(self, query):
        '''
        Execute a statement without parameters and fetch all result rows.

        :param query: SQL text
        :return: list with one ``{column name: value}`` dictionary per row
        '''
        # Same row-to-dict conversion as the parameterised variant, so
        # delegate instead of duplicating it.
        return self._perform_query_param(query, ())

    def addPitStop(self, date, odometer, litres):
        '''
        Insert a new pit stop and commit it.

        :param date: value stored in ``pitstops.date``
        :param odometer: value stored in ``pitstops.odometer``
        :param litres: value stored in ``pitstops.litres``
        '''
        # The connection context manager commits on success and rolls the
        # transaction back if the insert raises, so a failed insert does not
        # leave a half-open transaction on the shared connection.
        with self.db:
            self.db.execute('insert into pitstops (date, odometer, litres) values (?, ?, ?)', [date, odometer, litres])

    def init_db(self, resource):
        '''
        Run an SQL script against the database and commit.

        :param resource: context manager yielding an object with ``read()``
            that returns the script text (for example an opened file)
        '''
        with resource as f:
            sql_commands = f.read()
        self.db.cursor().executescript(sql_commands)
        self.db.commit()