298 lines
10 KiB
Python
298 lines
10 KiB
Python
import json
|
|
from flask import Blueprint, request, jsonify
|
|
|
|
from database import security_service
|
|
from services import UserService, SessionService, OrganizationService, RoleService
|
|
from utils import Perm, PermOperation
|
|
from utils.comms_encryption import encrypt_response
|
|
from utils.exceptions import SessionException
|
|
|
|
role_bp = Blueprint("role", __name__)
|
|
|
|
|
|
@role_bp.route("/create", methods=["POST"])
def role_create():
    """Handle ``POST /create``: create a role in the caller's organization.

    The request must carry ``Content-Type: application/octet-stream`` and an
    ``Authorization`` header holding the session token. The raw request body
    is handed to ``SessionService.validate_session`` together with the
    ``Perm.ROLE_NEW`` requirement; the call yields the session and the
    payload, which must contain a ``"role"`` entry.

    Returns:
        The result of ``encrypt_response`` with status 201 on success,
        otherwise a ``(json, status)`` pair describing the failure:
        400 for a bad content type, missing token, missing ``"role"`` field
        or a ``ValueError`` from role creation; the exception's own code for
        a ``SessionException``; 404 when the organization cannot be found.
    """
    headers = request.headers

    content_type = headers.get("Content-Type")
    if content_type != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    token = headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, payload = SessionService.validate_session(
            token,
            data=request.data,
            required_perms=[Perm.ROLE_NEW],
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if not ("role" in payload):
        return jsonify({"error": "Missing required fields"}), 400

    organization = OrganizationService.get_organization(session.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    try:
        # The third argument is an empty list (presumably "no initial
        # permissions" -- confirm against RoleService.create_role).
        created = RoleService.create_role(organization, payload["role"], [])
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    session_key = security_service.get_key(session)
    return encrypt_response(created, 201, session_key)
|
|
|
|
|
|
@role_bp.route("/<string:role>/list/users", methods=["GET"])
def role_list_users(role):
    """Handle ``GET /<role>/list/users``: list the users holding a role.

    Args:
        role: Role identifier taken from the URL.

    Returns:
        The result of ``encrypt_response`` with status 200 on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing, the ``SessionException`` code when session
        validation fails, 401 when no session is returned, 404 when the
        organization is missing. A tuple returned by
        ``RoleService.get_users_in_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    users = RoleService.get_users_in_role(org, role)
    if isinstance(users, tuple):
        # The service reports errors as a ready-made response tuple.
        return users

    return encrypt_response(users, 200, security_service.get_key(session))
|
|
|
|
|
|
@role_bp.route("/<string:role>/list/perms", methods=["GET"])
def role_list_perms(role):
    """Handle ``GET /<role>/list/perms``: list the permissions of a role.

    Args:
        role: Role identifier taken from the URL.

    Returns:
        The result of ``encrypt_response`` with status 200 on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing, the ``SessionException`` code when session
        validation fails, 401 when no session is returned, 404 when the
        organization is missing. A tuple returned by
        ``RoleService.get_perms_for_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    perms = RoleService.get_perms_for_role(org, role, return_str=True)
    if isinstance(perms, tuple):
        # The service reports errors as a ready-made response tuple.
        return perms

    return encrypt_response(perms, 200, security_service.get_key(session))
|
|
|
|
|
|
@role_bp.route("/<string:role>/suspend", methods=["POST"])
def role_suspend(role):
    """Handle ``POST /<role>/suspend``: set a role's status to "suspended".

    The session must satisfy ``Perm.ROLE_DOWN``.

    Args:
        role: Role identifier taken from the URL.

    Returns:
        ``({"message": "Role suspended"}, 200)`` on success. Otherwise a
        ``(json, status)`` pair: 400 when the ``Authorization`` header is
        missing, the ``SessionException`` code when session validation
        fails, 401 when no session is returned, 404 when the organization is
        missing. A tuple returned by ``RoleService.change_role_status`` is
        passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_DOWN]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    status = RoleService.change_role_status(org, role, "suspended")
    if isinstance(status, tuple):
        # The service reports errors as a ready-made response tuple.
        return status

    return jsonify({"message": "Role suspended"}), 200
|
|
|
|
|
|
@role_bp.route("/<string:role>/activate", methods=["POST"])
def role_activate(role):
    """Handle ``POST /<role>/activate``: set a role's status to "active".

    The session must satisfy ``Perm.ROLE_UP``.

    Args:
        role: Role identifier taken from the URL.

    Returns:
        ``({"message": "Role activated"}, 200)`` on success. Otherwise a
        ``(json, status)`` pair: 400 when the ``Authorization`` header is
        missing, the ``SessionException`` code when session validation
        fails, 401 when no session is returned, 404 when the organization is
        missing. A tuple returned by ``RoleService.change_role_status`` is
        passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_UP]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    status = RoleService.change_role_status(org, role, "active")
    if isinstance(status, tuple):
        # The service reports errors as a ready-made response tuple.
        return status

    return jsonify({"message": "Role activated"}), 200
|
|
|
|
|
|
@role_bp.route("/<string:role>/user/add/<username>", methods=["POST"])
def role_user_add(role, username):
    """Handle ``POST /<role>/user/add/<username>``: add a user to a role.

    The session must satisfy ``Perm.ROLE_MOD``.

    Args:
        role: Role identifier taken from the URL.
        username: Username taken from the URL; looked up through
            ``UserService.get_user_by_username``.

    Returns:
        ``({"message": "User added to role"}, 200)`` on success. Otherwise a
        ``(json, status)`` pair: 400 when the ``Authorization`` header is
        missing, the ``SessionException`` code when session validation
        fails, 401 when no session is returned, 404 when the organization or
        the user is missing. A tuple returned by
        ``RoleService.add_user_to_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    user = UserService.get_user_by_username(username)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # NOTE(review): argument order here is (role, org, user), unlike the
    # (org, role, ...) order of the other RoleService calls -- confirm.
    result = RoleService.add_user_to_role(role, org, user)
    if isinstance(result, tuple):
        # The service reports errors as a ready-made response tuple.
        return result

    return jsonify({"message": "User added to role"}), 200
|
|
|
|
|
|
@role_bp.route("/<string:role>/user/remove/<username>", methods=["POST"])
def role_user_remove(role, username):
    """Handle ``POST /<role>/user/remove/<username>``: remove a user from a role.

    The session must satisfy ``Perm.ROLE_MOD``.

    Args:
        role: Role identifier taken from the URL.
        username: Username taken from the URL; looked up through
            ``UserService.get_user_by_username``.

    Returns:
        ``({"message": "User removed from role"}, 200)`` on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing, the ``SessionException`` code when session
        validation fails, 401 when no session is returned, 404 when the
        organization or the user is missing. A tuple returned by
        ``RoleService.remove_user_from_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    user = UserService.get_user_by_username(username)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # NOTE(review): argument order here is (role, org, user), unlike the
    # (org, role, ...) order of the other RoleService calls -- confirm.
    result = RoleService.remove_user_from_role(role, org, user)
    if isinstance(result, tuple):
        # The service reports errors as a ready-made response tuple.
        return result

    return jsonify({"message": "User removed from role"}), 200
|
|
|
|
|
|
@role_bp.route("/<string:role>/perm/add/<perm>", methods=["POST"])
def role_perm_add(role, perm):
    """Handle ``POST /<role>/perm/add/<perm>``: add a permission to a role.

    The session must satisfy ``Perm.ROLE_MOD``.

    Args:
        role: Role identifier taken from the URL.
        perm: Permission name taken from the URL; converted with
            ``Perm.from_str``.

    Returns:
        ``({"message": "Permission added to role"}, 200)`` on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing or the permission name cannot be parsed, the
        ``SessionException`` code when session validation fails, 401 when no
        session is returned, 404 when the organization is missing. A tuple
        returned by ``RoleService.change_perm_on_role`` is passed through
        unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # The permission name is caller-controlled; perm_list_roles treats
    # ValueError/NameError from parsing as a client error, so do the same.
    try:
        parsed_perm = Perm.from_str(perm)
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400

    result = RoleService.change_perm_on_role(
        org, role, parsed_perm, PermOperation.ADD
    )
    if isinstance(result, tuple):
        # The service reports errors as a ready-made response tuple.
        return result

    return jsonify({"message": "Permission added to role"}), 200
|
|
|
|
|
|
@role_bp.route("/<string:role>/perm/remove/<perm>", methods=["POST"])
def role_perm_remove(role, perm):
    """Handle ``POST /<role>/perm/remove/<perm>``: remove a permission from a role.

    The session must satisfy ``Perm.ROLE_MOD``.

    Args:
        role: Role identifier taken from the URL.
        perm: Permission name taken from the URL; converted with
            ``Perm.from_str``.

    Returns:
        ``({"message": "Permission removed from role"}, 200)`` on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing or the permission name cannot be parsed, the
        ``SessionException`` code when session validation fails, 401 when no
        session is returned, 404 when the organization is missing. A tuple
        returned by ``RoleService.change_perm_on_role`` is passed through
        unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(
            session_token, required_perms=[Perm.ROLE_MOD]
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # The permission name is caller-controlled; perm_list_roles treats
    # ValueError/NameError from parsing as a client error, so do the same.
    try:
        parsed_perm = Perm.from_str(perm)
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400

    result = RoleService.change_perm_on_role(
        org, role, parsed_perm, PermOperation.REMOVE
    )
    if isinstance(result, tuple):
        # The service reports errors as a ready-made response tuple.
        return result

    return jsonify({"message": "Permission removed from role"}), 200
|
|
|
|
|
|
@role_bp.route("/session/assume/<string:role>", methods=["POST"])
def role_session_assume(role):
    """Handle ``POST /session/assume/<role>``: add a role to the session.

    Args:
        role: Role identifier taken from the URL; it must resolve through
            ``RoleService.get_role`` for the session's organization.

    Returns:
        The result of ``encrypt_response`` wrapping the updated session's
        ``to_dict()`` with status 200 on success. Otherwise a
        ``(json, status)`` pair: 400 when the ``Authorization`` header is
        missing, the ``SessionException`` code when session validation
        fails, 401 when no session is returned, 404 when the organization or
        the role is missing. A tuple returned by
        ``SessionService.change_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    # Match the other handlers: a missing organization is a 404, rather than
    # being passed on to RoleService.get_role.
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    if not RoleService.get_role(org, role):
        return jsonify({"error": "Role not found"}), 404

    updated_session = SessionService.change_role(session, role, "add")
    if isinstance(updated_session, tuple):
        # The service reports errors as a ready-made response tuple.
        return updated_session

    # The key is derived from the session returned by change_role, as in the
    # original code, not from the pre-change session.
    return encrypt_response(
        updated_session.to_dict(), 200, security_service.get_key(updated_session)
    )
|
|
|
|
|
|
@role_bp.route("/session/drop/<string:role>", methods=["POST"])
def role_session_drop(role):
    """Handle ``POST /session/drop/<role>``: drop a role from the session.

    Args:
        role: Role identifier taken from the URL; it must resolve through
            ``RoleService.get_role`` for the session's organization.

    Returns:
        The result of ``encrypt_response`` wrapping the updated session's
        ``to_dict()`` with status 200 on success. Otherwise a
        ``(json, status)`` pair: 400 when the ``Authorization`` header is
        missing, the ``SessionException`` code when session validation
        fails, 401 when no session is returned, 404 when the organization or
        the role is missing. A tuple returned by
        ``SessionService.change_role`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    # Match the other handlers: a missing organization is a 404, rather than
    # being passed on to RoleService.get_role.
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    if not RoleService.get_role(org, role):
        return jsonify({"error": "Role not found"}), 404

    updated_session = SessionService.change_role(session, role, "drop")
    if isinstance(updated_session, tuple):
        # The service reports errors as a ready-made response tuple.
        return updated_session

    # The key is derived from the session returned by change_role, as in the
    # original code, not from the pre-change session.
    return encrypt_response(
        updated_session.to_dict(), 200, security_service.get_key(updated_session)
    )
|
|
|
|
|
|
@role_bp.route("/session/list", methods=["GET"])
def role_session_list():
    """Handle ``GET /session/list``: list the roles attached to the session.

    Returns:
        The result of ``encrypt_response`` wrapping
        ``SessionService.list_roles(session)`` with status 200 on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing, the ``SessionException`` code when session
        validation fails, 401 when no session is returned.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    roles = SessionService.list_roles(session)
    return encrypt_response(roles, 200, security_service.get_key(session))
|
|
|
|
@role_bp.route("/perm/<string:perm>/roles", methods=["GET"])
def perm_list_roles(perm):
    """Handle ``GET /perm/<perm>/roles``: list the roles that carry a permission.

    Args:
        perm: Permission name taken from the URL; converted with
            ``Perm.from_str``.

    Returns:
        The result of ``encrypt_response`` with status 200 on success.
        Otherwise a ``(json, status)`` pair: 400 when the ``Authorization``
        header is missing or a ``ValueError``/``NameError`` is raised while
        resolving the permission, the ``SessionException`` code when session
        validation fails, 401 when no session is returned, 404 when the
        organization is missing. A tuple returned by
        ``RoleService.get_roles_for_perm`` is passed through unchanged.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session signals failures with SessionException (see
    # role_create); without this handler they would escape the view.
    try:
        session = SessionService.validate_session(session_token)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    if not session:
        return jsonify({"error": "Not authenticated"}), 401

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    # Both exception types map to the same client error, so one handler
    # covers them. The try scope is kept as in the original (parse + lookup).
    try:
        roles = RoleService.get_roles_for_perm(org, Perm.from_str(perm))
    except (ValueError, NameError):
        return jsonify({"error": "Invalid permission"}), 400
    if isinstance(roles, tuple):
        # The service reports errors as a ready-made response tuple.
        return roles

    return encrypt_response(roles, 200, security_service.get_key(session))