sio-2425/delivery2/server/routes/role.py

324 lines
10 KiB
Python

import json
from flask import Blueprint, request, jsonify
from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm, PermOperation
role_bp = Blueprint("role", __name__)
@role_bp.route("/create", methods=["POST"])
def role_create():
data = request.json
if type(data) is str:
data = json.loads(data)
if "role" not in data or "perms" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_NEW])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
try:
role = RoleService.create_role(org, data["role"], data["perms"])
except ValueError as e:
return jsonify({"error": str(e)}), 400
return jsonify(role), 201
@role_bp.route("/<string:role>/list/users", methods=["GET"])
def role_list_users(role):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
users = RoleService.get_users_in_role(org, role)
if isinstance(users, tuple):
return users
return jsonify(users), 200
@role_bp.route("/<string:role>/list/perms", methods=["GET"])
def role_list_perms(role):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
perms = RoleService.get_perms_for_role(org, role, return_str=True)
if isinstance(perms, tuple):
return perms
return jsonify(perms), 200
@role_bp.route("/<string:role>/suspend", methods=["POST"])
def role_suspend(role):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_DOWN])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
status = RoleService.change_role_status(org, role, "suspended")
if isinstance(status, tuple):
return status
return jsonify({"message": "Role suspended"}), 200
@role_bp.route("/<string:role>/activate", methods=["POST"])
def role_activate(role):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_UP])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
status = RoleService.change_role_status(org, role, "active")
if isinstance(status, tuple):
return status
return jsonify({"message": "Role activated"}), 200
@role_bp.route("/<string:role>/user/add/<username>", methods=["POST"])
def role_user_add(role, username):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(username)
if not user:
return jsonify({"error": "User not found"}), 404
role = RoleService.add_user_to_role(role, org, user)
if isinstance(role, tuple):
return role
return jsonify({"message": "User added to role"}), 200
@role_bp.route("/<string:role>/user/remove/<username>", methods=["POST"])
def role_user_remove(role, username):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(username)
if not user:
return jsonify({"error": "User not found"}), 404
role = RoleService.remove_user_from_role(role, org, user)
if isinstance(role, tuple):
return role
return jsonify({"message": "User removed from role"}), 200
@role_bp.route("/<string:role>/perm/add/<perm>", methods=["POST"])
def role_perm_add(role, perm):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
role = RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.ADD)
if isinstance(role, tuple):
return role
return jsonify({"message": "Permission added to role"}), 200
@role_bp.route("/<string:role>/perm/remove/<perm>", methods=["POST"])
def role_perm_remove(role, perm):
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
role = RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.REMOVE)
if isinstance(role, tuple):
return role
return jsonify({"message": "Permission removed from role"}), 200
@role_bp.route("/session/assume/<string:role>", methods=["POST"])
def role_session_assume(role):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
if not RoleService.get_role(session.org_id, role):
return jsonify({"error": "Role not found"}), 404
session = SessionService.change_role(session, role, "add")
if isinstance(session, tuple):
return session
return jsonify(session.to_dict()), 200
@role_bp.route("/session/drop/<string:role>", methods=["POST"])
def role_session_drop(role):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
if not RoleService.get_role(session.org_id, role):
return jsonify({"error": "Role not found"}), 404
session = SessionService.change_role(session, role, "drop")
if isinstance(session, tuple):
return session
return jsonify(session.to_dict()), 200
@role_bp.route("/session/list", methods=["GET"])
def role_session_list():
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
roles = SessionService.list_roles(session)
return jsonify(roles), 200
@role_bp.route("/perm/<string:perm>/roles", methods=["GET"])
def perm_list_roles(perm):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
roles = RoleService.get_roles_for_perm(org, Perm(perm))
if isinstance(roles, tuple):
return roles
return jsonify(roles), 200