File management

Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
Tiago Garcia 2024-11-20 00:32:16 +00:00
parent b6c57ed294
commit 11adcb21ba
Signed by: TiagoRG
GPG Key ID: DFCD48E3F420DB42
14 changed files with 432 additions and 106 deletions

View File

@ -1,3 +1,4 @@
import os
import sqlalchemy.exc import sqlalchemy.exc
from flask import Flask, request, jsonify from flask import Flask, request, jsonify
from routes import org_bp, user_bp, file_bp from routes import org_bp, user_bp, file_bp
@ -37,6 +38,12 @@ def reset():
with app.app_context(): with app.app_context():
db_connection.drop_all() db_connection.drop_all()
db_connection.create_all() db_connection.create_all()
repos = os.path.join(os.path.dirname(os.path.abspath(__file__)), "repository")
for repo in os.listdir(repos):
if os.path.isdir(os.path.join(repos, repo)):
for file in os.listdir(os.path.join(repos, repo)):
os.remove(os.path.join(repos, repo, file))
os.rmdir(os.path.join(repos, repo))
except sqlalchemy.exc.OperationalError: except sqlalchemy.exc.OperationalError:
return jsonify({"error": "Database error"}), 500 return jsonify({"error": "Database error"}), 500
return jsonify({"message": "Database reset"}), 200 return jsonify({"message": "Database reset"}), 200

View File

@ -5,16 +5,16 @@ class File(db_connection.Model):
__tablename__ = 'files' __tablename__ = 'files'
id = db_connection.Column(db_connection.Integer, primary_key=True, index=True) id = db_connection.Column(db_connection.Integer, primary_key=True, index=True)
file_handle = db_connection.Column(db_connection.String, unique=True, nullable=False) file_handle = db_connection.Column(db_connection.String, unique=True, nullable=True)
document_handle = db_connection.Column(db_connection.String, unique=True, nullable=True) document_handle = db_connection.Column(db_connection.String, unique=True, nullable=False)
name = db_connection.Column(db_connection.String, nullable=False) name = db_connection.Column(db_connection.String, nullable=False)
created_at = db_connection.Column(db_connection.Integer, nullable=False) created_at = db_connection.Column(db_connection.Integer, nullable=False)
# alg: db_connection.Column(db_connection.String, nullable=False) key = db_connection.Column(db_connection.String, nullable=False)
# key: db_connection.Column(db_connection.String, nullable=False) alg = db_connection.Column(db_connection.String, nullable=False)
org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'), nullable=False) org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'), nullable=False)
creator_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'), nullable=False) creator_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'), nullable=False)
org = db_connection.relationship('Organization', back_populates='files') org = db_connection.relationship('Organization', backref=db_connection.backref('org_files', uselist=False))
creator = db_connection.relationship('User', back_populates='files') creator = db_connection.relationship('User', backref=db_connection.backref('created_files', uselist=False))
def to_dict(self): def to_dict(self):
return { return {
@ -23,6 +23,8 @@ class File(db_connection.Model):
"document_handle": self.document_handle, "document_handle": self.document_handle,
"name": self.name, "name": self.name,
"created_at": self.created_at, "created_at": self.created_at,
"key": self.key,
"alg": self.alg,
"org": {"id": self.org.id, "name": self.org.name}, "org": {"id": self.org.id, "name": self.org.name},
"creator": {"id": self.creator.id, "username": self.creator.username} "creator": {"id": self.creator.id, "username": self.creator.username}
} }

View File

@ -6,20 +6,19 @@ class Organization(db_connection.Model):
id = db_connection.Column(db_connection.Integer, primary_key=True, index=True) id = db_connection.Column(db_connection.Integer, primary_key=True, index=True)
name = db_connection.Column(db_connection.String, unique=True, index=True, nullable=False) name = db_connection.Column(db_connection.String, unique=True, index=True, nullable=False)
users = db_connection.Column(db_connection.JSON, nullable=False, default=dict) users = db_connection.Column(db_connection.JSON, nullable=False, default=dict)
owner_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id')) users_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'))
owner = db_connection.relationship('User', backref=db_connection.backref('owned_organization', uselist=False)) manager = db_connection.relationship('User', backref=db_connection.backref('owned_organization', uselist=False))
files = db_connection.relationship('File', back_populates='org')
def to_dict(self): def to_dict(self):
return { return {
"id": self.id, "id": self.id,
"name": self.name, "name": self.name,
"owner": self.owner.to_dict(), "manager": self.manager.to_dict(),
"users": [{"id": user_id, "user_data": { "users": [{"id": user_id, "user_data": {
"username": user_data["username"], "username": user_data["username"],
"full_name": user_data["full_name"], "full_name": user_data["full_name"],
"email": user_data["email"], "email": user_data["email"],
"status": user_data["status"] "status": user_data["status"]
}} for user_id, user_data in self.users.items()], }} for user_id, user_data in self.users.items()],
"files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files] # "files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files]
} }

