""" @author: jlusiardi """ from datetime import datetime import sqlite3 class Db(object): """ classdocs """ def __init__(self, db_file): """ Constructor """ self.db = sqlite3.connect(db_file) def __del__(self): self.db.close() def get_all_pit_stops(self): return self._perform_query('select * from pitstops order by id asc') def get_last_pit_stop(self): pitstops = self._perform_query('select * from pitstops order by id desc limit 1') if len(pitstops) == 0: return {'date': datetime.strftime(datetime.now(), '%Y-%m-%d'), 'odometer': 0, 'litres': 0} return pitstops[0] def add_pit_stop(self, date, odometer, litres): self.db.execute('insert into pitstops (date, odometer, litres) values (?, ?, ?)', [date, odometer, litres]) self.db.commit() def get_salt_for_user(self, user): salt = self._perform_query_param('select salt from users where name = ?', [user]) if len(salt) == 0: return None return salt[0]['salt'] def check_password_for_user(self, user, password): user = self._perform_query_param('select * from users where name = ? and password = ?', [user, password]) if len(user) == 0: return False return True def _perform_query_param(self, query, data): cursor = self.db.execute(query, data) names = list(map(lambda x: x[0], cursor.description)) result = [] for row in cursor.fetchall(): row_result = {} for index in range(0, len(names)): row_result[names[index]] = row[index] result.append(row_result) return result def _perform_query(self, query): cursor = self.db.execute(query) names = list(map(lambda x: x[0], cursor.description)) result = [] for row in cursor.fetchall(): row_result = {} for index in range(0, len(names)): row_result[names[index]] = row[index] result.append(row_result) return result def init_db(self, resource): with resource as f: sql_commands = f.read() self.db.cursor().executescript(sql_commands) self.db.commit()