sio-2425/delivery2/server/routes/role.py

298 lines
10 KiB
Python

import json
from flask import Blueprint, request, jsonify
from database import security_service
from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm, PermOperation
from utils.comms_encryption import encrypt_response
from utils.exceptions import SessionException
# Blueprint that groups the role-management routes defined in this module.
role_bp = Blueprint("role", __name__)
@role_bp.route("/create", methods=["POST"])
def role_create():
    """Create a new role in the organization of the calling session.

    The request must carry ``Content-Type: application/octet-stream`` and a
    session token in the ``Authorization`` header. The raw body is handed to
    ``SessionService.validate_session`` together with the required
    permission ``Perm.ROLE_NEW``; the data it returns must contain a
    ``"role"`` entry.

    Returns:
        The created role via ``encrypt_response`` with status 201, or a JSON
        error with 400 (bad content type, missing token, missing field or a
        ``ValueError`` from role creation), 404 (organization not found), or
        the message/code carried by a ``SessionException``.
    """
    content_type = request.headers.get("Content-Type")
    if content_type != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, payload = SessionService.validate_session(
            token,
            data=request.data,
            required_perms=[Perm.ROLE_NEW],
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if "role" not in payload:
        return jsonify({"error": "Missing required fields"}), 400

    organization = OrganizationService.get_organization(session.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    try:
        # New roles start with an empty list as the third argument.
        created = RoleService.create_role(organization, payload["role"], [])
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    return encrypt_response(created, 201, security_service.get_key(session))
@role_bp.route("/<string:role>/list/users", methods=["GET"])
def role_list_users(role):
    """List the users attached to ``role`` in the caller's organization.

    Args:
        role: Role name taken from the URL.

    Returns:
        The result of ``RoleService.get_users_in_role`` via
        ``encrypt_response`` with status 200. If the service returns a tuple
        it is passed through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token), 401 (no session),
        404 (organization not found) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    users = RoleService.get_users_in_role(org, role)
    if isinstance(users, tuple):
        return users
    return encrypt_response(users, 200, security_service.get_key(session))
@role_bp.route("/<string:role>/list/perms", methods=["GET"])
def role_list_perms(role):
    """List the permissions of ``role`` in the caller's organization.

    Args:
        role: Role name taken from the URL.

    Returns:
        The result of ``RoleService.get_perms_for_role(..., return_str=True)``
        via ``encrypt_response`` with status 200. If the service returns a
        tuple it is passed through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token), 401 (no session),
        404 (organization not found) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    perms = RoleService.get_perms_for_role(org, role, return_str=True)
    if isinstance(perms, tuple):
        return perms
    return encrypt_response(perms, 200, security_service.get_key(session))
@role_bp.route("/<string:role>/suspend", methods=["POST"])
def role_suspend(role):
    """Set the status of ``role`` to ``"suspended"``.

    The session is validated with the required permission ``Perm.ROLE_DOWN``.

    Args:
        role: Role name taken from the URL.

    Returns:
        ``{"message": "Role suspended"}`` with status 200. If
        ``RoleService.change_role_status`` returns a tuple it is passed
        through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token), 401 (no session),
        404 (organization not found) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_DOWN]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    status = RoleService.change_role_status(org, role, "suspended")
    if isinstance(status, tuple):
        return status
    return jsonify({"message": "Role suspended"}), 200
@role_bp.route("/<string:role>/activate", methods=["POST"])
def role_activate(role):
    """Set the status of ``role`` to ``"active"``.

    The session is validated with the required permission ``Perm.ROLE_UP``.

    Args:
        role: Role name taken from the URL.

    Returns:
        ``{"message": "Role activated"}`` with status 200. If
        ``RoleService.change_role_status`` returns a tuple it is passed
        through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token), 401 (no session),
        404 (organization not found) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_UP]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    status = RoleService.change_role_status(org, role, "active")
    if isinstance(status, tuple):
        return status
    return jsonify({"message": "Role activated"}), 200
