Fix file permission system

Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
Tiago Garcia 2024-12-15 02:46:37 +00:00
parent 5e21a77c5d
commit 8033aa9d60
Signed by: TiagoRG
GPG Key ID: DFCD48E3F420DB42
6 changed files with 80 additions and 15 deletions

View File

@ -9,6 +9,8 @@ class File(db_connection.Model):
document_handle = db_connection.Column(db_connection.String, unique=True, nullable=False) document_handle = db_connection.Column(db_connection.String, unique=True, nullable=False)
name = db_connection.Column(db_connection.String, nullable=False) name = db_connection.Column(db_connection.String, nullable=False)
created_at = db_connection.Column(db_connection.Integer, nullable=False) created_at = db_connection.Column(db_connection.Integer, nullable=False)
acl = db_connection.Column(db_connection.JSON, nullable=False, default=dict)
deleter_id = db_connection.Column(db_connection.Integer, nullable=True)
key = db_connection.Column(db_connection.String, nullable=False) key = db_connection.Column(db_connection.String, nullable=False)
alg = db_connection.Column(db_connection.String, nullable=False) alg = db_connection.Column(db_connection.String, nullable=False)
org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'), nullable=False) org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'), nullable=False)
@ -23,8 +25,10 @@ class File(db_connection.Model):
"document_handle": self.document_handle, "document_handle": self.document_handle,
"name": self.name, "name": self.name,
"created_at": self.created_at, "created_at": self.created_at,
"acl": self.acl,
"key": self.key, "key": self.key,
"alg": self.alg, "alg": self.alg,
"org": {"id": self.org.id, "name": self.org.name}, "org": {"id": self.org.id, "name": self.org.name},
"creator": {"id": self.creator.id, "username": self.creator.username} "creator": {"id": self.creator.id, "username": self.creator.username},
"deleter": {"id": self.deleter.id, "username": self.deleter.username} if self.deleter else None
} }

View File

@ -22,7 +22,7 @@ def file_get_content(file_handle: str):
@file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"]) @file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"])
def file_get_metadata(document_handle: str): def file_get_metadata(document_handle: str):
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_READ]) session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_READ], doc_handle=document_handle)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
@ -145,7 +145,7 @@ def file_delete(document_handle: str):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_DELETE]) session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_DELETE], doc_handle=document_handle)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
@ -160,7 +160,7 @@ def file_delete(document_handle: str):
if file.creator_id != session.user_id: if file.creator_id != session.user_id:
return jsonify({"error": "Not authorized to delete file"}), 403 return jsonify({"error": "Not authorized to delete file"}), 403
file = FileService.delete_file(file) file = FileService.delete_file(file, session.user_id)
return jsonify(file.to_dict()) return jsonify(file.to_dict())

View File

@ -6,7 +6,7 @@ from flask import jsonify
from database import db from database import db
from models import File, Organization, User from models import File, Organization, User
from utils import get_hash from utils import get_hash, Perm
class FileService: class FileService:
@ -19,10 +19,18 @@ class FileService:
document_handle = get_hash(file_name), document_handle = get_hash(file_name),
name = file_name, name = file_name,
created_at = int(datetime.now().timestamp()), created_at = int(datetime.now().timestamp()),
acl = {
"manager": Perm.get_int([
Perm.DOC_ACL,
Perm.DOC_READ,
Perm.DOC_DELETE,
])
},
key = key, key = key,
alg = alg, alg = alg,
org_id = org.id, org_id = org.id,
creator_id = user.id, creator_id = user.id,
deleter_id = None,
org = org, org = org,
creator = user creator = user
) )
@ -101,10 +109,11 @@ class FileService:
return db.query(File).filter(File.org_id == org.id).all() return db.query(File).filter(File.org_id == org.id).all()
@staticmethod @staticmethod
def delete_file(file: File) -> File: def delete_file(file: File, deleter_id: int) -> File:
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle) file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle)
os.remove(file_path) os.remove(file_path)
file.file_handle = None file.file_handle = None
file.deleter_id = deleter_id
db.commit() db.commit()
db.refresh(file) db.refresh(file)
return file return file

View File

