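import os.path

from database import db
from models import Organization, User
from sqlalchemy.orm.attributes import flag_modified
from utils.perms import Perm


class OrganizationService:
    """Create, query and modify organizations, their roles and their members.

    Every method is a static method working on the shared ``db`` session.
    ``Organization.users`` and ``Organization.roles`` are handled as mutable
    mappings: in-place changes are followed by ``flag_modified`` so that the
    commit persists them.
    """

    @staticmethod
    def _repository_dir(name: str) -> str:
        """Return the repository directory path for organization *name*.

        The name becomes a path component, so it must be exactly one
        component located directly inside ``<project root>/repository``.

        Raises:
            ValueError: if *name* is not a string, is empty, contains a path
                separator or NUL byte, or would resolve to anything other
                than a direct child of the repository directory.
        """
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        repos = os.path.join(project_root, "repository")

        separators = [sep for sep in (os.sep, os.altsep) if sep]
        if (
            not isinstance(name, str)
            or not name
            or "\x00" in name
            or any(sep in name for sep in separators)
        ):
            raise ValueError("Invalid organization name")

        candidate = os.path.join(repos, name)
        # Containment check: rejects ".", ".." and drive-qualified names that
        # os.path.join would otherwise resolve outside the repository root.
        if os.path.dirname(os.path.abspath(candidate)) != repos:
            raise ValueError("Invalid organization name")
        return candidate

    @staticmethod
    def create_organization(name: str, username: str, full_name: str, email: str, public_key: str) -> Organization:
        """Create an organization with *username* as its first manager.

        The user is looked up by username and created when absent. A
        directory named after the organization is created under the
        project's ``repository`` directory, then the organization is stored
        with a ``manager`` role (held by the user) and an empty ``default``
        role.

        Raises:
            ValueError: if *name* cannot be used safely as a directory name.
        """
        from services import UserService

        # Validate before any side effect (user creation, mkdir, DB write):
        # the name is joined into a filesystem path below.
        org_dir = OrganizationService._repository_dir(name)

        # NOTE(review): when the username already exists, the supplied
        # public_key is passed to add_public_key_to_user for that existing
        # account. Confirm callers authenticate the requester as that user.
        user = UserService().get_user_by_username(username)
        if not user:
            user = UserService().create_user(username, full_name, email, public_key)

        try:
            # Attempt the mkdir directly instead of exists()-then-mkdir, which
            # races with a concurrent creation of the same directory.
            os.mkdir(org_dir)
        except FileExistsError:
            pass

        roles = {
            "manager": {
                "permissions": Perm.get_int([
                    Perm.DOC_ACL,
                    Perm.DOC_READ,
                    Perm.DOC_DELETE,
                    Perm.ROLE_ACL,
                    Perm.SUBJECT_NEW,
                    Perm.SUBJECT_DOWN,
                    Perm.SUBJECT_UP,
                    Perm.DOC_NEW,
                    Perm.ROLE_NEW,
                    Perm.ROLE_DOWN,
                    Perm.ROLE_UP,
                    Perm.ROLE_MOD
                ]),
                "users": [user.id]
            },
            "default": {
                "permissions": Perm.get_int([]),
                "users": []
            }
        }

        organization = Organization(
            name=name,
            roles=roles,
            # String key, matching every other accessor in this class
            # (they all index ``users`` with ``str(user.id)``).
            users={str(user.id): {
                "username": user.username,
                "full_name": user.full_name,
                "email": user.email,
                "status": "active"
            }}
        )

        db.add(organization)
        db.commit()
        db.refresh(organization)

        UserService().add_org_to_user(user, organization)
        UserService().add_role_to_user(user, organization, "manager")
        UserService().add_public_key_to_user(user, organization, public_key)

        return organization

    @staticmethod
    def list_organizations() -> list[Organization]:
        """Return every stored organization."""
        return db.query(Organization).all()

    @staticmethod
    def get_organization(org_id: int) -> Organization | None:
        """Return the organization with id *org_id*, or ``None``."""
        return db.query(Organization).filter(Organization.id == org_id).first()

    @staticmethod
    def get_organization_by_name(name: str) -> Organization | None:
        """Return the first organization named *name*, or ``None``."""
        return db.query(Organization).filter(Organization.name == name).first()

    @staticmethod
    def get_users_in_organization(org: Organization) -> dict:
        """Return the freshly queried ``users`` mapping of *org*.

        The mapping is keyed by user id as a string; each value holds
        ``username``, ``full_name``, ``email`` and ``status``.

        Raises:
            AttributeError: if no organization with ``org.id`` exists.
        """
        return db.query(Organization).filter(Organization.id == org.id).first().users

    @staticmethod
    def get_user_status(org: Organization, user_id: int) -> str:
        """Return the stored status of *user_id* in *org*.

        Raises:
            KeyError: if the user is not a member of the organization.
            AttributeError: if no organization with ``org.id`` exists.
        """
        return db.query(Organization).filter(Organization.id == org.id).first().users[str(user_id)]["status"]

    @staticmethod
    def add_user_to_organization(org: Organization, user: User) -> Organization:
        """Add *user* to *org* with status ``active`` and persist it.

        An existing entry for the same user id is replaced, which also
        resets a ``suspended`` status to ``active``.
        """
        org.users[str(user.id)] = {
            "username": user.username,
            "full_name": user.full_name,
            "email": user.email,
            "status": "active"
        }
        # In-place mutation of the mapping: mark it dirty explicitly.
        flag_modified(org, "users")
        db.commit()
        db.refresh(org)
        return org

    @staticmethod
    def create_role(org: Organization, role: str, perms: list[Perm]) -> Organization:
        """Store *role* in *org* with permissions *perms* and no users."""
        # NOTE(review): an existing role of the same name, "manager"
        # included, is replaced and its user list emptied. Confirm callers
        # reject duplicate names before calling.
        roles = org.roles.copy()
        roles[role] = {
            "permissions": Perm.get_int(perms),
            "users": []
        }
        org.roles = roles
        flag_modified(org, "roles")
        db.commit()
        db.refresh(org)
        return org

    @staticmethod
    def delete_role(org: Organization, role: str) -> Organization:
        """Remove *role* from *org* and persist the change.

        Raises:
            KeyError: if *org* has no such role.
        """
        # NOTE(review): nothing here protects "manager" or "default", or a
        # role that still lists users. Confirm callers enforce that.
        roles = org.roles.copy()
        del roles[role]
        org.roles = roles
        flag_modified(org, "roles")
        db.commit()
        db.refresh(org)
        return org

    @staticmethod
    def check_role_permission(org: Organization, role: str, perm: Perm) -> bool:
        """Return whether *role* in *org* holds permission *perm*.

        Raises:
            KeyError: if *org* has no such role.
        """
        role_perms = org.roles[role]["permissions"]
        return Perm.check_perm(role_perms, perm.value)

    @staticmethod
    def suspend_user(org: Organization, user: User) -> tuple:
        """Mark *user* as suspended in *org*.

        Returns:
            A ``(body, status_code)`` pair: 200 on success, 400 when the
            user is not currently active or is the manager.

        Raises:
            KeyError: if the user is not a member of the organization.
        """
        if OrganizationService.get_user_status(org, user.id) != "active":
            return {"error": "User already suspended"}, 400

        # NOTE(review): assumes user.roles maps org.id to a single role name
        # and has an entry for this org. Verify against UserService.
        if user.roles[org.id] == "manager":
            return {"error": "Cannot suspend manager"}, 400

        org.users[str(user.id)]["status"] = "suspended"
        flag_modified(org, "users")
        db.commit()
        db.refresh(org)
        return {"message": "User suspended"}, 200

    @staticmethod
    def activate_user(org: Organization, user: User) -> tuple:
        """Mark a suspended *user* as active in *org*.

        Returns:
            A ``(body, status_code)`` pair: 200 on success, 400 when the
            user is not currently suspended.

        Raises:
            KeyError: if the user is not a member of the organization.
        """
        if OrganizationService.get_user_status(org, user.id) != "suspended":
            return {"error": "User already active"}, 400

        org.users[str(user.id)]["status"] = "active"
        flag_modified(org, "users")
        db.commit()
        db.refresh(org)
        return {"message": "User activated"}, 200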