@role_bp.route("/<string:role>/user/add/<username>", methods=["POST"])
def role_user_add(role, username):
    """Add the user ``username`` to ``role`` in the caller's organization.

    The session is validated with the required permission ``Perm.ROLE_MOD``.

    Args:
        role: Role name taken from the URL.
        username: Username taken from the URL, resolved through
            ``UserService.get_user_by_username``.

    Returns:
        ``{"message": "User added to role"}`` with status 200. If
        ``RoleService.add_user_to_role`` returns a tuple it is passed through
        unchanged as the response (presumably an ``(error body, status)``
        pair - confirm against RoleService). Otherwise a JSON error with 400
        (missing token), 401 (no session), 404 (organization or user not
        found) or the message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    user = UserService.get_user_by_username(username)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # Keep the service result in its own name rather than rebinding `role`.
    result = RoleService.add_user_to_role(role, org, user)
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "User added to role"}), 200
@role_bp.route("/<string:role>/user/remove/<username>", methods=["POST"])
def role_user_remove(role, username):
    """Remove the user ``username`` from ``role`` in the caller's organization.

    The session is validated with the required permission ``Perm.ROLE_MOD``.

    Args:
        role: Role name taken from the URL.
        username: Username taken from the URL, resolved through
            ``UserService.get_user_by_username``.

    Returns:
        ``{"message": "User removed from role"}`` with status 200. If
        ``RoleService.remove_user_from_role`` returns a tuple it is passed
        through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token), 401 (no session),
        404 (organization or user not found) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    user = UserService.get_user_by_username(username)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # Keep the service result in its own name rather than rebinding `role`.
    result = RoleService.remove_user_from_role(role, org, user)
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "User removed from role"}), 200
@role_bp.route("/<string:role>/perm/add/<perm>", methods=["POST"])
def role_perm_add(role, perm):
    """Add the permission ``perm`` to ``role`` in the caller's organization.

    The session is validated with the required permission ``Perm.ROLE_MOD``.

    Args:
        role: Role name taken from the URL.
        perm: Permission name taken from the URL, parsed with
            ``Perm.from_str``.

    Returns:
        ``{"message": "Permission added to role"}`` with status 200. If
        ``RoleService.change_perm_on_role`` returns a tuple it is passed
        through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token or invalid
        permission), 401 (no session), 404 (organization not found) or the
        message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # perm_list_roles answers 400 "Invalid permission" when parsing the
    # permission raises ValueError/NameError; mirror that here so a bad URL
    # segment does not surface as an unhandled exception.
    try:
        permission = Perm.from_str(perm)
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400

    result = RoleService.change_perm_on_role(
        org, role, permission, PermOperation.ADD
    )
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "Permission added to role"}), 200
@role_bp.route("/<string:role>/perm/remove/<perm>", methods=["POST"])
def role_perm_remove(role, perm):
    """Remove the permission ``perm`` from ``role`` in the caller's organization.

    The session is validated with the required permission ``Perm.ROLE_MOD``.

    Args:
        role: Role name taken from the URL.
        perm: Permission name taken from the URL, parsed with
            ``Perm.from_str``.

    Returns:
        ``{"message": "Permission removed from role"}`` with status 200. If
        ``RoleService.change_perm_on_role`` returns a tuple it is passed
        through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token or invalid
        permission), 401 (no session), 404 (organization not found) or the
        message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # perm_list_roles answers 400 "Invalid permission" when parsing the
    # permission raises ValueError/NameError; mirror that here so a bad URL
    # segment does not surface as an unhandled exception.
    try:
        permission = Perm.from_str(perm)
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400

    result = RoleService.change_perm_on_role(
        org, role, permission, PermOperation.REMOVE
    )
    if isinstance(result, tuple):
        return result
    return jsonify({"message": "Permission removed from role"}), 200
