import os import io from datetime import datetime from typing import List, Type from flask import jsonify from database import db from models import File, Organization, User from utils import get_hash class FileService: def __init__(self): self.current_requests = {} def create_file(self, session_token: str, org: Organization, user: User, file_name: str, key: str, alg: str, nonce: str) -> File: file = File( file_handle = None, document_handle = get_hash(file_name), name = file_name, created_at = int(datetime.now().timestamp()), key = key, alg = alg, nonce = nonce, org_id = org.id, creator_id = user.id, org = org, creator = user ) self.current_requests[session_token] = file db.add(file) db.commit() db.refresh(file) return file def write_file(self, session_token: str, file_data: bytes) -> File | tuple: if session_token not in self.current_requests: return jsonify({"error": "No file upload request found"}), 400 file = self.current_requests[session_token] file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle) with open(file_path, "wb") as f: f.write(file_data) file.file_handle = get_hash(file_data) db.commit() db.refresh(file) del self.current_requests[session_token] return file @staticmethod def create_dummy_file(org: Organization, user: User) -> File: file = File( file_handle = "dummy_file", document_handle = "org/dummy_file.txt", name = "dummy_file", created_at = int(datetime.now().timestamp()), org_id = 1, creator_id = 1, org = org, creator = user ) file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.document_handle) with open(file_path, "w") as f: f.write("Dummy file content") db.add(file) db.commit() db.refresh(file) return file @staticmethod def get_file(file_id: int) -> File | None: return db.query(File).filter(File.id == file_id).first() @staticmethod def get_file_by_document_handle(document_handle: str) -> File | None: return db.query(File).filter(File.document_handle == document_handle).first() @staticmethod def get_file_by_file_handle(file_handle: str) -> File | None: return db.query(File).filter(File.file_handle == file_handle).first() @staticmethod def get_file_content(file: File) -> io.BytesIO: file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle) with open(file_path, "rb") as f: return io.BytesIO(f.read()) @staticmethod def list_files() -> list[Type[File]]: return db.query(File).all() @staticmethod def list_files_in_org(org: Organization) -> list[Type[File]]: return db.query(File).filter(File.org_id == org.id).all() @staticmethod def delete_file(file: File) -> File: file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle) os.remove(file_path) file.file_handle = None db.commit() db.refresh(file) return file