sio-2425/delivery2/server/services/roles.py

249 lines
8.5 KiB
Python

from database import db
from models import Organization, User, File
from sqlalchemy.orm.attributes import flag_modified
from utils import Perm
class RoleService:
@staticmethod
def create_role(org: Organization, role: str, perms: list[Perm]) -> dict:
if role in org.roles:
raise ValueError(f"Role {role} already exists in organization {org.name}")
roles = org.roles.copy()
roles[role] = {
"permissions": Perm.get_int(perms),
"users": [],
"status": "active"
}
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]
@staticmethod
def delete_role(org: Organization, role: str) -> dict:
roles = org.roles.copy()
del roles[role]
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return roles
@staticmethod
def activate_role(org: Organization, role: str) -> Organization:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] == "active":
raise ValueError(f"Role {role} is already active in organization {org.name}")
roles = org.roles.copy()
roles[role]["status"] = "active"
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def suspend_role(org: Organization, role: str) -> Organization:
if role == "manager":
raise ValueError(f"Role {role} cannot be suspended in organization {org.name}")
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] == "suspended":
raise ValueError(f"Role {role} is already suspended in organization {org.name}")
roles = org.roles.copy()
roles[role]["status"] = "suspended"
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def get_role(org: Organization, role: str) -> dict:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
return org.roles[role]
@staticmethod
def check_role_permission(org: Organization, role: str, perm: Perm, doc_handle=None) -> bool:
from services import FileService
if doc_handle:
file = FileService.get_file_by_document_handle(doc_handle)
if not file:
raise ValueError(f"Document {doc_handle} not found")
if not Perm.check_perm(file.acl[role], perm.value):
return False
return True
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
role_perms = org.roles[role]["permissions"]
return Perm.check_perm(role_perms, perm.value)
@staticmethod
def check_user_permission(user: User, org: Organization, perm: Perm) -> bool:
for role in user.roles[org.id]:
if RoleService.check_role_permission(org, role, perm):
return True
return False
@staticmethod
def list_roles(org: Organization) -> dict:
return org.roles
@staticmethod
def list_users_in_role(org: Organization, role: str) -> list:
return org.roles[role]["users"]
@staticmethod
def list_roles_for_user(user: User, org: Organization) -> list:
return [role for role in org.roles if user.id in org.roles[role]["users"]]
@staticmethod
def list_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]:
return Perm.get_perms(org.roles[role]["permissions"], return_str)
@staticmethod
def list_roles_for_perm(org: Organization, perm: Perm) -> list:
roles = []
for role in org.roles:
if RoleService.check_role_permission(org, role, perm):
roles.append(role)
return roles
@staticmethod
def add_role_to_user(user: User, org: Organization, role: str) -> User:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if user.id in org.roles[role]["users"]:
raise ValueError(f"User {user.username} already has role {role} in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = user.roles.copy()
roles[org.id] = role
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].append(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod
def remove_role_from_user(user: User, org: Organization, role: str) -> User:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if user.id not in org.roles[role]["users"]:
raise ValueError(f"User {user.username} does not have role {role} in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = user.roles.copy()
roles.pop(org.id)
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].remove(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod
def add_perm_to_role(org: Organization, role: str, perm: Perm) -> dict:
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = org.roles.copy()
roles[role]["permissions"] |= perm.value
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]
@staticmethod
def remove_perm_from_role(org: Organization, role: str, perm: Perm) -> dict:
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = org.roles.copy()
roles[role]["permissions"] &= ~perm.value
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]
@staticmethod
def add_perm_to_role_in_file(file: File, role: str, perm: Perm) -> dict:
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for files' roles")
if role not in file.acl:
file.acl[role] = 0
if file.acl[role] & perm.value != 0:
raise ValueError(f"Role {role} already has permission {perm} in file {file.document_handle}")
file.acl[role] |= perm.value
flag_modified(file, "acl")
db.commit()
db.refresh(file)
return file.acl
@staticmethod
def remove_perm_from_role_in_file(file: File, role: str, perm: Perm) -> dict:
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for files' roles")
if role not in file.acl:
file.acl[role] = 0
if file.acl[role] & perm.value == 0:
raise ValueError(f"Role {role} does not have permission {perm} in file {file.document_handle}")
file.acl[role] &= ~perm.value
flag_modified(file, "acl")
db.commit()
db.refresh(file)
return file.acl