From 495a9ab4eab288bbaea3c8fc4ba9ce142af9aef5 Mon Sep 17 00:00:00 2001 From: Tiago Garcia Date: Tue, 17 Dec 2024 00:55:08 +0000 Subject: [PATCH] fix exceptions Signed-off-by: Tiago Garcia --- delivery2/server/routes/file.py | 7 ++- delivery2/server/routes/role.py | 77 ++++++++++++--------------- delivery2/server/routes/user.py | 7 ++- delivery2/server/services/roles.py | 52 +++++++++--------- delivery2/server/services/sessions.py | 29 +++++----- 5 files changed, 81 insertions(+), 91 deletions(-) diff --git a/delivery2/server/routes/file.py b/delivery2/server/routes/file.py index a4c657a..4c1937c 100644 --- a/delivery2/server/routes/file.py +++ b/delivery2/server/routes/file.py @@ -195,10 +195,9 @@ def file_acl(): if role not in org.roles: return jsonify({"error": "Role not found"}), 404 - try: - RoleService.change_perm_on_role_in_file(file, role, perm, operation) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + acl = RoleService.change_perm_on_role_in_file(file, role, perm, operation) + if isinstance(acl, tuple): + return acl return jsonify(file.to_dict()), 200 diff --git a/delivery2/server/routes/role.py b/delivery2/server/routes/role.py index eb9f447..3b927cd 100644 --- a/delivery2/server/routes/role.py +++ b/delivery2/server/routes/role.py @@ -49,10 +49,9 @@ def role_list_users(role): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - users = RoleService.get_users_in_role(org, role) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + users = RoleService.get_users_in_role(org, role) + if isinstance(users, tuple): + return users return jsonify(users), 200 @@ -70,10 +69,9 @@ def role_list_perms(role): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - perms = RoleService.get_perms_for_role(org, role, return_str=True) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + perms = RoleService.get_perms_for_role(org, role, return_str=True) + if isinstance(perms, tuple): + return perms return jsonify(perms), 200 @@ -98,10 +96,9 @@ def role_suspend(role): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - RoleService.change_role_status(org, role, "suspended") - except ValueError as e: - return jsonify({"error": str(e)}), 400 + status = RoleService.change_role_status(org, role, "suspended") + if isinstance(status, tuple): + return status return jsonify({"message": "Role suspended"}), 200 @@ -127,10 +124,9 @@ def role_activate(role): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - RoleService.change_role_status(org, role, "active") - except ValueError as e: - return jsonify({"error": str(e)}), 400 + status = RoleService.change_role_status(org, role, "active") + if isinstance(status, tuple): + return status return jsonify({"message": "Role activated"}), 200 @@ -160,10 +156,9 @@ def role_user_add(role, username): if not user: return jsonify({"error": "User not found"}), 404 - try: - RoleService.add_user_to_role(role, org, user) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + role = RoleService.add_user_to_role(role, org, user) + if isinstance(role, tuple): + return role return jsonify({"message": "User added to role"}), 200 @@ -193,10 +188,9 @@ def role_user_remove(role, username): if not user: return jsonify({"error": "User not found"}), 404 - try: - RoleService.remove_user_from_role(role, org, user) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + role = RoleService.remove_user_from_role(role, org, user) + if isinstance(role, tuple): + return role return jsonify({"message": "User removed from role"}), 200 @@ -222,10 +216,9 @@ def role_perm_add(role, perm): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.ADD) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + role = RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.ADD) + if isinstance(role, tuple): + return role return jsonify({"message": "Permission added to role"}), 200 @@ -251,10 +244,9 @@ def role_perm_remove(role, perm): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.REMOVE) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + role = RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.REMOVE) + if isinstance(role, tuple): + return role return jsonify({"message": "Permission removed from role"}), 200 @@ -272,10 +264,9 @@ def role_session_assume(role): if not RoleService.get_role(session.org_id, role): return jsonify({"error": "Role not found"}), 404 - try: - SessionService.change_role(session, role, "add") - except ValueError as e: - return jsonify({"error": str(e)}), 400 + session = SessionService.change_role(session, role, "add") + if isinstance(session, tuple): + return session return jsonify(session.to_dict()), 200 @@ -293,10 +284,9 @@ def role_session_drop(role): if not RoleService.get_role(session.org_id, role): return jsonify({"error": "Role not found"}), 404 - try: - SessionService.change_role(session, role, "drop") - except ValueError as e: - return jsonify({"error": str(e)}), 400 + session = SessionService.change_role(session, role, "drop") + if isinstance(session, tuple): + return session return jsonify(session.to_dict()), 200 @@ -328,8 +318,7 @@ def perm_list_roles(perm): if not org: return jsonify({"error": "Organization not found"}), 404 - try: - roles = RoleService.get_roles_for_perm(org, Perm(perm)) - except ValueError as e: - return jsonify({"error": str(e)}), 400 + roles = RoleService.get_roles_for_perm(org, Perm(perm)) + if isinstance(roles, tuple): + return roles return jsonify(roles), 200 \ No newline at end of file diff --git a/delivery2/server/routes/user.py b/delivery2/server/routes/user.py index ffd4893..79d3c9f 100644 --- a/delivery2/server/routes/user.py +++ b/delivery2/server/routes/user.py @@ -40,10 +40,9 @@ def user_login(): signature = data["signature"] signature = base64.b64decode(signature) - try: - SessionService.verify_session(session_token, signature) - except InvalidSignature: - return jsonify({"error": "Invalid signature"}), 400 + session = SessionService.verify_session(session_token, signature) + if isinstance(session, tuple): + return session return jsonify(session.to_dict()), 200 return jsonify({"error": "Missing required fields"}), 400 diff --git a/delivery2/server/services/roles.py b/delivery2/server/services/roles.py index df25c28..5457ede 100644 --- a/delivery2/server/services/roles.py +++ b/delivery2/server/services/roles.py @@ -1,3 +1,5 @@ +from flask import jsonify + from database import db from models import Organization, User, File from sqlalchemy.orm.attributes import flag_modified @@ -6,9 +8,9 @@ from utils import Perm, PermOperation class RoleService: @staticmethod - def create_role(org: Organization, role: str, perms: list[Perm]) -> dict: + def create_role(org: Organization, role: str, perms: list[Perm]) -> dict | tuple: if role in org.roles: - raise ValueError(f"Role {role} already exists in organization {org.name}") + return jsonify({"error": f"Role {role} already exists in organization {org.name}"}), 400 roles = org.roles.copy() roles[role] = { @@ -33,12 +35,12 @@ class RoleService: return roles @staticmethod - def change_role_status(org: Organization, role: str, status: str) -> Organization: + def change_role_status(org: Organization, role: str, status: str) -> Organization | tuple: if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 if org.roles[role]["status"] == status: - raise ValueError(f"Role {role} is already {status} in organization {org.name}") + return jsonify({"error": f"Role {role} is already {status} in organization {org.name}"}), 400 roles = org.roles.copy() roles[role]["status"] = status @@ -55,13 +57,13 @@ class RoleService: return org.roles[role] @staticmethod - def check_role_permission(org: Organization, role: str, perm: Perm, doc_handle=None) -> bool: + def check_role_permission(org: Organization, role: str, perm: Perm, doc_handle=None) -> bool | tuple: from services import FileService if doc_handle: file = FileService.get_file_by_document_handle(doc_handle) if not file: - raise ValueError(f"Document {doc_handle} not found") + return jsonify({"error": "File not found"}), 404 if not Perm.check_perm(file.acl[role], perm.value): return False @@ -69,7 +71,7 @@ class RoleService: return True if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 role_perms = org.roles[role]["permissions"] return Perm.check_perm(role_perms, perm.value) @@ -86,9 +88,9 @@ class RoleService: return org.roles @staticmethod - def get_users_in_role(org: Organization, role: str) -> list: + def get_users_in_role(org: Organization, role: str) -> list | tuple: if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 return org.roles[role]["users"] @staticmethod @@ -108,15 +110,15 @@ class RoleService: return roles @staticmethod - def add_user_to_role(role: str, org: Organization, user: User) -> User: + def add_user_to_role(role: str, org: Organization, user: User) -> User | tuple: if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 if user.id in org.roles[role]["users"]: - raise ValueError(f"User {user.username} already has role {role} in organization {org.name}") + return jsonify({"error": f"User {user.username} already has role {role} in organization {org.name}"}), 400 if org.roles[role]["status"] != "active": - raise ValueError(f"Role {role} is not active in organization {org.name}") + return jsonify({"error": f"Role {role} is not active in organization {org.name}"}), 400 roles = user.roles.copy() roles[org.id] = role @@ -134,15 +136,15 @@ class RoleService: return user @staticmethod - def remove_user_from_role(role: str, org: Organization, user: User) -> User: + def remove_user_from_role(role: str, org: Organization, user: User) -> User | tuple: if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 if user.id not in org.roles[role]["users"]: - raise ValueError(f"User {user.username} does not have role {role} in organization {org.name}") + return jsonify({"error": f"User {user.username} does not have role {role} in organization {org.name}"}), 400 if org.roles[role]["status"] != "active": - raise ValueError(f"Role {role} is not active in organization {org.name}") + return jsonify({"error": f"Role {role} is not active in organization {org.name}"}), 400 roles = user.roles.copy() roles.pop(org.id) @@ -160,15 +162,15 @@ class RoleService: return user @staticmethod - def change_perm_on_role(org: Organization, role: str, perm: Perm, operation: PermOperation) -> dict: + def change_perm_on_role(org: Organization, role: str, perm: Perm, operation: PermOperation) -> dict | tuple: if Perm.get_int([perm]) <= 0b111: - raise ValueError(f"Permission {perm} is not allowed for organization's roles") + return jsonify({"error": f"Permission {perm} is not allowed for organization's roles"}), 400 if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400 if org.roles[role]["status"] != "active": - raise ValueError(f"Role {role} is not active in organization {org.name}") + return jsonify({"error": f"Role {role} is not active in organization {org.name}"}), 400 roles = org.roles.copy() roles[role]["permissions"] = PermOperation.calc(roles[role]["permissions"], perm, operation) @@ -179,15 +181,15 @@ class RoleService: return org.roles[role] @staticmethod - def change_perm_on_role_in_file(file: File, role: str, perm: Perm, operation: PermOperation) -> dict: + def change_perm_on_role_in_file(file: File, role: str, perm: Perm, operation: PermOperation) -> dict | tuple: if Perm.get_int([perm]) > 0b111: - raise ValueError(f"Permission {perm} is not allowed for files' roles") + return jsonify({"error": f"Permission {perm} is not allowed for files' roles"}), 400 if role not in file.acl: file.acl[role] = 0 if file.acl[role] & perm.value != 0: - raise ValueError(f"Role {role} already has permission {perm} in file {file.document_handle}") + return jsonify({"error": f"Role {role} already has permission {perm} in file {file.document_handle}"}), 400 file.acl[role] = PermOperation.calc(file.acl[role], perm, operation) flag_modified(file, "acl") diff --git a/delivery2/server/services/sessions.py b/delivery2/server/services/sessions.py index 4c3378d..f5438dd 100644 --- a/delivery2/server/services/sessions.py +++ b/delivery2/server/services/sessions.py @@ -32,10 +32,10 @@ class SessionService: def verify_session(token: str, signature: bytes): session = SessionService.get_session(token) if not session: - raise ValueError(f"Session {token} not found") + return jsonify({"error": "Session not found"}), 401 public_key_pem = User.query.get(session.user_id).public_keys.get(str(session.org_id)) if not public_key_pem: - raise ValueError(f"Public key not found for user {session.user_id} in organization {session.org_id}") + return jsonify({"error": "Public key not found"}), 404 public_key = load_pem_public_key(public_key_pem.encode()) public_key.verify( signature, @@ -79,10 +79,10 @@ class SessionService: if status != "active": return jsonify({"error": "User is not active"}), 403 - #if required_perms: - # for perm in required_perms: - # if not SessionService.check_permission(session, perm, doc_handle): - # return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403 + if required_perms: + for perm in required_perms: + if not SessionService.check_permission(session, perm, doc_handle): + return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403 return session @@ -92,37 +92,38 @@ class SessionService: org = OrganizationService.get_organization(session.org_id) if not org: - raise ValueError(f"Organization {session.org_id} not found") + return jsonify({"error": "Organization not found"}), 404 user = User.query.get(session.user_id) if not user: - raise ValueError(f"User {session.user_id} not found") + return jsonify({"error": "User not found"}), 404 if role not in org.roles: - raise ValueError(f"Role {role} does not exist in organization {org.name}") + return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 404 if operation == "add": if role not in user.roles[org.id]: - raise ValueError(f"User {user.username} does not have role {role}") + return jsonify({"error": f"User {user.username} does not have role {role}"}), 400 if role in session.roles: - raise ValueError(f"User {user.username} already has role {role} in current session") + return jsonify({"error": f"User {user.username} already has role {role} in current session"}), 400 session.roles.append(role) elif operation == "drop": if role not in user.roles[org.id]: - raise ValueError(f"User {user.username} does not have role {role}") + return jsonify({"error": f"User {user.username} does not have role {role}"}), 400 if role not in session.roles: - raise ValueError(f"User {user.username} does not have role {role} in current session") + return jsonify({"error": f"User {user.username} does not have role {role} in current session"}), 400 session.roles.remove(role) else: - raise ValueError(f"Invalid operation {operation}") + return jsonify({"error": "Invalid operation"}), 400 flag_modified(session, "roles") db.commit() db.refresh(session) + return session @staticmethod def list_roles(session: Session) -> list: