Roles/perms services implemented

Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
Tiago Garcia 2024-12-14 17:54:01 +00:00
parent 0ce6d25359
commit 17cbf845c7
Signed by: TiagoRG
GPG Key ID: DFCD48E3F420DB42
8 changed files with 294 additions and 74 deletions

View File

@ -7,6 +7,7 @@ class Session(db_connection.Model):
user_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id')) user_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'))
org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id')) org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'))
token = db_connection.Column(db_connection.String(255), unique=True) token = db_connection.Column(db_connection.String(255), unique=True)
roles = db_connection.Column(db_connection.JSON, default=list)
created_at = db_connection.Column(db_connection.DateTime, server_default=db_connection.func.now()) created_at = db_connection.Column(db_connection.DateTime, server_default=db_connection.func.now())
updated_at = db_connection.Column(db_connection.DateTime, server_default=db_connection.func.now(), server_onupdate=db_connection.func.now()) updated_at = db_connection.Column(db_connection.DateTime, server_default=db_connection.func.now(), server_onupdate=db_connection.func.now())

View File

@ -1,4 +1,5 @@
from .orgs import OrganizationService from .orgs import OrganizationService
from .users import UserService from .users import UserService
from .files import FileService from .files import FileService
from .roles import RoleService
from .sessions import SessionService from .sessions import SessionService

View File

@ -3,7 +3,9 @@ import os.path
from database import db from database import db
from models import Organization, User from models import Organization, User
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
from utils.perms import Perm
from services.roles import RoleService
from utils import Perm
class OrganizationService: class OrganizationService:
@ -36,10 +38,6 @@ class OrganizationService:
Perm.ROLE_MOD Perm.ROLE_MOD
]), ]),
"users": [user.id] "users": [user.id]
},
"default": {
"permissions": Perm.get_int([]),
"users": []
} }
} }
@ -59,7 +57,7 @@ class OrganizationService:
db.refresh(organization) db.refresh(organization)
UserService().add_org_to_user(user, organization) UserService().add_org_to_user(user, organization)
UserService().add_role_to_user(user, organization, "manager") RoleService().add_role_to_user(user, organization, "manager")
UserService().add_public_key_to_user(user, organization, public_key) UserService().add_public_key_to_user(user, organization, public_key)
return organization return organization
@ -97,34 +95,6 @@ class OrganizationService:
db.refresh(org) db.refresh(org)
return org return org
@staticmethod
def create_role(org: Organization, role: str, perms: list[Perm]) -> Organization:
roles = org.roles.copy()
roles[role] = {
"permissions": Perm.get_int(perms),
"users": []
}
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def delete_role(org: Organization, role: str) -> Organization:
roles = org.roles.copy()
del roles[role]
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def check_role_permission(org: Organization, role: str, perm: Perm) -> bool:
role_perms = org.roles[role]["permissions"]
return Perm.check_perm(role_perms, perm.value)
@staticmethod @staticmethod
def suspend_user(org: Organization, user: User) -> tuple: def suspend_user(org: Organization, user: User) -> tuple:
if OrganizationService.get_user_status(org, user.id) != "active": if OrganizationService.get_user_status(org, user.id) != "active":

View File

@ -0,0 +1,194 @@
from database import db
from models import Organization, User
from sqlalchemy.orm.attributes import flag_modified
from utils import Perm
class RoleService:
@staticmethod
def create_role(org: Organization, role: str, perms: list[Perm]) -> dict:
if role in org.roles:
raise ValueError(f"Role {role} already exists in organization {org.name}")
roles = org.roles.copy()
roles[role] = {
"permissions": Perm.get_int(perms),
"users": [],
"status": "active"
}
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]
@staticmethod
def delete_role(org: Organization, role: str) -> dict:
roles = org.roles.copy()
del roles[role]
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return roles
@staticmethod
def activate_role(org: Organization, role: str) -> Organization:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] == "active":
raise ValueError(f"Role {role} is already active in organization {org.name}")
roles = org.roles.copy()
roles[role]["status"] = "active"
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def suspend_role(org: Organization, role: str) -> Organization:
if role == "manager":
raise ValueError(f"Role {role} cannot be suspended in organization {org.name}")
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] == "suspended":
raise ValueError(f"Role {role} is already suspended in organization {org.name}")
roles = org.roles.copy()
roles[role]["status"] = "suspended"
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org
@staticmethod
def get_role(org: Organization, role: str) -> dict:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
return org.roles[role]
@staticmethod
def check_role_permission(org: Organization, role: str, perm: Perm) -> bool:
role_perms = org.roles[role]["permissions"]
return Perm.check_perm(role_perms, perm.value)
@staticmethod
def check_user_permission(user: User, org: Organization, perm: Perm) -> bool:
for role in user.roles[org.id]:
if RoleService.check_role_permission(org, role, perm):
return True
return False
@staticmethod
def list_roles(org: Organization) -> dict:
return org.roles
@staticmethod
def list_users_in_role(org: Organization, role: str) -> list:
return org.roles[role]["users"]
@staticmethod
def list_roles_for_user(user: User, org: Organization) -> list:
return [role for role in org.roles if user.id in org.roles[role]["users"]]
@staticmethod
def list_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]:
return Perm.get_perms(org.roles[role]["permissions"], return_str)
@staticmethod
def list_roles_for_perm(org: Organization, perm: Perm) -> list:
roles = []
for role in org.roles:
if RoleService.check_role_permission(org, role, perm):
roles.append(role)
return roles
@staticmethod
def add_role_to_user(user: User, org: Organization, role: str) -> User:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if user.id in org.roles[role]["users"]:
raise ValueError(f"User {user.username} already has role {role} in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = user.roles.copy()
roles[org.id] = role
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].append(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod
def remove_role_from_user(user: User, org: Organization, role: str) -> User:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if user.id not in org.roles[role]["users"]:
raise ValueError(f"User {user.username} does not have role {role} in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = user.roles.copy()
roles.pop(org.id)
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].remove(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod
def add_perm_to_role(org: Organization, role: str, perm: Perm) -> dict:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = org.roles.copy()
roles[role]["permissions"] |= perm.value
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]
@staticmethod
def remove_perm_from_role(org: Organization, role: str, perm: Perm) -> dict:
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
if org.roles[role]["status"] != "active":
raise ValueError(f"Role {role} is not active in organization {org.name}")
roles = org.roles.copy()
roles[role]["permissions"] &= ~perm.value
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return org.roles[role]

View File

@ -1,8 +1,13 @@
import secrets import secrets
from sqlalchemy.orm.attributes import flag_modified
from database import db from database import db
from models import Session, User, Organization from models import Session, User, Organization
from flask import jsonify from flask import jsonify
from utils import Perm
class SessionService: class SessionService:
@staticmethod @staticmethod
@ -10,7 +15,8 @@ class SessionService:
session = Session( session = Session(
user_id=user.id, user_id=user.id,
org_id=org.id, org_id=org.id,
token=secrets.token_hex(128) token=secrets.token_hex(128),
roles=[]
) )
db.add(session) db.add(session)
db.commit() db.commit()
@ -46,3 +52,73 @@ class SessionService:
return jsonify({"error": "User is not active"}), 403 return jsonify({"error": "User is not active"}), 403
return session return session
@staticmethod
def assume_role(session: Session, role: str) -> bool:
from services import OrganizationService
org = OrganizationService.get_organization(session.org_id)
if not org:
return False
user = User.query.get(session.user_id)
if not user:
return False
if role not in user.roles[org.id]:
return False
if role in session.roles:
return False
session.roles.append(role)
flag_modified(session, "roles")
db.commit()
db.refresh(session)
return True
@staticmethod
def drop_role(session: Session, role: str) -> bool:
from services import OrganizationService
org = OrganizationService.get_organization(session.org_id)
if not org:
return False
user = User.query.get(session.user_id)
if not user:
return False
if role not in user.roles[org.id]:
return False
if role not in session.roles:
return False
session.roles.remove(role)
flag_modified(session, "roles")
db.commit()
db.refresh(session)
return True
@staticmethod
def list_roles(session: Session) -> list:
return session.roles
@staticmethod
def check_permission(session: Session, perm: Perm) -> bool:
from services import OrganizationService, RoleService
org = OrganizationService.get_organization(session.org_id)
if not org:
return False
user = User.query.get(session.user_id)
if not user:
return False
for role in session.roles:
if RoleService.check_role_permission(org, role, perm):
return True
return False

View File

@ -46,40 +46,6 @@ class UserService:
db.refresh(user) db.refresh(user)
return user return user
@staticmethod
def add_role_to_user(user: User, org: Organization, role: str) -> User:
roles = user.roles.copy()
roles[org.id] = role
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].append(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod
def remove_role_from_user(user: User, org: Organization, role: str) -> User:
roles = user.roles.copy()
roles.pop(org.id)
user.roles = roles
flag_modified(user, "roles")
db.commit()
db.refresh(user)
roles = org.roles.copy()
roles[role]["users"].remove(user.id)
org.roles = roles
flag_modified(org, "roles")
db.commit()
db.refresh(org)
return user
@staticmethod @staticmethod
def add_public_key_to_user(user: User, org: Organization, public_key: str) -> User: def add_public_key_to_user(user: User, org: Organization, public_key: str) -> User:
public_keys = user.public_keys.copy() public_keys = user.public_keys.copy()

View File

@ -1,2 +1,3 @@
from .checks import check_valid_time from .checks import check_valid_time
from .hashing import get_hash, get_hex_from_temp_file from .hashing import get_hash, get_hex_from_temp_file
from .perms import Perm

View File

@ -15,12 +15,23 @@ class Perm(Enum):
ROLE_UP = 0b010000000000 ROLE_UP = 0b010000000000
ROLE_MOD = 0b100000000000 ROLE_MOD = 0b100000000000
def __str__(self):
bit = 0
value = self.value
while not value & 0b1:
bit += 1
value >>= 1
return f"{self.name}({bit})"
@staticmethod @staticmethod
def get_perm(bit_array: int): def get_perms(bit_array: int, return_str=False):
perms = []
bit = 0b1
for perm in Perm: for perm in Perm:
if bit_array == perm.value: if (bit_array & bit) == (perm.value & 0b1):
return perm perms.append(perm)
return None bit >>= 1
return [str(p) for p in perms] if return_str else perms
@staticmethod @staticmethod
def get_int(perms): def get_int(perms):