sio-2425/delivery2/server/routes/file.py

213 lines
8.1 KiB
Python

import json
from flask import Blueprint, request, jsonify, send_file, Response
from database import security_service
from utils import Perm, get_hex_from_temp_file, get_hash, check_valid_time, PermOperation
from services import FileService, OrganizationService, UserService, SessionService, RoleService
from utils.comms_encryption import encrypt_response
from utils.exceptions import SessionException
# Blueprint on which the file/document route handlers below are registered.
file_bp = Blueprint("file", __name__)
# Shared FileService instance; the upload handlers call create_file / write_file on it.
upload_service = FileService()
@file_bp.route("/get/<string:file_handle>/content", methods=["GET"])
def file_get_content(file_handle: str):
    """Send the stored content of the file identified by ``file_handle``.

    This handler does not read any session token from the request.

    Returns:
        A 404 JSON error when ``FileService.get_file_by_file_handle`` yields
        nothing; otherwise the content as an attachment whose download name
        is the file's ``name``.
    """
    stored_file = FileService.get_file_by_file_handle(file_handle)
    if stored_file:
        content = FileService.get_file_content(stored_file)
        return send_file(content, as_attachment=True, download_name=stored_file.name)
    return jsonify({"error": "File not found"}), 404
@file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"])
def file_get_metadata(document_handle: str):
    """Return the encrypted metadata of the document ``document_handle``.

    The ``Authorization`` header is validated through
    ``SessionService.validate_session`` with ``Perm.DOC_READ`` required on
    the document. A ``SessionException`` is turned into a JSON error using
    the exception's own ``message`` and ``code``.

    Returns:
        404 JSON errors when the session's organization or the document
        cannot be found; otherwise an ``encrypt_response`` (status 200) of
        the document's ``to_dict()`` merged with ``get_encrytion()``.
    """
    token = request.headers.get("Authorization")
    try:
        session = SessionService.validate_session(
            token,
            required_perms=[Perm.DOC_READ],
            doc_handle=document_handle,
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if not OrganizationService.get_organization(session.org_id):
        return jsonify({"error": "Organization not found"}), 404

    document = FileService.get_file_by_document_handle(document_handle)
    if not document:
        return jsonify({"error": "File not found"}), 404

    # Public metadata and encryption details are merged into a single payload.
    payload = document.to_dict() | document.get_encrytion()
    return encrypt_response(payload, 200, security_service.get_key(session))
@file_bp.route("/upload/metadata", methods=["POST"])
def file_upload_metadata():
    """Register the metadata of a new document for the session's organization.

    The request must carry ``Content-Type: application/octet-stream``; its
    body is handed to ``SessionService.validate_session`` (with
    ``Perm.DOC_NEW`` required), which returns the session together with the
    decoded data. The data must contain ``document_name``, ``key`` and
    ``alg``.

    Returns:
        400 on a wrong content type or missing fields, the session error's
        own code on a ``SessionException``, 404 when the organization or
        user is not found, otherwise an ``encrypt_response`` (status 201)
        with the created file's ``to_dict()``.
    """
    content_type = request.headers.get("Content-Type")
    if content_type != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    token = request.headers.get("Authorization")
    try:
        session, data = SessionService.validate_session(
            token,
            data=request.data,
            required_perms=[Perm.DOC_NEW],
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    required_fields = ("document_name", "key", "alg")
    if any(field not in data for field in required_fields):
        return jsonify({"error": "Missing required fields"}), 400

    organization = OrganizationService.get_organization(session.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    owner = UserService.get_user(session.user_id)
    if not owner:
        return jsonify({"error": "User not found"}), 404

    created = upload_service.create_file(
        session.token,
        organization,
        owner,
        data["document_name"],
        data["key"],
        data["alg"],
    )
    return encrypt_response(created.to_dict(), 201, security_service.get_key(session))
@file_bp.route("/upload/content", methods=["POST"])
def file_upload_content():
    """Receive the content of a document as a multipart ``file`` upload.

    The ``Authorization`` header is validated with ``Perm.DOC_NEW`` required.
    The upload is read through ``get_hex_from_temp_file`` and its hash must
    equal the ``File-Checksum`` request header before it is handed to
    ``upload_service.write_file``.

    Returns:
        400 when the token, the upload, or the checksum header is missing or
        the checksum does not match; the session error's own code when
        validation raises ``SessionException``; whatever tuple ``write_file``
        returns when it returns one; otherwise an ``encrypt_response``
        (status 201) with the written file's ``to_dict()``.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    # The other handlers in this module treat SessionException as the failure
    # signal of validate_session; without this guard a failed validation would
    # escape as an unhandled exception instead of a JSON error.
    try:
        session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_NEW])
    except SessionException as e:
        return jsonify({"error": e.message}), e.code
    # NOTE(review): tuple guard kept from the original in case validate_session
    # can still hand back a ready-made error response -- confirm and remove.
    if isinstance(session, tuple):
        return session

    if "file" not in request.files:
        return jsonify({"error": "No file data"}), 400
    upload = request.files.get("file")
    if upload.filename == "":
        return jsonify({"error": "No file selected for uploading"}), 400
    if not upload:
        return jsonify({"error": "Invalid file data"}), 400

    # Check the header before reading the upload so a request without a
    # checksum is rejected without consuming the stream.
    file_sum = request.headers.get("File-Checksum")
    if not file_sum:
        return jsonify({"error": "No file checksum provided"}), 400

    file_data = get_hex_from_temp_file(upload.stream)
    if file_sum != str(get_hash(file_data)):
        return jsonify({"error": "File checksum mismatch"}), 400

    stored = upload_service.write_file(session_token, file_sum, file_data)
    # write_file may return a tuple; it is passed through as the response.
    if isinstance(stored, tuple):
        return stored
    return encrypt_response(stored.to_dict(), 201, security_service.get_key(session))
@file_bp.route("/list", methods=["GET"])
def file_list():
    """List the files of the session's organization, optionally filtered.

    The request must carry ``Content-Type: application/octet-stream``; its
    body is handed to ``SessionService.validate_session``, which returns the
    session together with the decoded data. Two optional filters are read
    from that data:

    * ``datetime``: a mapping with ``value`` (converted with ``int``) and
      ``relation``; both are passed to ``check_valid_time`` together with
      each file's ``created_at``.
    * ``username``: only files whose ``creator_id`` equals that user's ``id``
      are kept.

    Returns:
        400 on a wrong content type, a missing token, or a malformed
        ``datetime`` filter; the session error's own code on a
        ``SessionException``; 404 when the organization or the requested
        user is not found; otherwise an ``encrypt_response`` (status 200)
        with the list of matching files' ``to_dict()``.
    """
    if request.headers.get("Content-Type") != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, data = SessionService.validate_session(session_token, data=request.data)
    except SessionException as e:
        return jsonify({"error": e.message}), e.code

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    has_datetime = "datetime" in data
    datetime_value = None
    datetime_relation = None
    if has_datetime:
        try:
            datetime_value = int(data["datetime"]["value"])
            datetime_relation = data["datetime"]["relation"]
        except (KeyError, TypeError, ValueError):
            # KeyError: "value"/"relation" missing; TypeError: the filter is
            # not a mapping or the value is None; ValueError: not an integer.
            return jsonify({"error": "Invalid datetime value"}), 400

    has_username = "username" in data
    creator = None
    if has_username:
        creator = UserService.get_user_by_username(data["username"])
        if not creator:
            return jsonify({"error": "User not found"}), 404

    # Same evaluation order as before: creator match first, then the time check.
    listing = [
        entry.to_dict()
        for entry in FileService.list_files_in_org(org)
        if (not has_username or entry.creator_id == creator.id)
        and (
            not has_datetime
            or check_valid_time(entry.created_at, datetime_value, datetime_relation)
        )
    ]
    return encrypt_response(listing, 200, security_service.get_key(session))
@file_bp.route("/delete/<string:document_handle>", methods=["POST"])
def file_delete(document_handle: str):
    """Delete the document ``document_handle`` on behalf of the session user.

    The ``Authorization`` header is validated with ``Perm.DOC_DELETE``
    required on the document. In addition, the file's ``creator_id`` must
    equal the session's ``user_id``.

    Returns:
        400 when no token is sent, the session error's own code on a
        ``SessionException``, 404 when the organization or the document is
        not found, 403 when the session user is not the creator, otherwise
        an ``encrypt_response`` (status 200) with ``to_dict()`` of the value
        returned by ``FileService.delete_file``.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    try:
        session = SessionService.validate_session(
            token,
            required_perms=[Perm.DOC_DELETE],
            doc_handle=document_handle,
        )
    except SessionException as exc:
        return jsonify({"error": exc.message}), exc.code

    if not OrganizationService.get_organization(session.org_id):
        return jsonify({"error": "Organization not found"}), 404

    document = FileService.get_file_by_document_handle(document_handle)
    if not document:
        return jsonify({"error": "File not found"}), 404

    requester_id = session.user_id
    if document.creator_id != requester_id:
        return jsonify({"error": "Not authorized to delete file"}), 403

    deleted = FileService.delete_file(document, session.user_id)
    return encrypt_response(deleted.to_dict(), 200, security_service.get_key(session))
@file_bp.route("/acl/<string:document_handle>", methods=["POST"])
def file_acl(document_handle: str):
    """Add or remove a permission for a role on the document's ACL.

    The request must carry ``Content-Type: application/octet-stream``; its
    body is handed to ``SessionService.validate_session`` (with
    ``Perm.DOC_ACL`` required on the document), which returns the session
    together with the decoded data. The data must contain ``role``, ``perm``
    and ``operation``.

    Returns:
        400 on a wrong content type, a missing token, or missing fields; the
        session error's own code on a ``SessionException``; 404 when the
        organization, the document, or the role (looked up in ``org.roles``)
        is not found; whatever tuple
        ``RoleService.change_perm_on_role_in_file`` returns when it returns
        one; otherwise an ``encrypt_response`` (status 200) with the file's
        ``to_dict()``.
    """
    if request.headers.get("Content-Type") != "application/octet-stream":
        return jsonify({"error": "Invalid request"}), 400

    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400

    try:
        session, data = SessionService.validate_session(
            session_token,
            data=request.data,
            required_perms=[Perm.DOC_ACL],
            doc_handle=document_handle,
        )
    except SessionException as e:
        return jsonify({"error": e.message}), e.code

    if "role" not in data or "perm" not in data or "operation" not in data:
        return jsonify({"error": "Missing required fields"}), 400

    role = data["role"]
    perm = Perm.from_str(data["perm"])
    # NOTE(review): any operation other than "add" is treated as a removal;
    # confirm that clients rely on this before tightening it.
    operation = PermOperation.ADD if data["operation"] == "add" else PermOperation.REMOVE

    # The former `isinstance(session, tuple)` check was dropped here: the
    # session has already been unpacked and validation failures are reported
    # through SessionException above, so that branch could not be taken.

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    file = FileService.get_file_by_document_handle(document_handle)
    if not file:
        return jsonify({"error": "File not found"}), 404

    if role not in org.roles:
        return jsonify({"error": "Role not found"}), 404

    acl = RoleService.change_perm_on_role_in_file(file, role, perm, operation)
    # A tuple result is passed through unchanged as the response.
    if isinstance(acl, tuple):
        return acl
    return encrypt_response(file.to_dict(), 200, security_service.get_key(session))