New endpoints for delivery 2 commands
Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
parent
7725c7d3f7
commit
e228e4f340
|
@ -74,6 +74,9 @@ Mainly, it's divided into 3 categories:
|
|||
- `Authorization: token`
|
||||
- Optional payload parameters:
|
||||
- `username`: Filter by username.
|
||||
- `GET /user/<username>/roles`: Returns a list of roles for a user.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `GET /file/list`: Returns a list of files.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
|
@ -85,6 +88,28 @@ Mainly, it's divided into 3 categories:
|
|||
- `POST /user/logout`: Logs out a user.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/session/assume`: Assumes a role in the session.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- Required payload fields:
|
||||
- `role`: Role name.
|
||||
- `POST /role/session/drop`: Drops a role from the session.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- Required payload fields:
|
||||
- `role`: Role name.
|
||||
- `GET /role/session/list`: Lists the roles for the session.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `GET /role/<role>/list/users`: Lists the users for a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `GET /role/<role>/list/perms`: Lists the permissions for a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `GET /role/perm/<perm>/roles`: Lists the roles with a permission.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
|
||||
#### Authorized Endpoints
|
||||
|
||||
|
@ -124,3 +149,26 @@ Mainly, it's divided into 3 categories:
|
|||
- `POST /file/delete/<document_handle>`: Deletes a file.
|
||||
- Required headers:
|
||||
- `Authorization: token
|
||||
- `POST /role/create`: Creates a new role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- Required payload fields:
|
||||
- `role`: Role name.
|
||||
- `POST /role/<role>/suspend`: Suspends a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/<role>/activate`: Activates a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/<role>/user/add/<username>`: Adds a user to a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/<role>/user/remove/<username>`: Removes a user from a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/<role>/perm/add/<perm>`: Adds a permission to a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
- `POST /role/<role>/perm/remove/<perm>`: Removes a permission from a role.
|
||||
- Required headers:
|
||||
- `Authorization: token`
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
import sqlalchemy.exc
|
||||
from flask import Flask, request, jsonify
|
||||
from routes import org_bp, user_bp, file_bp
|
||||
from routes import org_bp, user_bp, file_bp, role_bp
|
||||
from database import db_connection, db
|
||||
from models import Organization, User, File, Session
|
||||
|
||||
|
@ -22,6 +22,7 @@ with app.app_context():
|
|||
app.register_blueprint(org_bp, url_prefix="/org")
|
||||
app.register_blueprint(user_bp, url_prefix="/user")
|
||||
app.register_blueprint(file_bp, url_prefix="/file")
|
||||
app.register_blueprint(role_bp, url_prefix="/role")
|
||||
|
||||
|
||||
@app.route("/", methods=["GET"])
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from .org import org_bp
|
||||
from .user import user_bp
|
||||
from .file import file_bp
|
||||
from .role import role_bp
|
|
@ -0,0 +1,335 @@
|
|||
import json
|
||||
from flask import Blueprint, request, jsonify
|
||||
from services import UserService, SessionService, OrganizationService, RoleService
|
||||
from utils import Perm, PermOperation
|
||||
|
||||
role_bp = Blueprint("role", __name__)
|
||||
|
||||
|
||||
@role_bp.route("/create", methods=["POST"])
|
||||
def role_create():
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "role" not in data or "perms" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_NEW])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
role = RoleService.create_role(org, data["role"], data["perms"])
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify(role), 201
|
||||
|
||||
|
||||
@role_bp.route("/<role>/list/users", methods=["GET"])
|
||||
def role_list_users(role):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
users = RoleService.get_users_in_role(org, role)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
return jsonify(users), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/list/perms", methods=["GET"])
|
||||
def role_list_perms(role):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
perms = RoleService.get_perms_for_role(org, role)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
return jsonify(perms), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/suspend", methods=["POST"])
|
||||
def role_suspend(role):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_DOWN])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.change_role_status(org, role, "suspended")
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "Role suspended"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/activate", methods=["POST"])
|
||||
def role_activate(role):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_UP])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.change_role_status(org, role, "active")
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "Role activated"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/user/add/<username>", methods=["POST"])
|
||||
def role_user_add(role, username):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
user = UserService.get_user_by_username(username)
|
||||
if not user:
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.add_user_to_role(role, org, user)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "User added to role"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/user/remove/<username>", methods=["POST"])
|
||||
def role_user_remove(role, username):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
user = UserService.get_user_by_username(username)
|
||||
if not user:
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.remove_user_from_role(role, org, user)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "User removed from role"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/perm/add/<perm>", methods=["POST"])
|
||||
def role_perm_add(role, perm):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.ADD)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "Permission added to role"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/<role>/perm/remove/<perm>", methods=["POST"])
|
||||
def role_perm_remove(role, perm):
|
||||
data = request.json
|
||||
if type(data) is str:
|
||||
data = json.loads(data)
|
||||
|
||||
if "user" not in data:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD])
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
RoleService.change_perm_on_role(org, role, Perm.from_str(perm), PermOperation.REMOVE)
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify({"message": "Permission removed from role"}), 200
|
||||
|
||||
|
||||
@role_bp.route("/session/assume/<role>", methods=["POST"])
|
||||
def role_session_assume(role):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
if not RoleService.get_role(session.org_id, role):
|
||||
return jsonify({"error": "Role not found"}), 404
|
||||
|
||||
try:
|
||||
SessionService.change_role(session, role, "add")
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify(session.to_dict()), 200
|
||||
|
||||
|
||||
@role_bp.route("/session/drop/<role>", methods=["POST"])
|
||||
def role_session_drop(role):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
if not RoleService.get_role(session.org_id, role):
|
||||
return jsonify({"error": "Role not found"}), 404
|
||||
|
||||
try:
|
||||
SessionService.change_role(session, role, "drop")
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
return jsonify(session.to_dict()), 200
|
||||
|
||||
|
||||
@role_bp.route("/session/list", methods=["GET"])
|
||||
def role_session_list():
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
roles = SessionService.list_roles(session)
|
||||
return jsonify(roles), 200
|
||||
|
||||
@role_bp.route("/perm/<perm>/roles", methods=["GET"])
|
||||
def perm_list_roles(perm):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if not session:
|
||||
return jsonify({"error": "Not authenticated"}), 401
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
try:
|
||||
roles = RoleService.get_roles_for_perm(org, Perm(perm))
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
return jsonify(roles), 200
|
|
@ -1,7 +1,6 @@
|
|||
import json
|
||||
import utils
|
||||
from flask import Blueprint, request, jsonify
|
||||
from services import UserService, SessionService, OrganizationService
|
||||
from services import UserService, SessionService, OrganizationService, RoleService
|
||||
from utils import Perm
|
||||
|
||||
user_bp = Blueprint("user", __name__)
|
||||
|
@ -103,6 +102,28 @@ def user_create():
|
|||
return jsonify(user.to_dict()), 201
|
||||
|
||||
|
||||
@user_bp.route("/<string:username>/roles", methods=["GET"])
|
||||
def user_roles(username):
|
||||
session_token = request.headers.get("Authorization")
|
||||
if not session_token:
|
||||
return jsonify({"error": "No session token"}), 400
|
||||
|
||||
session = SessionService.validate_session(session_token)
|
||||
if isinstance(session, tuple):
|
||||
return session
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return jsonify({"error": "Organization not found"}), 404
|
||||
|
||||
user = UserService.get_user_by_username(username)
|
||||
if not user:
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
roles = RoleService.get_roles_for_user(user, org)
|
||||
return jsonify(roles), 200
|
||||
|
||||
|
||||
@user_bp.route("/<string:username>/suspend", methods=["POST"])
|
||||
def user_suspend(username):
|
||||
session_token = request.headers.get("Authorization")
|
||||
|
|
|
@ -54,7 +54,7 @@ class OrganizationService:
|
|||
db.refresh(organization)
|
||||
|
||||
UserService().add_org_to_user(user, organization)
|
||||
RoleService().add_role_to_user(user, organization, "manager")
|
||||
RoleService().add_user_to_role(user, organization, "manager")
|
||||
UserService().add_public_key_to_user(user, organization, public_key)
|
||||
|
||||
return organization
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
from database import db
|
||||
from models import Organization, User, File
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
from utils import Perm
|
||||
from utils import Perm, PermOperation
|
||||
|
||||
|
||||
class RoleService:
|
||||
|
@ -33,15 +33,15 @@ class RoleService:
|
|||
return roles
|
||||
|
||||
@staticmethod
|
||||
def activate_role(org: Organization, role: str) -> Organization:
|
||||
def change_role_status(org: Organization, role: str, status: str) -> Organization:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
if org.roles[role]["status"] == "active":
|
||||
raise ValueError(f"Role {role} is already active in organization {org.name}")
|
||||
if org.roles[role]["status"] == status:
|
||||
raise ValueError(f"Role {role} is already {status} in organization {org.name}")
|
||||
|
||||
roles = org.roles.copy()
|
||||
roles[role]["status"] = "active"
|
||||
roles[role]["status"] = status
|
||||
org.roles = roles
|
||||
flag_modified(org, "roles")
|
||||
db.commit()
|
||||
|
@ -49,28 +49,9 @@ class RoleService:
|
|||
return org
|
||||
|
||||
@staticmethod
|
||||
def suspend_role(org: Organization, role: str) -> Organization:
|
||||
if role == "manager":
|
||||
raise ValueError(f"Role {role} cannot be suspended in organization {org.name}")
|
||||
|
||||
def get_role(org: Organization, role: str) -> dict | None:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
if org.roles[role]["status"] == "suspended":
|
||||
raise ValueError(f"Role {role} is already suspended in organization {org.name}")
|
||||
|
||||
roles = org.roles.copy()
|
||||
roles[role]["status"] = "suspended"
|
||||
org.roles = roles
|
||||
flag_modified(org, "roles")
|
||||
db.commit()
|
||||
db.refresh(org)
|
||||
return org
|
||||
|
||||
@staticmethod
|
||||
def get_role(org: Organization, role: str) -> dict:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
return None
|
||||
return org.roles[role]
|
||||
|
||||
@staticmethod
|
||||
|
@ -101,23 +82,25 @@ class RoleService:
|
|||
return False
|
||||
|
||||
@staticmethod
|
||||
def list_roles(org: Organization) -> dict:
|
||||
def get_roles(org: Organization) -> dict:
|
||||
return org.roles
|
||||
|
||||
@staticmethod
|
||||
def list_users_in_role(org: Organization, role: str) -> list:
|
||||
def get_users_in_role(org: Organization, role: str) -> list:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
return org.roles[role]["users"]
|
||||
|
||||
@staticmethod
|
||||
def list_roles_for_user(user: User, org: Organization) -> list:
|
||||
def get_roles_for_user(user: User, org: Organization) -> list:
|
||||
return [role for role in org.roles if user.id in org.roles[role]["users"]]
|
||||
|
||||
@staticmethod
|
||||
def list_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]:
|
||||
def get_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]:
|
||||
return Perm.get_perms(org.roles[role]["permissions"], return_str)
|
||||
|
||||
@staticmethod
|
||||
def list_roles_for_perm(org: Organization, perm: Perm) -> list:
|
||||
def get_roles_for_perm(org: Organization, perm: Perm) -> list:
|
||||
roles = []
|
||||
for role in org.roles:
|
||||
if RoleService.check_role_permission(org, role, perm):
|
||||
|
@ -125,7 +108,7 @@ class RoleService:
|
|||
return roles
|
||||
|
||||
@staticmethod
|
||||
def add_role_to_user(user: User, org: Organization, role: str) -> User:
|
||||
def add_user_to_role(role: str, org: Organization, user: User) -> User:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
|
@ -151,7 +134,7 @@ class RoleService:
|
|||
return user
|
||||
|
||||
@staticmethod
|
||||
def remove_role_from_user(user: User, org: Organization, role: str) -> User:
|
||||
def remove_user_from_role(role: str, org: Organization, user: User) -> User:
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
|
@ -177,8 +160,8 @@ class RoleService:
|
|||
return user
|
||||
|
||||
@staticmethod
|
||||
def add_perm_to_role(org: Organization, role: str, perm: Perm) -> dict:
|
||||
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
|
||||
def change_perm_on_role(org: Organization, role: str, perm: Perm, operation: PermOperation) -> dict:
|
||||
if Perm.get_int([perm]) <= 0b111:
|
||||
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
|
||||
|
||||
if role not in org.roles:
|
||||
|
@ -188,7 +171,7 @@ class RoleService:
|
|||
raise ValueError(f"Role {role} is not active in organization {org.name}")
|
||||
|
||||
roles = org.roles.copy()
|
||||
roles[role]["permissions"] |= perm.value
|
||||
roles[role]["permissions"] = PermOperation.calc(roles[role]["permissions"], perm, operation)
|
||||
org.roles = roles
|
||||
flag_modified(org, "roles")
|
||||
db.commit()
|
||||
|
@ -196,27 +179,8 @@ class RoleService:
|
|||
return org.roles[role]
|
||||
|
||||
@staticmethod
|
||||
def remove_perm_from_role(org: Organization, role: str, perm: Perm) -> dict:
|
||||
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
|
||||
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
|
||||
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
if org.roles[role]["status"] != "active":
|
||||
raise ValueError(f"Role {role} is not active in organization {org.name}")
|
||||
|
||||
roles = org.roles.copy()
|
||||
roles[role]["permissions"] &= ~perm.value
|
||||
org.roles = roles
|
||||
flag_modified(org, "roles")
|
||||
db.commit()
|
||||
db.refresh(org)
|
||||
return org.roles[role]
|
||||
|
||||
@staticmethod
|
||||
def add_perm_to_role_in_file(file: File, role: str, perm: Perm) -> dict:
|
||||
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
|
||||
def change_perm_on_role_in_file(file: File, role: str, perm: Perm, operation: PermOperation) -> dict:
|
||||
if Perm.get_int([perm]) > 0b111:
|
||||
raise ValueError(f"Permission {perm} is not allowed for files' roles")
|
||||
|
||||
if role not in file.acl:
|
||||
|
@ -225,24 +189,7 @@ class RoleService:
|
|||
if file.acl[role] & perm.value != 0:
|
||||
raise ValueError(f"Role {role} already has permission {perm} in file {file.document_handle}")
|
||||
|
||||
file.acl[role] |= perm.value
|
||||
flag_modified(file, "acl")
|
||||
db.commit()
|
||||
db.refresh(file)
|
||||
return file.acl
|
||||
|
||||
@staticmethod
|
||||
def remove_perm_from_role_in_file(file: File, role: str, perm: Perm) -> dict:
|
||||
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
|
||||
raise ValueError(f"Permission {perm} is not allowed for files' roles")
|
||||
|
||||
if role not in file.acl:
|
||||
file.acl[role] = 0
|
||||
|
||||
if file.acl[role] & perm.value == 0:
|
||||
raise ValueError(f"Role {role} does not have permission {perm} in file {file.document_handle}")
|
||||
|
||||
file.acl[role] &= ~perm.value
|
||||
file.acl[role] = PermOperation.calc(file.acl[role], perm, operation)
|
||||
flag_modified(file, "acl")
|
||||
db.commit()
|
||||
db.refresh(file)
|
||||
|
|
|
@ -59,52 +59,42 @@ class SessionService:
|
|||
return session
|
||||
|
||||
@staticmethod
|
||||
def assume_role(session: Session, role: str) -> bool:
|
||||
def change_role(session: Session, role: str, operation: str):
|
||||
from services import OrganizationService
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return False
|
||||
raise ValueError(f"Organization {session.org_id} not found")
|
||||
|
||||
user = User.query.get(session.user_id)
|
||||
if not user:
|
||||
return False
|
||||
raise ValueError(f"User {session.user_id} not found")
|
||||
|
||||
if role not in org.roles:
|
||||
raise ValueError(f"Role {role} does not exist in organization {org.name}")
|
||||
|
||||
if operation == "add":
|
||||
if role not in user.roles[org.id]:
|
||||
return False
|
||||
raise ValueError(f"User {user.username} does not have role {role}")
|
||||
|
||||
if role in session.roles:
|
||||
return False
|
||||
raise ValueError(f"User {user.username} already has role {role} in current session")
|
||||
|
||||
session.roles.append(role)
|
||||
flag_modified(session, "roles")
|
||||
db.commit()
|
||||
db.refresh(session)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def drop_role(session: Session, role: str) -> bool:
|
||||
from services import OrganizationService
|
||||
|
||||
org = OrganizationService.get_organization(session.org_id)
|
||||
if not org:
|
||||
return False
|
||||
|
||||
user = User.query.get(session.user_id)
|
||||
if not user:
|
||||
return False
|
||||
|
||||
elif operation == "drop":
|
||||
if role not in user.roles[org.id]:
|
||||
return False
|
||||
raise ValueError(f"User {user.username} does not have role {role}")
|
||||
|
||||
if role not in session.roles:
|
||||
return False
|
||||
raise ValueError(f"User {user.username} does not have role {role} in current session")
|
||||
|
||||
session.roles.remove(role)
|
||||
else:
|
||||
raise ValueError(f"Invalid operation {operation}")
|
||||
|
||||
flag_modified(session, "roles")
|
||||
db.commit()
|
||||
db.refresh(session)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def list_roles(session: Session) -> list:
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
from .checks import check_valid_time
|
||||
from .hashing import get_hash, get_hex_from_temp_file
|
||||
from .perms import Perm
|
||||
from .perms import Perm, PermOperation
|
|
@ -23,6 +23,10 @@ class Perm(Enum):
|
|||
value >>= 1
|
||||
return f"{self.name}({bit})"
|
||||
|
||||
@staticmethod
|
||||
def from_str(perm_name):
|
||||
return Perm[perm_name]
|
||||
|
||||
@staticmethod
|
||||
def get_perms(bit_array: int, return_str=False):
|
||||
perms = []
|
||||
|
@ -42,3 +46,13 @@ class Perm(Enum):
|
|||
@staticmethod
|
||||
def check_perm(perm, bit_array: int):
|
||||
return perm.value & bit_array == perm.value
|
||||
|
||||
class PermOperation(Enum):
|
||||
ADD = 0
|
||||
REMOVE = 1
|
||||
|
||||
@staticmethod
|
||||
def calc(bit_array: int, perm: Perm, operation) -> int:
|
||||
if operation == PermOperation.ADD:
|
||||
return bit_array | perm.value
|
||||
return bit_array & ~perm.value
|
Loading…
Reference in New Issue