View File

@ -12,8 +12,6 @@ class User(db_connection.Model):
public_keys = db_connection.Column(db_connection.JSON, nullable=False, default=dict) public_keys = db_connection.Column(db_connection.JSON, nullable=False, default=dict)
orgs = db_connection.Column(db_connection.JSON, nullable=False, default=dict) orgs = db_connection.Column(db_connection.JSON, nullable=False, default=dict)
files = db_connection.relationship('File', back_populates='creator')
def to_dict(self): def to_dict(self):
return { return {
"id": self.id, "id": self.id,
@ -21,5 +19,5 @@ class User(db_connection.Model):
"full_name": self.full_name, "full_name": self.full_name,
"email": self.email, "email": self.email,
"orgs": [{"id": org_id, "name": org_data["name"], "status": org_data["status"]} for org_id, org_data in self.orgs.items()], "orgs": [{"id": org_id, "name": org_data["name"], "status": org_data["status"]} for org_id, org_data in self.orgs.items()],
"files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files] # "files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files]
} }

View File

@ -1,14 +1,175 @@
from flask import Blueprint, request, jsonify from flask import Blueprint, request, jsonify, send_file, Response
from services import FileService
import utils
from services import FileService, OrganizationService, UserService, SessionService
file_bp = Blueprint("file", __name__) file_bp = Blueprint("file", __name__)
upload_service = FileService()
@file_bp.route("/get", methods=["GET"])
def file_get(): @file_bp.route("/get/<string:file_handle>/content", methods=["GET"])
data = request.json def file_get_content(file_handle: str):
file_handle = data["file_handle"]
file = FileService.get_file_by_file_handle(file_handle) file = FileService.get_file_by_file_handle(file_handle)
if not file: if not file:
return jsonify({"error": "File not found"}), 404 return jsonify({"error": "File not found"}), 404
return jsonify(file.to_dict()), 200
file_content = FileService.get_file_content(file)
return send_file(file_content, as_attachment=True, download_name=file.name)
@file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"])
def file_get_metadata(document_handle: str):
session_token = request.headers.get("Authorization")
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
file = FileService.get_file_by_document_handle(document_handle)
if not file:
return jsonify({"error": "File not found"}), 404
return jsonify(file.to_dict())
@file_bp.route("/upload/metadata", methods=["POST"])
def file_upload_metadata():
session_token = request.headers.get("Authorization")
print(session_token)
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
data = request.json
if "document_name" not in data or "key" not in data or "alg" not in data:
return jsonify({"error": "Missing required fields"}), 400
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user(session.user_id)
if not user:
return jsonify({"error": "User not found"}), 404
file = upload_service.create_file(session.token, org, user, data["document_name"], data["key"], data["alg"])
return jsonify(file.to_dict()), 201
@file_bp.route("/upload/content", methods=["POST"])
def file_upload_content():
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
if "file" not in request.files:
return jsonify({"error": "No file data"}), 400
file = request.files.get("file")
if file.filename == "":
return jsonify({"error": "No file selected for uploading"}), 400
if not file:
return jsonify({"error": "Invalid file data"}), 400
file = upload_service.write_file(session_token, file.stream)
if isinstance(file, tuple):
return file
return jsonify(file.to_dict()), 201
@file_bp.route("/list", methods=["GET"])
def file_list():
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
data = request.json
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
if "datetime" in data:
try:
datetime_value = int(data["datetime"]["value"])
datetime_relation = data["datetime"]["relation"]
except ValueError:
return jsonify({"error": "Invalid datetime value"}), 400
if "username" in data:
user = UserService.get_user_by_username(data["username"])
if not user:
return jsonify({"error": "User not found"}), 404
files = FileService.list_files_in_org(org)
return jsonify([file.to_dict() for file in files if file.creator_id == user.id and (
utils.check_valid_time(file.created_at, datetime_value, datetime_relation)
if "datetime" in data else True
)])
files = FileService.list_files_in_org(org)
return jsonify([file.to_dict() for file in files if (utils.check_valid_time(file.created_at, datetime_value, datetime_relation) if "datetime" in data else True)])
@file_bp.route("/delete/<string:document_handle>", methods=["POST"])
def file_delete(document_handle: str):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
file = FileService.get_file_by_document_handle(document_handle)
if not file:
return jsonify({"error": "File not found"}), 404
if file.creator_id != session.user_id:
return jsonify({"error": "Not authorized to delete file"}), 403
file = FileService.delete_file(file)
return jsonify(file.to_dict())
################################################
@file_bp.route("/create_dummy", methods=["POST"])
def file_create_dummy():
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token)
if isinstance(session, tuple):
return session
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user(session.user_id)
if not user:
return jsonify({"error": "User not found"}), 404
file = FileService.create_dummy_file(org, user)
return jsonify(file.to_dict()), 201

