111 lines
3.7 KiB
Python
111 lines
3.7 KiB
Python
import os
|
|
import io
|
|
from datetime import datetime
|
|
from typing import List, Type
|
|
from flask import jsonify
|
|
|
|
from database import db
|
|
from models import File, Organization, User
|
|
from utils import get_hash
|
|
|
|
|
|
class FileService:
    """Manage ``File`` records and the content stored for them on disk.

    Content lives below a ``repository`` directory located in the parent of
    the directory that contains this module. Uploads are a two-step process:
    ``create_file`` registers the record and remembers it under the caller's
    session token, ``write_file`` later stores the bytes for that token.
    """

    def __init__(self):
        # Maps a session token to the id of the File record awaiting upload.
        self.current_requests = {}

    @staticmethod
    def _repository_path(*parts: str) -> str:
        """Return *parts* joined below the ``repository`` directory."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "repository", *parts)

    @staticmethod
    def _content_path(file: File) -> str:
        """Return the on-disk path of *file*: ``repository/<org name>/<document handle>``."""
        return FileService._repository_path(file.org.name, file.document_handle)

    def create_file(self, session_token: str, org: Organization, user: User, file_name: str, key: str, alg: str, nonce: str) -> File:
        """Persist a new ``File`` record and register it as the pending upload.

        The record is created without a file handle; its document handle is
        ``get_hash(file_name)``. Any upload previously pending for
        ``session_token`` is replaced by this one.

        Returns:
            The committed and refreshed ``File`` record.
        """
        file = File(
            file_handle=None,
            document_handle=get_hash(file_name),
            name=file_name,
            created_at=int(datetime.now().timestamp()),
            key=key,
            alg=alg,
            nonce=nonce,
            org_id=org.id,
            creator_id=user.id,
            org=org,
            creator=user,
        )

        db.add(file)
        db.commit()
        db.refresh(file)

        self.current_requests[session_token] = file.id
        return file

    def write_file(self, session_token: str, file_handle: str, file_data: bytes) -> File | tuple:
        """Store the content for the upload pending under ``session_token``.

        Writes ``file_data`` to the record's content path, sets the record's
        ``file_handle`` and clears the pending request.

        Returns:
            The updated ``File`` on success, otherwise a
            ``(json_response, status_code)`` tuple describing the error.
        """
        file_id = self.current_requests.get(session_token)
        if file_id is None:
            return jsonify({"error": "No file upload request found"}), 400

        file = db.query(File).filter(File.id == file_id).first()
        if file is None:
            # The record vanished after create_file; the pending entry can
            # never be completed, so drop it instead of failing on `file.org`.
            self.current_requests.pop(session_token, None)
            return jsonify({"error": "File record not found"}), 404

        file_path = self._content_path(file)
        # The organization's directory may not exist yet on first upload.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(file_data)

        file.file_handle = file_handle
        db.commit()
        db.refresh(file)

        # Only forget the request once the content and record are both saved.
        del self.current_requests[session_token]
        return file

    @staticmethod
    def create_dummy_file(org: Organization, user: User) -> File:
        """Create a placeholder file on disk and persist its ``File`` record.

        The content is written to ``repository/org/dummy_file.txt``.

        Returns:
            The committed and refreshed ``File`` record.
        """
        file = File(
            file_handle="dummy_file",
            document_handle="org/dummy_file.txt",
            name="dummy_file",
            created_at=int(datetime.now().timestamp()),
            # Use the ids of the objects actually passed in (as create_file
            # does) rather than assuming both have id 1.
            org_id=org.id,
            creator_id=user.id,
            org=org,
            creator=user,
        )

        file_path = FileService._repository_path(file.document_handle)
        # The document handle contains a sub-directory that may not exist yet.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write("Dummy file content")

        db.add(file)
        db.commit()
        db.refresh(file)
        return file

    @staticmethod
    def get_file(file_id: int) -> File | None:
        """Return the ``File`` with primary key ``file_id``, or ``None``."""
        return db.query(File).filter(File.id == file_id).first()

    @staticmethod
    def get_file_by_document_handle(document_handle: str) -> File | None:
        """Return the first ``File`` with the given document handle, or ``None``."""
        return db.query(File).filter(File.document_handle == document_handle).first()

    @staticmethod
    def get_file_by_file_handle(file_handle: str) -> File | None:
        """Return the first ``File`` with the given file handle, or ``None``."""
        return db.query(File).filter(File.file_handle == file_handle).first()

    @staticmethod
    def get_file_content(file: File) -> io.BytesIO:
        """Read the stored content of *file* into an in-memory buffer.

        Raises:
            OSError: If the content file cannot be opened (e.g. it is missing).
        """
        with open(FileService._content_path(file), "rb") as f:
            return io.BytesIO(f.read())

    @staticmethod
    def list_files() -> list[File]:
        """Return every ``File`` record."""
        return db.query(File).all()

    @staticmethod
    def list_files_in_org(org: Organization) -> list[File]:
        """Return every ``File`` record whose ``org_id`` equals ``org.id``."""
        return db.query(File).filter(File.org_id == org.id).all()

    @staticmethod
    def delete_file(file: File) -> File:
        """Remove the stored content of *file* and clear its file handle.

        The database record itself is kept; only ``file_handle`` is reset to
        ``None``.

        Raises:
            OSError: If the content file cannot be removed (e.g. it is
                missing); the record is left unchanged in that case.
        """
        os.remove(FileService._content_path(file))
        file.file_handle = None
        db.commit()
        db.refresh(file)
        return file