First step towards flask security user handling

This commit is contained in:
Joachim Lusiardi 2016-04-17 08:59:11 +02:00
parent 808e0d80d3
commit bbfa7b9341
3 changed files with 74 additions and 71 deletions

View File

@ -5,6 +5,8 @@ from flask import render_template, make_response
from flask import request, redirect, g from flask import request, redirect, g
from flask import url_for from flask import url_for
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask.ext.security import Security, SQLAlchemyUserDatastore, \
UserMixin, RoleMixin, login_required, utils
import uuid import uuid
import hashlib import hashlib
import time import time
@ -13,38 +15,53 @@ from functools import wraps
app = Flask(__name__) app = Flask(__name__)
DATABASE = '/data/rollerverbrauch.db' DATABASE = '/data/rollerverbrauch.db'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'+DATABASE app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'+DATABASE
sqldb = SQLAlchemy(app) db = SQLAlchemy(app)
DEBUG = True app.config['DEBUG'] = True
SECRET_KEY = 'development key' app.config['SECRET_KEY'] = 'development key'
app.config['SECURITY_PASSWORD_HASH'] = 'pbkdf2_sha512'
app.config['SECURITY_PASSWORD_SALT'] = 'xxxxxxxxxxxxxxxxxxxxxx'
app.config.from_object(__name__) app.config.from_object(__name__)
class User(sqldb.Model): roles_users = db.Table('roles_users',
id = sqldb.Column(sqldb.Integer, primary_key=True) db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
username = sqldb.Column(sqldb.String(80), unique=True) db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
email = sqldb.Column(sqldb.String(120), unique=True)
salt = sqldb.Column(sqldb.String(8))
password_hash = sqldb.Column(sqldb.String(64))
def __init__(self, username, email, password):
self.username = username class Role(db.Model, RoleMixin):
self.email = email id = db.Column(db.Integer(), primary_key=True)
self.salt = uuid.uuid4().hex name = db.Column(db.String(80), unique=True)
m = hashlib.sha256(password.encode('utf-8')) description = db.Column(db.String(255))
m = hashlib.sha256((m.hexdigest() + self.salt).encode('utf-8')).hexdigest()
self.password_hash = m def __str__(self):
pass return self.name
def __hash__(self):
return hash(self.name)
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(255), unique=True)
password = db.Column(db.String(255))
active = db.Column(db.Boolean())
confirmed_at = db.Column(db.DateTime())
roles = db.relationship(
'Role',
secondary=roles_users,
backref=db.backref('users', lazy='dynamic')
)
def __repr__(self): def __repr__(self):
return '<User %r>' % self.username return '<User %r>' % self.username
class Pitstop(sqldb.Model): class Pitstop(db.Model):
id = sqldb.Column(sqldb.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
date = sqldb.Column(sqldb.Date) date = db.Column(db.Date)
odometer = sqldb.Column(sqldb.Integer) odometer = db.Column(db.Integer)
litres = sqldb.Column(sqldb.Numeric(5,2)) litres = db.Column(db.Numeric(5,2))
def __init__(self, odometer, litres, date): def __init__(self, odometer, litres, date):
self.odometer = odometer self.odometer = odometer
@ -54,41 +71,31 @@ class Pitstop(sqldb.Model):
def __repr__(self): def __repr__(self):
return '<Pitstop %r km, %r l>' % (self.odometer, self.litres) return '<Pitstop %r km, %r l>' % (self.odometer, self.litres)
sqldb.create_all()
if User.query.filter_by(username='jlusiardi').first() is None:
user1 = User('jlusiardi', 'joachim@lusiardi.de', 'pitstops')
sqldb.session.add(user1)
sqldb.session.commit()
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)
def check_auth(username, password): @app.before_first_request
user = User.query.filter_by(username=username).first() def before_first_request():
if user is None: db.create_all()
return False
salt = user.salt
m = hashlib.sha256(password.encode('utf-8'))
m = hashlib.sha256((m.hexdigest()+salt).encode('utf-8'))
digest = m.hexdigest()
ok = (User.query.filter_by(username=username, password_hash=digest).first() is not None)
if not ok:
app.logger.error("digest: " + digest)
return ok
user_datastore.find_or_create_role(name='admin', description='Administrator')
user_datastore.find_or_create_role(name='end-user', description='End user')
def authenticate(): encrypted_password = utils.encrypt_password('password')
resp = make_response(render_template('login_required.html'), 401) if not user_datastore.get_user('someone@example.com'):
resp.headers['WWW-Authenticate'] = 'Basic realm="Login Required"' user_datastore.create_user(email='someone@example.com', password=encrypted_password)
return resp if not user_datastore.get_user('admin@example.com'):
user_datastore.create_user(email='admin@example.com', password=encrypted_password)
# Commit any database changes; the User and Roles must exist before we can add a Role to the User
db.session.commit()
def requires_auth(f): # Give one User has the "end-user" role, while the other has the "admin" role. (This will have no effect if the
@wraps(f) # Users already have these Roles.) Again, commit any database changes.
def decorated(*args, **kwargs): user_datastore.add_role_to_user('someone@example.com', 'end-user')
auth = request.authorization user_datastore.add_role_to_user('admin@example.com', 'admin')
if not auth or not check_auth(auth.username, auth.password): db.session.commit()
return authenticate()
return f(*args, **kwargs)
return decorated
@app.before_request @app.before_request
@ -96,32 +103,27 @@ def before_request():
g.data = {} g.data = {}
@app.teardown_request
def teardown_request(exception):
pass
@app.route('/') @app.route('/')
@requires_auth @login_required
def index(): def index():
return redirect(url_for('get_pit_stops')) return redirect(url_for('get_pit_stops'))
@app.route('/pitstops', methods=['POST']) @app.route('/pitstops', methods=['POST'])
@requires_auth @login_required
def create_pit_stop(): def create_pit_stop():
last_pitstop = Pitstop.query.order_by(Pitstop.date.desc()).first() last_pitstop = Pitstop.query.order_by(Pitstop.date.desc()).first()
if last_pitstop is None: if last_pitstop is None:
last_pitstop = Pitstop(0, 0, None) last_pitstop = Pitstop(0, 0, None)
error_msg = {} error_msg = {}
date_of_pitstop = request.form['date'] date_of_pitstop = request.form['date']
try: try:
date_of_pitstop = datetime.strptime(date_of_pitstop, '%Y-%m-%d').strftime('%Y-%m-%d') date_of_pitstop = datetime.strptime(date_of_pitstop, '%Y-%m-%d').strftime('%Y-%m-%d')
except ValueError: except ValueError:
error_msg['date'] = 'invalid date, only YYYY-MM-DD is allowed' error_msg['date'] = 'invalid date, only YYYY-MM-DD is allowed'
date_of_pitstop = request.form['date'] date_of_pitstop = request.form['date']
odometer = request.form['odometer'] odometer = request.form['odometer']
try: try:
odometer = int(odometer) odometer = int(odometer)
@ -133,7 +135,7 @@ def create_pit_stop():
odometer = request.form['odometer'] odometer = request.form['odometer']
if odometer is None: if odometer is None:
odometer = request.form['odometer'] odometer = request.form['odometer']
litres = request.form['litres'] litres = request.form['litres']
try: try:
litres = float(litres) litres = float(litres)
@ -145,21 +147,21 @@ def create_pit_stop():
litres = request.form['litres'] litres = request.form['litres']
if litres is None: if litres is None:
litres = request.form['litres'] litres = request.form['litres']
# error checking here # error checking here
if len(error_msg) > 0: if len(error_msg) > 0:
data = {'last': {'date': date_of_pitstop, 'odometer': odometer, 'litres': litres}, 'error': error_msg} data = {'last': {'date': date_of_pitstop, 'odometer': odometer, 'litres': litres}, 'error': error_msg}
return render_template('newPitStopForm.html', data=data) return render_template('newPitStopForm.html', data=data)
new_stop = Pitstop(odometer, litres, datetime.strptime(date_of_pitstop, '%Y-%m-%d')) new_stop = Pitstop(odometer, litres, datetime.strptime(date_of_pitstop, '%Y-%m-%d'))
sqldb.session.add(new_stop) db.session.add(new_stop)
sqldb.session.commit() db.session.commit()
return redirect(url_for('get_pit_stops')) return redirect(url_for('get_pit_stops'))
@app.route('/pitstops/createForm', methods=['GET']) @app.route('/pitstops/createForm', methods=['GET'])
@requires_auth @login_required
def create_pit_stop_form(): def create_pit_stop_form():
last_stop = Pitstop.query.order_by(Pitstop.date.desc()).first() last_stop = Pitstop.query.order_by(Pitstop.date.desc()).first()
if last_stop is None: if last_stop is None:
@ -174,7 +176,7 @@ def create_pit_stop_form():
@app.route('/pitstops', methods=['GET']) @app.route('/pitstops', methods=['GET'])
@requires_auth @login_required
def get_pit_stops(): def get_pit_stops():
data = prepare_pit_stops(Pitstop.query.all()) data = prepare_pit_stops(Pitstop.query.all())
g.data['pitstops'] = data g.data['pitstops'] = data
@ -182,13 +184,13 @@ def get_pit_stops():
@app.route('/manual', methods=['GET']) @app.route('/manual', methods=['GET'])
@requires_auth @login_required
def get_manual(): def get_manual():
return render_template('manual.html', data=g.data) return render_template('manual.html', data=g.data)
@app.route('/statistics', methods=['GET']) @app.route('/statistics', methods=['GET'])
@requires_auth @login_required
def get_statistics(): def get_statistics():
pitstops = Pitstop.query.all() pitstops = Pitstop.query.all()
count = len(pitstops) count = len(pitstops)
@ -197,7 +199,7 @@ def get_statistics():
average_distance = 0 average_distance = 0
average_litres_fuelled = 0 average_litres_fuelled = 0
average_litres_used = 0 average_litres_used = 0
if count > 0: if count > 0:
sum_litres = 0 sum_litres = 0
for pitstop in pitstops: for pitstop in pitstops:

View File

@ -1,2 +1,3 @@
Flask Flask
Flask-SQLAlchemy Flask-SQLAlchemy
Flask-Security

View File

@ -66,7 +66,7 @@
{% endblock %} {% endblock %}
</div> </div>
</div> </div>
{{ current_user.email }}
</body> </body>
</html> </html>