View File

@ -1,13 +1,17 @@
import json import json
import utils
from flask import Blueprint, request, jsonify from flask import Blueprint, request, jsonify
from services import UserService, SessionService, OrganizationService from services import UserService, SessionService, OrganizationService
from utils import data_checks
user_bp = Blueprint("user", __name__) user_bp = Blueprint("user", __name__)
@user_bp.route("/login", methods=["POST"]) @user_bp.route("/login", methods=["POST"])
def user_login(): def user_login():
data = request.json data = request.json
if "username" not in data or "org" not in data:
return jsonify({"error": "Missing required fields"}), 400
user = UserService.get_user_by_username(data["username"]) user = UserService.get_user_by_username(data["username"])
if not user: if not user:
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
@ -16,25 +20,17 @@ def user_login():
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
id_str = str(org.id)
if id_str not in user.public_keys:
return jsonify({"error": "User not associated with organization"}), 403
if user.public_keys[id_str] != data["public_key"]:
return jsonify({"error": "Invalid public key"}), 403
session = SessionService.create_session(user, org) session = SessionService.create_session(user, org)
return jsonify(session.to_dict()), 201 return jsonify(session.to_dict()), 201
@user_bp.route("/logout", methods=["POST"]) @user_bp.route("/logout", methods=["POST"])
def user_logout(): def user_logout():
data = request.json session_token = request.headers.get("Authorization")
session_file = data["session_file"] if not session_token:
session_data = json.loads(session_file) return jsonify({"error": "No session token"}), 400
session_token = session_data["token"]
session = SessionService.get_session(session_token)
session = SessionService.get_session(session_token)
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -44,17 +40,16 @@ def user_logout():
@user_bp.route("/list", methods=["GET"]) @user_bp.route("/list", methods=["GET"])
def user_list(): def user_list():
data = request.json session_token = request.headers.get("Authorization")
if "session_file" not in data: if not session_token:
return jsonify({"error": "No session file"}), 400 return jsonify({"error": "No session token"}), 400
session_file = data["session_file"] session = SessionService.validate_session(session_token)
session_data = json.loads(session_file)
session = data_checks.validate_session_file(session_data)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
data = request.json
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
@ -71,22 +66,23 @@ def user_list():
@user_bp.route("/create", methods=["POST"]) @user_bp.route("/create", methods=["POST"])
def user_create(): def user_create():
data = request.json session_token = request.headers.get("Authorization")
if "session_file" not in data or "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data: if not session_token:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "No session token"}), 400
session_file = data["session_file"] session = SessionService.validate_session(session_token)
session_data = json.loads(session_file)
session = data_checks.validate_session_file(session_data)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
data = request.json
if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
return jsonify({"error": "Missing required fields"}), 400
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
if org.owner.id != session.user_id: if org.manager.id != session.user_id:
return jsonify({"error": "Not authorized to create users"}), 403 return jsonify({"error": "Not authorized to create users"}), 403
user = UserService.get_user_by_username(data["username"]) user = UserService.get_user_by_username(data["username"])
@ -102,52 +98,46 @@ def user_create():
return jsonify(user.to_dict()), 201 return jsonify(user.to_dict()), 201
@user_bp.route("/suspend", methods=["POST"]) @user_bp.route("/<string:username>/suspend", methods=["POST"])
def user_suspend(): def user_suspend(username):
data = request.json session_token = request.headers.get("Authorization")
if "session_file" not in data or "username" not in data: if not session_token:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "No session token"}), 400
session_file = data["session_file"] session = SessionService.validate_session(session_token)
session_data = json.loads(session_file)
session = data_checks.validate_session_file(session_data)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
if org.owner.id != session.user_id: if org.manager.id != session.user_id:
return jsonify({"error": "Not authorized to suspend users"}), 403 return jsonify({"error": "Not authorized to suspend users"}), 403
user = UserService.get_user_by_username(data["username"]) user = UserService.get_user_by_username(username)
if not user: if not user:
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
return OrganizationService.suspend_user(org, user) return OrganizationService.suspend_user(org, user)
@user_bp.route("/activate", methods=["POST"]) @user_bp.route("/<string:username>/activate", methods=["POST"])
def user_unsuspend(): def user_unsuspend(username):
data = request.json session_token = request.headers.get("Authorization")
if "session_file" not in data or "username" not in data: if not session_token:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "No session token"}), 400
session_file = data["session_file"] session = SessionService.validate_session(session_token)
session_data = json.loads(session_file)
session = data_checks.validate_session_file(session_data)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
if org.owner.id != session.user_id: if org.manager.id != session.user_id:
return jsonify({"error": "Not authorized to unsuspend users"}), 403 return jsonify({"error": "Not authorized to unsuspend users"}), 403
user = UserService.get_user_by_username(data["username"]) user = UserService.get_user_by_username(username)
if not user: if not user:
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404

