"""Flask blueprint exposing role-management endpoints.

Every endpoint reads the session token from the ``Authorization`` header and
validates it through ``SessionService``. Service calls that return a tuple
are passed straight back to Flask as the response (presumably a
``(response, status)`` error pair -- confirm against the service layer).
"""
import json

from flask import Blueprint, request, jsonify

from services import (
    UserService,
    SessionService,
    OrganizationService,
    RoleService,
)
from utils import Perm, PermOperation

role_bp = Blueprint("role", __name__)


def _error(message, status):
    """Build the ``({"error": message}, status)`` response pair."""
    return jsonify({"error": message}), status


def _json_body(*required_fields):
    """Read the request's JSON body and check that it has the given keys.

    Returns:
        ``(data, None)`` on success, or ``(None, error_response)`` when the
        body is not a JSON object or lacks one of ``required_fields``.
    """
    data = request.json
    # Some clients double-encode the body, so a JSON string is decoded again.
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except ValueError:
            return None, _error("Invalid JSON body", 400)
    # A missing body (None) or a non-object payload cannot carry the fields.
    if not isinstance(data, dict):
        return None, _error("Missing required fields", 400)
    if any(field not in data for field in required_fields):
        return None, _error("Missing required fields", 400)
    return data, None


def _authenticate(required_perms=None):
    """Validate the session token from the ``Authorization`` header.

    Args:
        required_perms: Optional list of ``Perm`` values forwarded to
            ``SessionService.validate_session``. When ``None`` the service
            is called with the token only.

    Returns:
        ``(session, None)`` on success, or ``(None, error_response)``.
    """
    token = request.headers.get("Authorization")
    if not token:
        return None, _error("No session token", 400)
    if required_perms is None:
        session = SessionService.validate_session(token)
    else:
        session = SessionService.validate_session(token, required_perms)
    if not session:
        return None, _error("Not authenticated", 401)
    return session, None


def _authenticate_with_org(required_perms=None):
    """Authenticate and load the organization the session belongs to.

    Returns:
        ``(session, org, None)`` on success, or
        ``(None, None, error_response)`` on failure.
    """
    session, error = _authenticate(required_perms)
    if error:
        return None, None, error
    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return None, None, _error("Organization not found", 404)
    return session, org, None


@role_bp.route("/create", methods=["POST"])
def role_create():
    """Create a role from the ``role`` and ``perms`` fields of the body."""
    data, error = _json_body("role", "perms")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_NEW])
    if error:
        return error
    try:
        role = RoleService.create_role(org, data["role"], data["perms"])
    except ValueError as e:
        return _error(str(e), 400)
    return jsonify(role), 201


@role_bp.route("/<role>/list/users", methods=["GET"])
def role_list_users(role):
    """List the users that belong to ``role``."""
    _, org, error = _authenticate_with_org()
    if error:
        return error
    users = RoleService.get_users_in_role(org, role)
    if isinstance(users, tuple):
        return users
    return jsonify(users), 200


@role_bp.route("/<role>/list/perms", methods=["GET"])
def role_list_perms(role):
    """List the permissions attached to ``role``."""
    _, org, error = _authenticate_with_org()
    if error:
        return error
    perms = RoleService.get_perms_for_role(org, role, return_str=True)
    if isinstance(perms, tuple):
        return perms
    return jsonify(perms), 200


@role_bp.route("/<role>/suspend", methods=["POST"])
def role_suspend(role):
    """Mark ``role`` as suspended."""
    # NOTE(review): the body must contain "user" although the value is never
    # read here; kept for compatibility with existing clients -- confirm.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_DOWN])
    if error:
        return error
    status = RoleService.change_role_status(org, role, "suspended")
    if isinstance(status, tuple):
        return status
    return jsonify({"message": "Role suspended"}), 200


