2024-12-16 18:38:24 +00:00
|
|
|
import json
|
|
|
|
from flask import Blueprint, request, jsonify
|
|
|
|
from services import UserService, SessionService, OrganizationService, RoleService
|
|
|
|
from utils import Perm, PermOperation
|
|
|
|
|
|
|
|
role_bp = Blueprint("role", __name__)
|
|
|
|
|
|
|
|
|
|
|
|
@role_bp.route("/create", methods=["POST"])
|
|
|
|
def role_create():
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "role" not in data or "perms" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_NEW])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
role = RoleService.create_role(org, data["role"], data["perms"])
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify(role), 201
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/list/users", methods=["GET"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_list_users(role):
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
users = RoleService.get_users_in_role(org, role)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
return jsonify(users), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/list/perms", methods=["GET"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_list_perms(role):
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
perms = RoleService.get_perms_for_role(org, role)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
return jsonify(perms), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/suspend", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_suspend(role):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_DOWN])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.change_role_status(org, role, "suspended")
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "Role suspended"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/activate", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_activate(role):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_UP])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.change_role_status(org, role, "active")
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "Role activated"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/user/add/<username>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_user_add(role, username):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
user = UserService.get_user_by_username(username)
|
|
|
|
if not user:
|
|
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.add_user_to_role(role, org, user)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "User added to role"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/user/remove/<username>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_user_remove(role, username):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
user = UserService.get_user_by_username(username)
|
|
|
|
if not user:
|
|
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.remove_user_from_role(role, org, user)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "User removed from role"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/perm/add/<perm>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_perm_add(role, perm):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.ADD)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "Permission added to role"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/<string:role>/perm/remove/<perm>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_perm_remove(role, perm):
|
|
|
|
data = request.json
|
|
|
|
if type(data) is str:
|
|
|
|
data = json.loads(data)
|
|
|
|
|
|
|
|
if "user" not in data:
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.REMOVE)
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify({"message": "Permission removed from role"}), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/session/assume/<string:role>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_session_assume(role):
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
if not RoleService.get_role(session.org_id, role):
|
|
|
|
return jsonify({"error": "Role not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
SessionService.change_role(session, role, "add")
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify(session.to_dict()), 200
|
|
|
|
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/session/drop/<string:role>", methods=["POST"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def role_session_drop(role):
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
if not RoleService.get_role(session.org_id, role):
|
|
|
|
return jsonify({"error": "Role not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
SessionService.change_role(session, role, "drop")
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
|
|
|
|
return jsonify(session.to_dict()), 200
|
|
|
|
|
|
|
|
|
|
|
|
@role_bp.route("/session/list", methods=["GET"])
|
|
|
|
def role_session_list():
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
roles = SessionService.list_roles(session)
|
|
|
|
return jsonify(roles), 200
|
|
|
|
|
2024-12-16 19:07:16 +00:00
|
|
|
@role_bp.route("/perm/<string:perm>/roles", methods=["GET"])
|
2024-12-16 18:38:24 +00:00
|
|
|
def perm_list_roles(perm):
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
|
|
if not session_token:
|
|
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
|
|
|
|
session = SessionService.validate_session(session_token)
|
|
|
|
if not session:
|
|
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
|
|
if not org:
|
|
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
|
|
|
|
try:
|
|
|
|
roles = RoleService.get_roles_for_perm(org, Perm(perm))
|
|
|
|
except ValueError as e:
|
|
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
return jsonify(roles), 200
|