View File

@ -1,12 +1,57 @@
import os import os
import io
from datetime import datetime from datetime import datetime
from typing import List, Type from typing import List, Type
from flask import jsonify
from database import db from database import db
from models import File, Organization, User from models import File, Organization, User
from utils import get_hash
class FileService: class FileService:
def __init__(self):
self.current_requests = {}
def create_file(self, session_token: str, org: Organization, user: User, file_name: str, key: str, alg: str) -> File:
file = File(
file_handle = None,
document_handle = get_hash(file_name),
name = file_name,
created_at = int(datetime.now().timestamp()),
key = key,
alg = alg,
org_id = org.id,
creator_id = user.id,
org = org,
creator = user
)
self.current_requests[session_token] = file
db.add(file)
db.commit()
db.refresh(file)
return file
def write_file(self, session_token: str, file_data: bytes) -> File | tuple:
if session_token not in self.current_requests:
return jsonify({"error": "No file upload request found"}), 400
file = self.current_requests[session_token]
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle)
with open(file_path, "wb") as f:
f.write(file_data)
file.file_handle = get_hash(file_data)
db.commit()
db.refresh(file)
del self.current_requests[session_token]
return file
@staticmethod @staticmethod
def create_dummy_file(org: Organization, user: User) -> File: def create_dummy_file(org: Organization, user: User) -> File:
file = File( file = File(
@ -41,6 +86,12 @@ class FileService:
def get_file_by_file_handle(file_handle: str) -> File | None: def get_file_by_file_handle(file_handle: str) -> File | None:
return db.query(File).filter(File.file_handle == file_handle).first() return db.query(File).filter(File.file_handle == file_handle).first()
@staticmethod
def get_file_content(file: File) -> io.BytesIO:
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle)
with open(file_path, "rb") as f:
return io.BytesIO(f.read())
@staticmethod @staticmethod
def list_files() -> list[Type[File]]: def list_files() -> list[Type[File]]:
return db.query(File).all() return db.query(File).all()
@ -51,9 +102,9 @@ class FileService:
@staticmethod @staticmethod
def delete_file(file: File) -> File: def delete_file(file: File) -> File:
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.document_handle) file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.org.name, file.document_handle)
os.remove(file_path) os.remove(file_path)
file.document_handle = None file.file_handle = None
db.commit() db.commit()
db.refresh(file) db.refresh(file)
return file return file

View File

