sio-2425/delivery2/server/services/sessions.py

167 lines
5.7 KiB
Python

import json
import secrets
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from sqlalchemy.orm.attributes import flag_modified
from database import db, security_service
from models import Session, User, Organization
from flask import jsonify
from utils import Perm
from utils.comms_encryption import decrypt_request
from utils.exceptions import SessionException
class SessionService:
    """Static helpers for creating, verifying, validating and mutating sessions.

    Every method is a ``@staticmethod``; the class is used purely as a
    namespace around the module-level ``db`` handle.
    """

    @staticmethod
    def create_session(user: User, org: Organization) -> Session:
        """Create and persist a new, not-yet-verified session.

        The session starts with no roles, a random token and a random
        challenge (both ``secrets.token_hex(128)``), and ``verified=False``.

        Args:
            user: The user the session belongs to.
            org: The organization the session is scoped to.

        Returns:
            The freshly committed and refreshed ``Session``.
        """
        session = Session(
            user_id=user.id,
            org_id=org.id,
            token=secrets.token_hex(128),
            roles=[],
            challenge=secrets.token_hex(128),
            verified=False,
        )
        db.add(session)
        db.commit()
        db.refresh(session)
        return session

    @staticmethod
    def verify_session(token: str, signature: bytes) -> tuple | Session:
        """Mark a session as verified if ``signature`` signs its challenge.

        The signature is checked with PKCS#1 v1.5 padding and SHA-256 against
        the public key stored for the session's user under the session's
        organization id.

        Args:
            token: The session token to look up.
            signature: Signature over the session's challenge string.

        Returns:
            The updated ``Session`` on success, otherwise a
            ``(jsonify(...), status)`` tuple describing the failure.
        """
        session = SessionService.get_session(token)
        if not session:
            return jsonify({"error": "Session not found"}), 401

        # The challenge is cleared once consumed; without this guard a
        # repeated verification would crash on ``None.encode()``.
        if session.challenge is None:
            return jsonify({"error": "Session already verified"}), 400

        user = User.query.get(session.user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        public_key_pem = (user.public_keys or {}).get(str(session.org_id))
        if not public_key_pem:
            return jsonify({"error": "Public key not found"}), 404

        public_key = load_pem_public_key(public_key_pem.encode())
        try:
            public_key.verify(
                signature,
                session.challenge.encode(),
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except InvalidSignature:
            return jsonify({"error": "Invalid signature"}), 403

        # Consume the challenge so it cannot be reused.
        session.challenge = None
        session.verified = True
        db.commit()
        db.refresh(session)
        return session

    @staticmethod
    def get_session(token: str) -> Session | None:
        """Return the session whose token equals ``token``, or ``None``."""
        return db.query(Session).filter(Session.token == token).first()

    @staticmethod
    def delete_session(session: Session) -> None:
        """Delete ``session`` from the database and commit."""
        db.delete(session)
        db.commit()

    @staticmethod
    def _extract_token(header: str | None) -> str | None:
        """Return the bare token from a raw token or ``Bearer <token>`` header.

        The ``Bearer`` scheme name is matched case-insensitively and any run
        of whitespace may separate it from the token.

        Returns:
            The token string, or ``None`` when the header is missing, blank,
            or carries the scheme without a token.
        """
        parts = (header or "").split()
        if not parts:
            return None
        if parts[0].casefold() == "bearer":
            return parts[1] if len(parts) > 1 else None
        return header.strip()

    @staticmethod
    def validate_session(token: str, data: bytes = None, required_perms: list[Perm] = None, doc_handle=None) -> tuple | Session:
        """Authenticate a request and optionally decrypt it and check permissions.

        Args:
            token: Raw session token, optionally prefixed with ``Bearer``.
            data: Encrypted request body; when given it is decoded and
                decrypted with the key ``security_service`` holds for the
                session.
            required_perms: Permissions that must each be granted by at least
                one of the session's active roles.
            doc_handle: Forwarded to the permission check.

        Returns:
            ``(session, decrypted_data)`` when ``data`` was supplied,
            otherwise just the ``Session``.

        Raises:
            SessionException: 401 when no session matches the token; 403 when
                the session is unverified, the user is not active, or a
                required permission is missing; 404 when the user or
                organization cannot be found; 400 when decryption fails.
        """
        from services import OrganizationService, UserService

        token = SessionService._extract_token(token)
        if not token:
            raise SessionException("Not authenticated", 401)

        session = SessionService.get_session(token)
        if not session:
            raise SessionException("Not authenticated", 401)
        if not session.verified:
            raise SessionException("Session has not yet been verified", 403)

        user = UserService.get_user(session.user_id)
        if not user:
            raise SessionException("User not found", 404)

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            raise SessionException("Organization not found", 404)

        status = OrganizationService.get_user_status(org, session.user_id)
        if status != "active":
            raise SessionException("User is not active", 403)

        # The return shape follows whether the caller supplied a payload,
        # not whatever value decryption happens to produce.
        has_payload = data is not None
        if has_payload:
            try:
                data = decrypt_request(data.decode(), security_service.get_key(session))
            except Exception as exc:
                # Keep the original cause attached for debugging.
                raise SessionException("Failed to decrypt request data", 400) from exc

        for perm in required_perms or ():
            if not SessionService.check_permission(session, perm, doc_handle):
                raise SessionException(f"Permission denied, missing required permission: {perm}", 403)

        return (session, data) if has_payload else session

    @staticmethod
    def change_role(session: Session, role: str, operation: str):
        """Add a role to, or drop a role from, the session's active roles.

        Args:
            session: The session to modify.
            role: Name of the role; it must exist in the organization and be
                assigned to the user for that organization.
            operation: ``"add"`` or ``"drop"``.

        Returns:
            The updated ``Session`` on success, otherwise a
            ``(jsonify(...), status)`` tuple describing the failure.
        """
        from services import OrganizationService

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            return jsonify({"error": "Organization not found"}), 404

        user = User.query.get(session.user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        if role not in org.roles:
            return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 404

        if operation not in ("add", "drop"):
            return jsonify({"error": "Invalid operation"}), 400

        # A user with no role entry for this organization simply has no roles;
        # indexing directly would raise KeyError.
        granted = (user.roles or {}).get(str(org.id), [])
        if role not in granted:
            return jsonify({"error": f"User {user.username} does not have role {role}"}), 400

        if operation == "add":
            if role in session.roles:
                return jsonify({"error": f"User {user.username} already has role {role} in current session"}), 400
            session.roles.append(role)
        else:
            if role not in session.roles:
                return jsonify({"error": f"User {user.username} does not have role {role} in current session"}), 400
            session.roles.remove(role)

        # The list is mutated in place, so the attribute is flagged explicitly
        # to make sure the change is included in the commit.
        flag_modified(session, "roles")
        db.commit()
        db.refresh(session)
        return session

    @staticmethod
    def list_roles(session: Session) -> list:
        """Return the roles currently active in ``session``."""
        return session.roles

    @staticmethod
    def check_permission(session: Session, perm: Perm, doc_handle=None) -> bool:
        """Return whether any active session role grants ``perm``.

        Returns ``False`` when the session's organization or user cannot be
        found. ``doc_handle`` is forwarded unchanged to
        ``RoleService.check_role_permission``.
        """
        from services import OrganizationService, RoleService

        org = OrganizationService.get_organization(session.org_id)
        if not org:
            return False

        user = User.query.get(session.user_id)
        if not user:
            return False

        return any(
            RoleService.check_role_permission(org, role, perm, doc_handle)
            for role in session.roles
        )