213 lines
8.1 KiB
Python
213 lines
8.1 KiB
Python
import json
|
|
|
|
from flask import Blueprint, request, jsonify, send_file, Response
|
|
|
|
from database import security_service
|
|
from utils import Perm, get_hex_from_temp_file, get_hash, check_valid_time, PermOperation
|
|
from services import FileService, OrganizationService, UserService, SessionService, RoleService
|
|
from utils.comms_encryption import encrypt_response
|
|
from utils.exceptions import SessionException
|
|
|
|
# Blueprint under which every file/document route in this module is registered.
file_bp = Blueprint(name="file", import_name=__name__)

# Shared FileService instance; the upload handlers call create_file() and
# write_file() on it.
upload_service = FileService()
|
|
|
|
|
|
@file_bp.route("/get/<string:file_handle>/content", methods=["GET"])
def file_get_content(file_handle: str):
    """Send the stored content of the file identified by ``file_handle``.

    Responds with a 404 JSON error when ``FileService`` has no file for the
    handle. Otherwise the content is sent as an attachment whose download
    name is the file record's ``name``.

    NOTE(review): unlike the other handlers in this module, no session is
    validated here -- confirm that unauthenticated download is intended.
    """
    stored_file = FileService.get_file_by_file_handle(file_handle)
    if stored_file:
        return send_file(
            FileService.get_file_content(stored_file),
            as_attachment=True,
            download_name=stored_file.name,
        )
    return jsonify({"error": "File not found"}), 404