@ -20,7 +20,7 @@ class OrganizationService:
organization = Organization( organization = Organization(
name=name, name=name,
owner=user, manager=user,
users={user.id: { users={user.id: {
"username": user.username, "username": user.username,
"full_name": user.full_name, "full_name": user.full_name,
@ -76,8 +76,8 @@ class OrganizationService:
if OrganizationService.get_user_status(org, user.id) != "active": if OrganizationService.get_user_status(org, user.id) != "active":
return {"error": "User already suspended"}, 400 return {"error": "User already suspended"}, 400
if org.owner.id == user.id: if org.manager.id == user.id:
return {"error": "Cannot suspend owner"}, 400 return {"error": "Cannot suspend manager"}, 400
org.users[str(user.id)]["status"] = "suspended" org.users[str(user.id)]["status"] = "suspended"
flag_modified(org, "users") flag_modified(org, "users")

View File

@ -1,6 +1,7 @@
import secrets import secrets
from database import db from database import db
from models import Session, User, Organization from models import Session, User, Organization
from flask import jsonify
class SessionService: class SessionService:
@ -23,4 +24,25 @@ class SessionService:
@staticmethod @staticmethod
def delete_session(session: Session) -> None: def delete_session(session: Session) -> None:
db.delete(session) db.delete(session)
db.commit() db.commit()
@staticmethod
def validate_session(token: str) -> tuple | Session:
from services import OrganizationService
if "Bearer" in token:
token = token.split(" ")[1]
session = SessionService.get_session(token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
status = OrganizationService.get_user_status(org, session.user_id)
if status != "active":
return jsonify({"error": "User is not active"}), 403
return session

View File

@ -0,0 +1,106 @@
### Reset database
POST http://localhost:5000/reset
Content-Type: application/json
{
"password": "123"
}
### Create a new organization
POST http://localhost:5000/org/create
Content-Type: application/json
{
"name": "org",
"username": "username",
"full_name": "Full Name",
"email": "user@mail.com",
"public_key": "null"
}
### Login
POST http://localhost:5000/user/login
Content-Type: application/json
{
"username": "username",
"org": "org"
}
> {% client.global.set("token", response.body["token"]) %}
### Upload dummy file metadata
POST http://localhost:5000/file/upload/metadata
Content-Type: application/json
Authorization: {{token}}
{
"document_name": "dummy_file.txt",
"key": "arfarf",
"alg": "ftgtrg"
}
> {% client.global.set("document_handle", response.body["document_handle"]) %}
#### Upload dummy file content, through send file
#POST http://localhost:5000/file/upload/content
#Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW
#Authorization: {{token}}
#
#----WebKitFormBoundary7MA4YWxkTrZu0gW
#Content-Disposition: form-data; name="file"; filename="dummy_file.txt"
#Content-Type: text/plain
#
#This is a dummy file
#----WebKitFormBoundary7MA4YWxkTrZu0gW
### List files (with no data)
GET http://localhost:5000/file/list
Content-Type: application/json
Authorization: {{token}}
{}
### List files by user
GET http://localhost:5000/file/list
Content-Type: application/json
Authorization: {{token}}
{
"username": "username"
}
### List files by datetime
GET http://localhost:5000/file/list
Content-Type: application/json
Authorization: {{token}}
{
"datetime": {
"relation": "ot",
"value": "1731863876"
}
}
### Get file metadata
GET http://localhost:5000/file/get/{{document_handle}}/metadata
Content-Type: application/json
Authorization: {{token}}
> {% client.global.set("file_handle", response.body["file_handle"]) %}
### Get file content
GET http://localhost:5000/file/get/{{file_handle}}/content
Content-Type: application/json
Authorization: {{token}}
### Delete dummy file
POST http://localhost:5000/file/delete/{{document_handle}}
Content-Type: application/json
Authorization: {{token}}
### Logout
POST http://localhost:5000/user/logout
Content-Type: application/json
Authorization: {{token}}

View File

@ -1 +1,2 @@
from .data_checks import validate_session_file from .checks import check_valid_time
from .hashing import get_hash

View File

@ -0,0 +1,9 @@
def check_valid_time(time: int, relation_time: int, relation: str) -> bool:
if relation == 'ot':
return time < relation_time
elif relation == 'nt':
return time > relation_time
elif relation == 'eq':
return time == relation_time
else:
raise ValueError('Invalid relation: {}'.format(relation))

View File

@ -1,28 +0,0 @@
import json
from flask import jsonify
from services import SessionService, OrganizationService
from models import Session
def validate_session_file(data) -> tuple | Session:
"""
Check if the session file is valid, and return the session object if it is
:param data: session file data (json)
:return: Session object or error response
"""
if "token" not in data:
return jsonify({"error": "No session token"}), 400
session_token = data["token"]
session = SessionService.get_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
status = OrganizationService.get_user_status(org, session.user_id)
if status != "active":
return jsonify({"error": "User is not active"}), 403
return session

View File

@ -0,0 +1,8 @@
import cryptography.hazmat.primitives.hashes
def get_hash(data):
if isinstance(data, str):
data = data.encode('utf-8')
digest = cryptography.hazmat.primitives.hashes.Hash(cryptography.hazmat.primitives.hashes.SHA256())
digest.update(data)
return digest.finalize().hex()