@role_bp.route("/session/assume/<string:role>", methods=["POST"])
def role_session_assume(role):
    """Add ``role`` to the calling session.

    Args:
        role: Role name taken from the URL; it must exist in the session's
            organization according to ``RoleService.get_role``.

    Returns:
        ``to_dict()`` of the session returned by
        ``SessionService.change_role(session, role, "add")`` via
        ``encrypt_response`` with status 200, using the key of that returned
        session. If ``change_role`` returns a tuple it is passed through
        unchanged as the response (presumably an ``(error body, status)``
        pair - confirm against SessionService). Otherwise a JSON error with
        400 (missing token), 401 (no session), 404 (organization or role not
        found) or the message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    # Same guard as the other organization-scoped routes in this module, so a
    # missing organization is reported as such instead of reaching get_role.
    if not org:
        return jsonify({"error": "Organization not found"}), 404
    if not RoleService.get_role(org, role):
        return jsonify({"error": "Role not found"}), 404

    updated = SessionService.change_role(session, role, "add")
    if isinstance(updated, tuple):
        return updated
    return encrypt_response(updated.to_dict(), 200, security_service.get_key(updated))
@role_bp.route("/session/drop/<string:role>", methods=["POST"])
def role_session_drop(role):
    """Drop ``role`` from the calling session.

    Args:
        role: Role name taken from the URL; it must exist in the session's
            organization according to ``RoleService.get_role``.

    Returns:
        ``to_dict()`` of the session returned by
        ``SessionService.change_role(session, role, "drop")`` via
        ``encrypt_response`` with status 200, using the key of that returned
        session. If ``change_role`` returns a tuple it is passed through
        unchanged as the response (presumably an ``(error body, status)``
        pair - confirm against SessionService). Otherwise a JSON error with
        400 (missing token), 401 (no session), 404 (organization or role not
        found) or the message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    # Same guard as the other organization-scoped routes in this module, so a
    # missing organization is reported as such instead of reaching get_role.
    if not org:
        return jsonify({"error": "Organization not found"}), 404
    if not RoleService.get_role(org, role):
        return jsonify({"error": "Role not found"}), 404

    updated = SessionService.change_role(session, role, "drop")
    if isinstance(updated, tuple):
        return updated
    return encrypt_response(updated.to_dict(), 200, security_service.get_key(updated))
@role_bp.route("/session/list", methods=["GET"])
def role_session_list():
    """List the roles of the calling session.

    Returns:
        The result of ``SessionService.list_roles(session)`` via
        ``encrypt_response`` with status 200, or a JSON error with 400
        (missing token), 401 (no session) or the message/code of a
        ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    roles = SessionService.list_roles(session)
    return encrypt_response(roles, 200, security_service.get_key(session))
@role_bp.route("/perm/<string:perm>/roles", methods=["GET"])
def perm_list_roles(perm):
    """List the roles in the caller's organization that hold ``perm``.

    Args:
        perm: Permission name taken from the URL, parsed with
            ``Perm.from_str``.

    Returns:
        The result of ``RoleService.get_roles_for_perm`` via
        ``encrypt_response`` with status 200. If the service returns a tuple
        it is passed through unchanged as the response (presumably an
        ``(error body, status)`` pair - confirm against RoleService).
        Otherwise a JSON error with 400 (missing token or invalid
        permission), 401 (no session), 404 (organization not found) or the
        message/code of a ``SessionException``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (see the
    # handling in role_create); answer with JSON instead of letting the
    # exception propagate out of the view.
    # NOTE(review): role_create unpacks the result as (session, data); confirm
    # that a bare session is returned when no data is passed.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # Both exception types map to the same client error, so handle them in a
    # single clause. The try still spans the service call as well as the
    # parsing, as it did before.
    try:
        roles = RoleService.get_roles_for_perm(org, Perm.from_str(perm))
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400

    if isinstance(roles, tuple):
        return roles
    return encrypt_response(roles, 200, security_service.get_key(session))