|
|
|
|
|
|
@file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"])
def file_get_metadata(document_handle: str):
    """Return a document's metadata merged with its encryption details.

    The session token is read from the ``Authorization`` header and validated
    with ``Perm.DOC_READ`` on ``document_handle``.

    Responses:
        * ``SessionException`` -> JSON error with the exception's message/code.
        * 404 when the session's organization or the document is not found.
        * 200 with ``file.to_dict() | file.get_encrytion()`` passed through
          ``encrypt_response`` using the session's key.
    """
    token = request.headers.get("Authorization")
    try:
        session = SessionService.validate_session(
            token,
            required_perms=[Perm.DOC_READ],
            doc_handle=document_handle,
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    # The organization is only looked up to confirm that it still exists.
    if not OrganizationService.get_organization(session.org_id):
        return jsonify({"error": "Organization not found"}), 404

    document = FileService.get_file_by_document_handle(document_handle)
    if not document:
        return jsonify({"error": "File not found"}), 404

    payload = document.to_dict() | document.get_encrytion()
    session_key = security_service.get_key(session)
    return encrypt_response(payload, 200, session_key)
|
|
|
|
|
|
@file_bp.route("/upload/metadata", methods=["POST"])
def file_upload_metadata():
    """Register the metadata of a new document for the session's organization.

    The request must carry ``Content-Type: application/octet-stream`` and an
    ``Authorization`` session token that validates with ``Perm.DOC_NEW``.
    ``validate_session`` is given the raw request body and hands back the
    session together with ``data`` (presumably the decoded payload -- verify
    against ``SessionService``), which must contain ``document_name``, ``key``
    and ``alg``.

    Responses:
        * 400 for a wrong content type or missing fields.
        * ``SessionException`` -> JSON error with the exception's message/code.
        * 404 when the organization or user of the session is not found.
        * 201 with the new file's ``to_dict()`` passed through
          ``encrypt_response``.
    """
    content_type = request.headers.get("Content-Type")
    if content_type != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    token = request.headers.get("Authorization")
    try:
        session, data = SessionService.validate_session(
            token,
            data=request.data,
            required_perms=[Perm.DOC_NEW],
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if any(field not in data for field in ("document_name", "key", "alg")):
        return jsonify({"error": "Missing required fields"}), 400

    organization = OrganizationService.get_organization(session.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    owner = UserService.get_user(session.user_id)
    if not owner:
        return jsonify({"error": "User not found"}), 404

    created = upload_service.create_file(
        session.token,
        organization,
        owner,
        data["document_name"],
        data["key"],
        data["alg"],
    )
    return encrypt_response(created.to_dict(), 201, security_service.get_key(session))
|
|
|
|
|
|
@file_bp.route("/upload/content", methods=["POST"])
def file_upload_content():
    """Receive the content of a document and store it via ``upload_service``.

    The request must carry an ``Authorization`` session token that validates
    with ``Perm.DOC_NEW``, a multipart ``file`` part, and a ``File-Checksum``
    header equal to ``str(get_hash(...))`` of the uploaded data.

    Responses:
        * 400 when the token, file part, filename or checksum header is
          missing, or when the checksum does not match the uploaded data.
        * ``SessionException`` -> JSON error with the exception's message/code.
        * whatever tuple ``upload_service.write_file`` returns, unchanged.
        * 201 with the stored file's ``to_dict()`` passed through
          ``encrypt_response``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # validate_session reports failures by raising SessionException (every
    # other handler in this module handles it that way); without this handler
    # an invalid session would escape as an unhandled exception.
    try:
        session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_NEW])
    except SessionException as e:
        return jsonify({"error": e.message}), e.code

    # NOTE(review): kept from the original, which treated a tuple return as a
    # ready-made error response; likely legacy now -- confirm and remove.
    if isinstance(session, tuple):
        return session

    if "file" not in request.files:
        return jsonify({"error": "No file data"}), 400

    uploaded = request.files.get("file")
    if uploaded.filename == "":
        return jsonify({"error": "No file selected for uploading"}), 400

    if not uploaded:
        return jsonify({"error": "Invalid file data"}), 400

    # Check the header before touching the stream so a request without a
    # checksum is rejected without reading the upload.
    file_sum = request.headers.get("File-Checksum")
    if not file_sum:
        return jsonify({"error": "No file checksum provided"}), 400

    file_data = get_hex_from_temp_file(uploaded.stream)
    if file_sum != str(get_hash(file_data)):
        return jsonify({"error": "File checksum mismatch"}), 400

    file = upload_service.write_file(session_token, file_sum, file_data)
    # write_file may hand back a tuple, which is returned to the client as-is.
    if isinstance(file, tuple):
        return file

    return encrypt_response(file.to_dict(), 201, security_service.get_key(session))
|
|
|
|
|
|
@file_bp.route("/list", methods=["GET"])
def file_list():
    """List the files of the session's organization, optionally filtered.

    The request must carry ``Content-Type: application/octet-stream`` and an
    ``Authorization`` session token. ``validate_session`` is given the raw
    request body and hands back the session together with ``data``, which may
    contain two optional filters:

    * ``datetime``: a mapping with ``value`` (convertible to ``int``) and
      ``relation``; both are forwarded to ``check_valid_time`` together with
      each file's ``created_at``.
    * ``username``: only files whose ``creator_id`` equals that user's ``id``
      are returned.

    Responses:
        * 400 for a wrong content type, a missing token or a malformed
          ``datetime`` filter.
        * ``SessionException`` -> JSON error with the exception's message/code.
        * 404 when the organization or the requested user is not found.
        * 200 with the list of matching ``to_dict()`` results passed through
          ``encrypt_response``.
    """
    if request.headers.get("Content-Type") != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, data = SessionService.validate_session(session_token, data=request.data)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    has_time_filter = "datetime" in data
    datetime_value = datetime_relation = None
    if has_time_filter:
        try:
            datetime_value = int(data["datetime"]["value"])
            datetime_relation = data["datetime"]["relation"]
        except (KeyError, TypeError, ValueError):
            # KeyError: "value"/"relation" missing; TypeError: the filter is
            # not a mapping or the value is None; ValueError: not an integer.
            return jsonify({"error": "Invalid datetime value"}), 400

    creator = None
    if "username" in data:
        creator = UserService.get_user_by_username(data["username"])
        if not creator:
            return jsonify({"error": "User not found"}), 404

    files = FileService.list_files_in_org(org)
    matching = [
        file.to_dict()
        for file in files
        if (creator is None or file.creator_id == creator.id)
        and (
            not has_time_filter
            or check_valid_time(file.created_at, datetime_value, datetime_relation)
        )
    ]
    return encrypt_response(matching, 200, security_service.get_key(session))
|
|
|
|
|
|
@file_bp.route("/delete/<string:document_handle>", methods=["POST"])
def file_delete(document_handle: str):
    """Delete the document identified by ``document_handle``.

    The ``Authorization`` session token must validate with
    ``Perm.DOC_DELETE`` on the document, and the session's user must be the
    file's creator.

    Responses:
        * 400 when no session token is supplied.
        * ``SessionException`` -> JSON error with the exception's message/code.
        * 404 when the organization or the document is not found.
        * 403 when ``file.creator_id`` differs from the session's ``user_id``.
        * 200 with ``to_dict()`` of the object returned by
          ``FileService.delete_file``, passed through ``encrypt_response``.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    try:
        session = SessionService.validate_session(
            token,
            required_perms=[Perm.DOC_DELETE],
            doc_handle=document_handle,
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    # The organization is only looked up to confirm that it still exists.
    if not OrganizationService.get_organization(session.org_id):
        return jsonify({"error": "Organization not found"}), 404

    target = FileService.get_file_by_document_handle(document_handle)
    if not target:
        return jsonify({"error": "File not found"}), 404

    requester_id = session.user_id
    if target.creator_id != requester_id:
        return jsonify({"error": "Not authorized to delete file"}), 403

    deleted = FileService.delete_file(target, session.user_id)
    return encrypt_response(deleted.to_dict(), 200, security_service.get_key(session))
|
|
|
|
|
|
@file_bp.route("/acl/<string:document_handle>", methods=["POST"])
def file_acl(document_handle: str):
    """Add or remove a permission for a role on one document.

    The request must carry ``Content-Type: application/octet-stream`` and an
    ``Authorization`` session token that validates with ``Perm.DOC_ACL`` on
    ``document_handle``. The ``data`` handed back by ``validate_session`` must
    contain ``role``, ``perm`` and ``operation``.

    Responses:
        * 400 for a wrong content type, a missing token or missing fields.
        * ``SessionException`` -> JSON error with the exception's message/code.
        * 404 when the organization, the document or the role (checked against
          ``org.roles``) is not found.
        * whatever tuple ``RoleService.change_perm_on_role_in_file`` returns,
          unchanged.
        * 200 with the file's ``to_dict()`` passed through
          ``encrypt_response``.
    """
    content_type = request.headers.get("Content-Type")
    if content_type != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, data = SessionService.validate_session(
            token,
            data=request.data,
            required_perms=[Perm.DOC_ACL],
            doc_handle=document_handle,
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if any(field not in data for field in ("role", "perm", "operation")):
        return jsonify({"error": "Missing required fields"}), 400

    role_name = data["role"]
    permission = Perm.from_str(data["perm"])
    # NOTE(review): every value other than "add" is treated as a removal;
    # consider rejecting unknown operations with a 400 instead.
    if data["operation"] == "add":
        operation = PermOperation.ADD
    else:
        operation = PermOperation.REMOVE

    # NOTE(review): a tuple here is passed straight back as the response;
    # this looks like a leftover from an older validate_session contract.
    if isinstance(session, tuple):
        return session

    organization = OrganizationService.get_organization(session.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    document = FileService.get_file_by_document_handle(document_handle)
    if not document:
        return jsonify({"error": "File not found"}), 404

    if role_name not in organization.roles:
        return jsonify({"error": "Role not found"}), 404

    outcome = RoleService.change_perm_on_role_in_file(document, role_name, permission, operation)
    if isinstance(outcome, tuple):
        return outcome

    return encrypt_response(document.to_dict(), 200, security_service.get_key(session))
|
|
|