sio-2425/delivery2/server/services/sessions.py

129 lines
3.5 KiB
Python
Raw Normal View History

import secrets
from sqlalchemy.orm.attributes import flag_modified
from database import db
from models import Session, User, Organization
from flask import jsonify
from utils import Perm
class SessionService:
    """Static helpers for creating, validating and mutating ``Session`` rows.

    All methods are static and operate on the module-level ``db`` handle.
    ``OrganizationService`` / ``RoleService`` are imported inside the methods
    that need them rather than at module level (presumably to avoid a
    circular import with the ``services`` package -- confirm before moving).
    """

    @staticmethod
    def create_session(user: User, org: Organization) -> Session:
        """Create, persist and return a new session for ``user`` within ``org``.

        The session starts with no active roles and a random token of 256
        hexadecimal characters (128 random bytes).
        """
        session = Session(
            user_id=user.id,
            org_id=org.id,
            token=secrets.token_hex(128),
            roles=[],
        )
        db.add(session)
        db.commit()
        # Reload so that database-populated columns are available to the caller.
        db.refresh(session)
        return session

    @staticmethod
    def get_session(token: str) -> Session | None:
        """Return the session whose token equals ``token``, or ``None``."""
        return db.query(Session).filter(Session.token == token).first()

    @staticmethod
    def delete_session(session: Session) -> None:
        """Delete ``session`` and commit the change."""
        db.delete(session)
        db.commit()

    @staticmethod
    def _extract_token(raw: str | None) -> str | None:
        """Return the bare session token carried by ``raw``.

        ``raw`` may be the token itself or an ``Authorization`` header value of
        the form ``"Bearer <token>"`` (scheme matched case-insensitively).
        Returns ``None`` when ``raw`` is missing, empty, not a string, or a
        Bearer header with no credentials, so callers can answer 401 instead
        of crashing on malformed input.
        """
        if not isinstance(raw, str):
            return None
        scheme, _, rest = raw.strip().partition(" ")
        if scheme.lower() == "bearer":
            return rest.strip() or None
        # No Bearer scheme: treat the value as the token itself, unchanged.
        return raw or None

    @staticmethod
    def validate_session(
        token: str | None,
        required_perms: list[Perm] | None = None,
    ) -> tuple | Session:
        """Resolve ``token`` to an active session holding ``required_perms``.

        Args:
            token: The raw session token or a ``"Bearer <token>"`` header value.
            required_perms: Permissions that must *all* be granted by the
                session's active roles. ``None`` or empty skips the check.

        Returns:
            The ``Session`` on success. On failure, a ``(response, status)``
            tuple built with ``jsonify``: 401 when no session matches, 404
            when the session's organization is gone, 403 when the user's
            status in the organization is not ``"active"`` or a permission
            is missing.
        """
        from services import OrganizationService

        credential = SessionService._extract_token(token)
        session = SessionService.get_session(credential) if credential else None
        if not session:
            return jsonify({"error": "Not authenticated"}), 401

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            return jsonify({"error": "Organization not found"}), 404

        status = OrganizationService.get_user_status(org, session.user_id)
        if status != "active":
            return jsonify({"error": "User is not active"}), 403

        for perm in required_perms or ():
            if not SessionService.check_permission(session, perm):
                return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403

        return session

    @staticmethod
    def _granted_roles(user: User, org_id) -> list:
        """Return the roles stored on ``user`` for ``org_id`` (``[]`` if none).

        ``user.roles`` is indexed by organization id -- presumably a mapping of
        org id to a list of role names; verify against the ``User`` model.
        A missing entry or an unset ``roles`` attribute yields an empty list
        instead of raising.
        """
        try:
            return user.roles[org_id] or []
        except (KeyError, IndexError, TypeError):
            return []

    @staticmethod
    def _user_holds_role(session: Session, role: str) -> bool:
        """Return True if the session's user is granted ``role`` in its org.

        False when the organization or the user can no longer be found.
        """
        from services import OrganizationService

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            return False
        # NOTE(review): relies on the model exposing ``User.query`` while the
        # rest of this class goes through ``db.query(...)`` -- confirm both
        # are bound to the same database session.
        user = User.query.get(session.user_id)
        if not user:
            return False
        return role in SessionService._granted_roles(user, org.id)

    @staticmethod
    def _persist_roles(session: Session) -> None:
        """Commit an in-place change to ``session.roles`` and reload the row."""
        # ``roles`` is mutated in place, so the attribute must be flagged
        # explicitly for the change to be included in the commit.
        flag_modified(session, "roles")
        db.commit()
        db.refresh(session)

    @staticmethod
    def assume_role(session: Session, role: str) -> bool:
        """Activate ``role`` on ``session``.

        Returns:
            True if the role was added. False if the organization or user is
            missing, the user is not granted ``role`` in the organization, or
            the role is already active on the session.
        """
        if not SessionService._user_holds_role(session, role):
            return False
        if role in session.roles:
            return False
        session.roles.append(role)
        SessionService._persist_roles(session)
        return True

    @staticmethod
    def drop_role(session: Session, role: str) -> bool:
        """Deactivate ``role`` on ``session``.

        Returns:
            True if the role was removed. False if the organization or user is
            missing, the user is not granted ``role`` in the organization, or
            the role is not currently active on the session.
        """
        # NOTE(review): a role that was revoked from the user after being
        # assumed cannot be dropped here (behavior kept from the original).
        if not SessionService._user_holds_role(session, role):
            return False
        if role not in session.roles:
            return False
        session.roles.remove(role)
        SessionService._persist_roles(session)
        return True

    @staticmethod
    def list_roles(session: Session) -> list:
        """Return the roles currently active on ``session``."""
        return session.roles

    @staticmethod
    def check_permission(session: Session, perm: Perm) -> bool:
        """Return True if any active role of ``session`` grants ``perm``.

        False when the session's organization or user can no longer be found.
        """
        from services import OrganizationService, RoleService

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            return False
        # The user record is only checked for existence.
        user = User.query.get(session.user_id)
        if not user:
            return False
        return any(
            RoleService.check_role_permission(org, role, perm)
            for role in session.roles
        )