@ -24,9 +24,6 @@ class OrganizationService:
roles = { roles = {
"manager": { "manager": {
"permissions": Perm.get_int([ "permissions": Perm.get_int([
Perm.DOC_ACL,
Perm.DOC_READ,
Perm.DOC_DELETE,
Perm.ROLE_ACL, Perm.ROLE_ACL,
Perm.SUBJECT_NEW, Perm.SUBJECT_NEW,
Perm.SUBJECT_DOWN, Perm.SUBJECT_DOWN,

View File

@ -1,5 +1,5 @@
from database import db from database import db
from models import Organization, User from models import Organization, User, File
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
from utils import Perm from utils import Perm
@ -74,7 +74,22 @@ class RoleService:
return org.roles[role] return org.roles[role]
@staticmethod @staticmethod
def check_role_permission(org: Organization, role: str, perm: Perm) -> bool: def check_role_permission(org: Organization, role: str, perm: Perm, doc_handle=None) -> bool:
from services import FileService
if doc_handle:
file = FileService.get_file_by_document_handle(doc_handle)
if not file:
raise ValueError(f"Document {doc_handle} not found")
if not Perm.check_perm(file.acl[role], perm.value):
return False
return True
if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}")
role_perms = org.roles[role]["permissions"] role_perms = org.roles[role]["permissions"]
return Perm.check_perm(role_perms, perm.value) return Perm.check_perm(role_perms, perm.value)
@ -163,6 +178,9 @@ class RoleService:
@staticmethod @staticmethod
def add_perm_to_role(org: Organization, role: str, perm: Perm) -> dict: def add_perm_to_role(org: Organization, role: str, perm: Perm) -> dict:
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
if role not in org.roles: if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}") raise ValueError(f"Role {role} does not exist in organization {org.name}")
@ -179,6 +197,9 @@ class RoleService:
@staticmethod @staticmethod
def remove_perm_from_role(org: Organization, role: str, perm: Perm) -> dict: def remove_perm_from_role(org: Organization, role: str, perm: Perm) -> dict:
if perm in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for organization's roles")
if role not in org.roles: if role not in org.roles:
raise ValueError(f"Role {role} does not exist in organization {org.name}") raise ValueError(f"Role {role} does not exist in organization {org.name}")
@ -192,3 +213,37 @@ class RoleService:
db.commit() db.commit()
db.refresh(org) db.refresh(org)
return org.roles[role] return org.roles[role]
@staticmethod
def add_perm_to_role_in_file(file: File, role: str, perm: Perm) -> dict:
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for files' roles")
if role not in file.acl:
file.acl[role] = 0
if file.acl[role] & perm.value != 0:
raise ValueError(f"Role {role} already has permission {perm} in file {file.document_handle}")
file.acl[role] |= perm.value
flag_modified(file, "acl")
db.commit()
db.refresh(file)
return file.acl
@staticmethod
def remove_perm_from_role_in_file(file: File, role: str, perm: Perm) -> dict:
if perm not in [Perm.DOC_ACL, Perm.DOC_READ, Perm.DOC_DELETE]:
raise ValueError(f"Permission {perm} is not allowed for files' roles")
if role not in file.acl:
file.acl[role] = 0
if file.acl[role] & perm.value == 0:
raise ValueError(f"Role {role} does not have permission {perm} in file {file.document_handle}")
file.acl[role] &= ~perm.value
flag_modified(file, "acl")
db.commit()
db.refresh(file)
return file.acl

View File

@ -33,7 +33,7 @@ class SessionService:
db.commit() db.commit()
@staticmethod @staticmethod
def validate_session(token: str, required_perms: list[Perm] = None) -> tuple | Session: def validate_session(token: str, required_perms: list[Perm] = None, doc_handle=None) -> tuple | Session:
from services import OrganizationService from services import OrganizationService
if "Bearer" in token: if "Bearer" in token:
@ -53,7 +53,7 @@ class SessionService:
if required_perms: if required_perms:
for perm in required_perms: for perm in required_perms:
if not SessionService.check_permission(session, perm): if not SessionService.check_permission(session, perm, doc_handle):
return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403 return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403
return session return session
@ -111,7 +111,7 @@ class SessionService:
return session.roles return session.roles
@staticmethod @staticmethod
def check_permission(session: Session, perm: Perm) -> bool: def check_permission(session: Session, perm: Perm, doc_handle=None) -> bool:
from services import OrganizationService, RoleService from services import OrganizationService, RoleService
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
@ -123,7 +123,7 @@ class SessionService:
return False return False
for role in session.roles: for role in session.roles:
if RoleService.check_role_permission(org, role, perm): if RoleService.check_role_permission(org, role, perm, doc_handle):
return True return True
return False return False