from flask import jsonify
from database import db
from models import Organization, User, File
from sqlalchemy.orm.attributes import flag_modified
from utils import Perm, PermOperation


class RoleService:
    """Role and permission management for organizations and their files.

    Data layout, as used by the methods of this class:

    * ``org.roles`` maps a role name to a dict with the keys
      ``"permissions"`` (value produced by ``Perm.get_int``), ``"users"``
      (list of user ids) and ``"status"`` (``"active"`` on creation).
    * ``user.roles`` maps ``str(org.id)`` to a list of role names.
    * ``file.acl`` maps a role name to a permission bitmask.

    Permissions whose ``Perm.get_int([perm])`` is ``<= 0b111`` are only
    accepted on file ACLs; larger ones are only accepted on organization
    roles.

    Validation failures are reported by returning a Flask
    ``(response, status_code)`` tuple rather than by raising.
    """

    @staticmethod
    def _error_response(message: str, status_code: int = 400) -> tuple:
        """Build the ``(jsonify({"error": ...}), status_code)`` error tuple."""
        return jsonify({"error": message}), status_code

    @staticmethod
    def _persist(entity, attribute: str) -> None:
        """Flag ``attribute`` as modified, commit, and reload ``entity``."""
        # The attribute is flagged explicitly because the dicts stored in
        # it are also mutated in place (nested values).
        flag_modified(entity, attribute)
        db.commit()
        db.refresh(entity)

    @staticmethod
    def create_role(org: Organization, role: str, perms: list[Perm]) -> dict | tuple:
        """Create an active role with no users and the given permissions.

        Returns the stored role dict, or a 400 error tuple when the role
        name is already present in ``org.roles``.
        """
        if role in org.roles:
            return RoleService._error_response(
                f"Role {role} already exists in organization {org.name}"
            )

        roles = org.roles.copy()
        roles[role] = {
            "permissions": Perm.get_int(perms),
            "users": [],
            "status": "active",
        }
        org.roles = roles
        RoleService._persist(org, "roles")
        return org.roles[role]

    @staticmethod
    def delete_role(org: Organization, role: str) -> dict | tuple:
        """Remove ``role`` from the organization.

        Returns the remaining roles mapping, or a 400 error tuple when the
        role does not exist (instead of failing with ``KeyError``).

        NOTE(review): role names stored in ``user.roles`` are not cleaned
        up here; ``check_user_permission`` ignores such leftovers.
        """
        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )

        roles = org.roles.copy()
        del roles[role]
        org.roles = roles
        RoleService._persist(org, "roles")
        return roles

    @staticmethod
    def change_role_status(org: Organization, role: str, status: str) -> Organization | tuple:
        """Set the ``"status"`` of ``role`` to ``status``.

        Returns the refreshed organization, or a 400 error tuple when the
        role does not exist or already has that status.
        """
        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        if org.roles[role]["status"] == status:
            return RoleService._error_response(
                f"Role {role} is already {status} in organization {org.name}"
            )

        roles = org.roles.copy()
        roles[role]["status"] = status
        org.roles = roles
        RoleService._persist(org, "roles")
        return org

    @staticmethod
    def get_role(org: Organization, role: str) -> dict | None:
        """Return the stored dict for ``role``, or ``None`` if it is absent."""
        if role not in org.roles:
            return None
        return org.roles[role]

    @staticmethod
    def check_role_permission(org: Organization, role: str, perm: Perm, doc_handle=None) -> bool | tuple:
        """Check whether ``role`` holds ``perm``.

        With a truthy ``doc_handle`` the check is made against the ACL of
        the file found for that handle; otherwise against the role's
        organization-level permissions.

        Returns the check result, a 404 error tuple when the file is not
        found, or a 400 error tuple when the role does not exist in the
        organization. Error tuples are truthy, so callers must not use
        the return value as a plain boolean without ruling them out.
        """
        # Imported lazily, as in the rest of this class
        # (presumably to avoid a circular import - confirm).
        from services import FileService

        if doc_handle:
            file = FileService.get_file_by_document_handle(doc_handle)
            if not file:
                return RoleService._error_response("File not found", 404)
            return role in file.acl and bool(Perm.check_perm(file.acl[role], perm.value))

        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        return Perm.check_perm(org.roles[role]["permissions"], perm.value)

    @staticmethod
    def check_user_permission(user: User, org: Organization, perm: Perm) -> bool:
        """Return True if any of the user's roles in ``org`` grants ``perm``.

        A user without any entry for the organization has no permissions.
        """
        # user.roles is keyed by str(org.id) everywhere it is written.
        role_names = user.roles.get(str(org.id)) or []
        for role in role_names:
            # A role name that no longer exists in the organization would
            # make check_role_permission return a (truthy) error tuple,
            # which must never be read as "permission granted".
            if role not in org.roles:
                continue
            if RoleService.check_role_permission(org, role, perm):
                return True
        return False

    @staticmethod
    def get_roles(org: Organization) -> dict:
        """Return the organization's roles mapping."""
        return org.roles

    @staticmethod
    def get_users_in_role(org: Organization, role: str) -> list | tuple:
        """Return the usernames of the users listed under ``role``.

        Returns a 400 error tuple when the role does not exist.
        """
        from services import UserService

        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        return [
            UserService.get_user(user_id).username
            for user_id in org.roles[role]["users"]
        ]

    @staticmethod
    def get_roles_for_user(user: User, org: Organization) -> list:
        """Return the names of the organization roles that list ``user.id``."""
        return [
            name
            for name, details in org.roles.items()
            if user.id in details["users"]
        ]

    @staticmethod
    def get_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]:
        """List the permissions of ``role``.

        The result starts with the role's organization-level permissions
        and is followed by one ``{document_handle: permissions}`` dict per
        file of the organization.

        Raises ``KeyError`` when ``role`` is not in ``org.roles``.
        """
        from services import FileService

        perms_list = Perm.get_perms(org.roles[role]["permissions"], return_str)
        for f in FileService.list_files_in_org(org):
            # A file whose ACL has no entry for the role grants it nothing;
            # 0 is the same "no permissions" mask used when an ACL entry is
            # first created in change_perm_on_role_in_file.
            file_mask = f.acl.get(role, 0)
            perms_list.append({f.document_handle: Perm.get_perms(file_mask, return_str)})
        return perms_list

    @staticmethod
    def get_roles_for_perm(org: Organization, perm: Perm) -> list:
        """Return the roles that hold ``perm``.

        For an organization-level permission the result is a list of role
        names. For a file-level permission it is a list of
        ``{file.name: [role, ...]}`` dicts, one per file with at least one
        matching role.
        """
        from services import FileService

        if Perm.get_int([perm]) > 0b111:
            return [
                role
                for role in org.roles
                if RoleService.check_role_permission(org, role, perm)
            ]

        roles = []
        for file in FileService.list_files_in_org(org):
            file_roles = [
                role
                for role, mask in file.acl.items()
                if Perm.check_perm(mask, perm.value)
            ]
            if file_roles:
                roles.append({file.name: file_roles})
        return roles

    @staticmethod
    def add_user_to_role(role: str, org: Organization, user: User) -> User | tuple:
        """Give ``user`` the role ``role`` in ``org``.

        Returns the refreshed user, or a 400 error tuple when the role does
        not exist, is not active, or already lists the user.
        """
        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        if user.id in org.roles[role]["users"]:
            return RoleService._error_response(
                f"User {user.username} already has role {role} in organization {org.name}"
            )
        if org.roles[role]["status"] != "active":
            return RoleService._error_response(
                f"Role {role} is not active in organization {org.name}"
            )

        org_key = str(org.id)
        user_roles = user.roles.copy()
        # Build a new list instead of appending to the loaded one, and do
        # not duplicate a name that is already there.
        held = list(user_roles.get(org_key) or [])
        if role not in held:
            held.append(role)
        user_roles[org_key] = held
        user.roles = user_roles

        org_roles = org.roles.copy()
        org_roles[role]["users"].append(user.id)
        org.roles = org_roles

        # Both sides of the membership are committed together so a failure
        # cannot leave the user and the organization out of sync.
        flag_modified(user, "roles")
        flag_modified(org, "roles")
        db.commit()
        db.refresh(user)
        db.refresh(org)
        return user

    @staticmethod
    def remove_user_from_role(role: str, org: Organization, user: User) -> User | tuple:
        """Take the role ``role`` in ``org`` away from ``user``.

        Returns the refreshed user, or a 400 error tuple when the role does
        not exist, is not active, or does not list the user.
        """
        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        if user.id not in org.roles[role]["users"]:
            return RoleService._error_response(
                f"User {user.username} does not have role {role} in organization {org.name}"
            )
        if org.roles[role]["status"] != "active":
            return RoleService._error_response(
                f"Role {role} is not active in organization {org.name}"
            )

        org_key = str(org.id)
        user_roles = user.roles.copy()
        # Filtering tolerates a user-side list that is missing or already
        # lacks the role, instead of raising KeyError/ValueError.
        user_roles[org_key] = [
            name for name in (user_roles.get(org_key) or []) if name != role
        ]
        user.roles = user_roles

        org_roles = org.roles.copy()
        org_roles[role]["users"].remove(user.id)
        org.roles = org_roles

        # Single commit for both sides, see add_user_to_role.
        flag_modified(user, "roles")
        flag_modified(org, "roles")
        db.commit()
        db.refresh(user)
        db.refresh(org)
        return user

    @staticmethod
    def change_perm_on_role(org: Organization, role: str, perm: Perm, operation: PermOperation) -> dict | tuple:
        """Apply ``operation`` with ``perm`` to the role's organization permissions.

        Returns the stored role dict, or a 400 error tuple when ``perm`` is
        a file-level permission, the role does not exist, or the role is
        not active.
        """
        if Perm.get_int([perm]) <= 0b111:
            return RoleService._error_response(
                f"Permission {perm} is not allowed for organization's roles"
            )
        if role not in org.roles:
            return RoleService._error_response(
                f"Role {role} does not exist in organization {org.name}"
            )
        if org.roles[role]["status"] != "active":
            return RoleService._error_response(
                f"Role {role} is not active in organization {org.name}"
            )

        roles = org.roles.copy()
        roles[role]["permissions"] = PermOperation.calc(
            roles[role]["permissions"], perm, operation
        )
        org.roles = roles
        RoleService._persist(org, "roles")
        return org.roles[role]

    @staticmethod
    def change_perm_on_role_in_file(file: File, role: str, perm: Perm, operation: PermOperation) -> dict | tuple:
        """Apply ``operation`` with ``perm`` to the role's entry in ``file.acl``.

        A role without an ACL entry is treated as having the mask ``0``.

        Returns the file's ACL, or a 400 error tuple when ``perm`` is an
        organization-level permission, when adding a permission the role
        already has, or when removing one it does not have.
        """
        if Perm.get_int([perm]) > 0b111:
            return RoleService._error_response(
                f"Permission {perm} is not allowed for files' roles"
            )

        # Read without writing, so a rejected request leaves the ACL untouched.
        current_mask = file.acl.get(role, 0)
        has_perm = current_mask & perm.value != 0

        if operation == PermOperation.ADD and has_perm:
            return RoleService._error_response(
                f"Role {role} already has permission {perm} in file {file.name}"
            )
        if operation == PermOperation.REMOVE and not has_perm:
            return RoleService._error_response(
                f"Role {role} does not have permission {perm} in file {file.name}"
            )

        file.acl[role] = PermOperation.calc(current_mask, perm, operation)
        RoleService._persist(file, "acl")
        return file.acl