@role_bp.route("/<role>/activate", methods=["POST"])
def role_activate(role):
    """Mark ``role`` as active."""
    # NOTE(review): "user" is required in the body but unused -- confirm.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_UP])
    if error:
        return error
    status = RoleService.change_role_status(org, role, "active")
    if isinstance(status, tuple):
        return status
    return jsonify({"message": "Role activated"}), 200


@role_bp.route("/<role>/user/add/<username>", methods=["POST"])
def role_user_add(role, username):
    """Add the user named ``username`` to ``role``."""
    # NOTE(review): "user" is required in the body but the user is taken
    # from the URL -- confirm whether the body field is still needed.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_MOD])
    if error:
        return error
    user = UserService.get_user_by_username(username)
    if not user:
        return _error("User not found", 404)
    result = RoleService.add_user_to_role(role, org, user)
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "User added to role"}), 200


@role_bp.route("/<role>/user/remove/<username>", methods=["POST"])
def role_user_remove(role, username):
    """Remove the user named ``username`` from ``role``."""
    # NOTE(review): "user" is required in the body but unused -- confirm.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_MOD])
    if error:
        return error
    user = UserService.get_user_by_username(username)
    if not user:
        return _error("User not found", 404)
    result = RoleService.remove_user_from_role(role, org, user)
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "User removed from role"}), 200


@role_bp.route("/<role>/perm/add/<perm>", methods=["POST"])
def role_perm_add(role, perm):
    """Grant the permission named ``perm`` to ``role``."""
    # NOTE(review): "user" is required in the body but unused -- confirm.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_MOD])
    if error:
        return error
    result = RoleService.change_perm_on_role(
        org, role, Perm.from_str(perm), PermOperation.ADD
    )
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "Permission added to role"}), 200


@role_bp.route("/<role>/perm/remove/<perm>", methods=["POST"])
def role_perm_remove(role, perm):
    """Revoke the permission named ``perm`` from ``role``."""
    # NOTE(review): "user" is required in the body but unused -- confirm.
    _, error = _json_body("user")
    if error:
        return error
    _, org, error = _authenticate_with_org([Perm.ROLE_MOD])
    if error:
        return error
    result = RoleService.change_perm_on_role(
        org, role, Perm.from_str(perm), PermOperation.REMOVE
    )
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "Permission removed from role"}), 200


@role_bp.route("/session/assume/<role>", methods=["POST"])
def role_session_assume(role):
    """Add ``role`` to the caller's session and return the session."""
    session, org, error = _authenticate_with_org()
    if error:
        return error
    if not RoleService.get_role(org, role):
        return _error("Role not found", 404)
    result = SessionService.change_role(session, role, "add")
    if isinstance(result, tuple):
        return result
    return jsonify(result.to_dict()), 200


@role_bp.route("/session/drop/<role>", methods=["POST"])
def role_session_drop(role):
    """Drop ``role`` from the caller's session and return the session."""
    session, org, error = _authenticate_with_org()
    if error:
        return error
    if not RoleService.get_role(org, role):
        return _error("Role not found", 404)
    result = SessionService.change_role(session, role, "drop")
    if isinstance(result, tuple):
        return result
    return jsonify(result.to_dict()), 200


@role_bp.route("/session/list", methods=["GET"])
def role_session_list():
    """List the roles attached to the caller's session."""
    session, error = _authenticate()
    if error:
        return error
    roles = SessionService.list_roles(session)
    return jsonify(roles), 200


@role_bp.route("/perm/<perm>/roles", methods=["GET"])
def perm_list_roles(perm):
    """List the roles that hold the permission ``perm``."""
    _, org, error = _authenticate_with_org()
    if error:
        return error
    # NOTE(review): this endpoint builds the permission with Perm(perm) while
    # the perm add/remove endpoints use Perm.from_str(perm) -- confirm which
    # form matches the value carried in the URL.
    roles = RoleService.get_roles_for_perm(org, Perm(perm))
    if isinstance(roles, tuple):
        return roles
    return jsonify(roles), 200