Restructure for flask_sqlalchemy

Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
Tiago Garcia 2024-11-13 02:49:43 +00:00
parent 0b0b42dac7
commit 50cd873bdc
Signed by: TiagoRG
GPG Key ID: DFCD48E3F420DB42
16 changed files with 173 additions and 332 deletions

20
delivery1/server/app.py Normal file
View File

@ -0,0 +1,20 @@
from flask import Flask
from routes import org_bp, user_bp, file_bp
from database import db_connection
from models import Organization, User, File
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = True
app.config["SQLALCHEMY_AUTOCOMMIT"] = False
app.config["SQLALCHEMY_AUTOFLUSH"] = False
db_connection.init_app(app)
with app.app_context():
db_connection.create_all()
app.register_blueprint(org_bp, url_prefix="/org")
app.register_blueprint(user_bp, url_prefix="/user")
app.register_blueprint(file_bp, url_prefix="/file")
if __name__ == "__main__":
app.run(debug=True)

View File

@ -0,0 +1,4 @@
from flask_sqlalchemy import SQLAlchemy
db_connection = SQLAlchemy()
db = db_connection.session

View File

@ -1,11 +0,0 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
script_dir = os.path.dirname(os.path.abspath(__file__))
DATABASE_URL = f"sqlite:///{os.path.join(script_dir, 'database.db')}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

View File

@ -1,13 +0,0 @@
import os
from database import Base, engine
from models import Organization, User, File
def setup_db():
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "database.db")
if os.path.exists(db_path):
os.remove(db_path)
Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
setup_db()

View File

@ -1,19 +1,28 @@
from sqlalchemy import Integer, String, ForeignKey from database import db_connection
from sqlalchemy.orm import relationship, Mapped, mapped_column
from database import Base
from dataclasses import dataclass
@dataclass class File(db_connection.Model):
class File(Base):
__tablename__ = 'files' __tablename__ = 'files'
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) id = db_connection.Column(db_connection.Integer, primary_key=True, index=True)
file_handle: Mapped[str] = mapped_column(String, unique=True, nullable=False) file_handle = db_connection.Column(db_connection.String, unique=True, nullable=False)
document_handle: Mapped[str] = mapped_column(String, unique=True, nullable=True) document_handle = db_connection.Column(db_connection.String, unique=True, nullable=True)
name: Mapped[str] = mapped_column(String, nullable=False) name = db_connection.Column(db_connection.String, nullable=False)
created_at: Mapped[int] = mapped_column(Integer, nullable=False) created_at = db_connection.Column(db_connection.Integer, nullable=False)
org_id: Mapped[int] = mapped_column(Integer, ForeignKey('organizations.id'), nullable=False) # alg: db_connection.Column(db_connection.String, nullable=False)
creator_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), nullable=False) # key: db_connection.Column(db_connection.String, nullable=False)
org: Mapped['Organization'] = relationship('Organization', back_populates='files') org_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('organizations.id'), nullable=False)
creator: Mapped['User'] = relationship('User', back_populates='files') creator_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'), nullable=False)
org = db_connection.relationship('Organization', back_populates='files')
creator = db_connection.relationship('User', back_populates='files')
def to_dict(self):
return {
"id": self.id,
"file_handle": self.file_handle,
"document_handle": self.document_handle,
"name": self.name,
"created_at": self.created_at,
"org": {"id": self.org.id, "name": self.org.name},
"creator": {"id": self.creator.id, "username": self.creator.username}
}

View File

@ -1,14 +1,18 @@
from sqlalchemy import Integer, String, ForeignKey from database import db_connection
from sqlalchemy.orm import relationship, Mapped, mapped_column
from database import Base
from dataclasses import dataclass
@dataclass class Organization(db_connection.Model):
class Organization(Base):
__tablename__ = 'organizations' __tablename__ = 'organizations'
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) id = db_connection.Column(db_connection.Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String, unique=True, index=True, nullable=False) name = db_connection.Column(db_connection.String, unique=True, index=True, nullable=False)
owner_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), nullable=False) owner_id = db_connection.Column(db_connection.Integer, db_connection.ForeignKey('users.id'), nullable=False)
owner: Mapped['User'] = relationship('User', back_populates='orgs') owner = db_connection.relationship('User', back_populates='orgs')
files: Mapped[list['File']] = relationship('File', back_populates='org') files = db_connection.relationship('File', back_populates='org')
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"owner": self.owner.to_dict(),
"files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files]
}

View File

@ -1,16 +1,24 @@
from sqlalchemy import Integer, String from database import db_connection
from sqlalchemy.orm import relationship, Mapped, mapped_column
from database import Base
from dataclasses import dataclass
@dataclass
class User(Base): class User(db_connection.Model):
__tablename__ = 'users' __tablename__ = 'users'
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) id = db_connection.Column(db_connection.Integer, primary_key=True, index=True)
username: Mapped[str] = mapped_column(String, unique=True, index=True, nullable=False) username = db_connection.Column(db_connection.String, unique=True, index=True, nullable=False)
full_name: Mapped[str] = mapped_column(String, nullable=False) full_name = db_connection.Column(db_connection.String, nullable=False)
email: Mapped[str] = mapped_column(String, unique=True, index=True, nullable=False) email = db_connection.Column(db_connection.String, unique=True, index=True, nullable=False)
public_key: Mapped[str] = mapped_column(String, nullable=False) public_key = db_connection.Column(db_connection.String, nullable=False)
orgs: Mapped[list['Organization']] = relationship('Organization', back_populates='owner') orgs = db_connection.relationship('Organization', back_populates='owner')
files: Mapped[list['File']] = relationship('File', back_populates='creator') files = db_connection.relationship('File', back_populates='creator')
def to_dict(self):
return {
"id": self.id,
"username": self.username,
"full_name": self.full_name,
"email": self.email,
"public_key": self.public_key,
"orgs": [{"id": org.id, "name": org.name} for org in self.orgs],
"files": [{"id": file.id, "name": file.name, "file_handle": file.file_handle} for file in self.files]
}

View File

@ -1,3 +1,3 @@
flask flask
sqlalchemy flask_sqlalchemy
pytest pytest

View File

@ -0,0 +1,3 @@
from .org import org_bp
from .user import user_bp
from .file import file_bp

View File

@ -0,0 +1,4 @@
from flask import Blueprint, request, jsonify
from services import FileService
file_bp = Blueprint("file", __name__)

View File

@ -0,0 +1,16 @@
from flask import Blueprint, request, jsonify
from services import OrganizationService
org_bp = Blueprint("org", __name__)
@org_bp.route("/create", methods=["POST"])
def create():
data = request.json
org = OrganizationService.create_organization(
name=data["name"],
username=data["username"],
full_name=data["full_name"],
email=data["email"],
public_key=data["public_key"]
)
return jsonify(org.to_dict()), 201

View File

@ -0,0 +1,4 @@
from flask import Blueprint, request, jsonify
from services import UserService
user_bp = Blueprint("user", __name__)

View File

@ -2,16 +2,13 @@ import os
from datetime import datetime from datetime import datetime
from typing import List, Type from typing import List, Type
from sqlalchemy.orm import Session from database import db
from models import File, Organization, User from models import File, Organization, User
class FileService: class FileService:
def __init__(self, db: Session): @staticmethod
self.db = db def create_dummy_file(org: Organization, user: User) -> File:
self.repos = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository")
def create_dummy_file(self, org: Organization, user: User) -> File:
file = File( file = File(
file_handle = "dummy_file", file_handle = "dummy_file",
document_handle = "org/dummy_file.txt", document_handle = "org/dummy_file.txt",
@ -23,34 +20,40 @@ class FileService:
creator = user creator = user
) )
file_path = os.path.join(self.repos, file.document_handle) file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.document_handle)
with open(file_path, "w") as f: with open(file_path, "w") as f:
f.write("Dummy file content") f.write("Dummy file content")
self.db.add(file) db.add(file)
self.db.commit() db.commit()
self.db.refresh(file) db.refresh(file)
return file return file
def get_file(self, file_id: int) -> File | None: @staticmethod
return self.db.query(File).filter(File.id == file_id).first() def get_file(file_id: int) -> File | None:
return db.query(File).filter(File.id == file_id).first()
def get_file_by_document_handle(self, document_handle: str) -> File | None: @staticmethod
return self.db.query(File).filter(File.document_handle == document_handle).first() def get_file_by_document_handle(document_handle: str) -> File | None:
return db.query(File).filter(File.document_handle == document_handle).first()
def get_file_by_file_handle(self, file_handle: str) -> File | None: @staticmethod
return self.db.query(File).filter(File.file_handle == file_handle).first() def get_file_by_file_handle(file_handle: str) -> File | None:
return db.query(File).filter(File.file_handle == file_handle).first()
def list_files(self) -> list[Type[File]]: @staticmethod
return self.db.query(File).all() def list_files() -> list[Type[File]]:
return db.query(File).all()
def list_files_in_org(self, org: Organization) -> list[Type[File]]: @staticmethod
return self.db.query(File).filter(File.org_id == org.id).all() def list_files_in_org(org: Organization) -> list[Type[File]]:
return db.query(File).filter(File.org_id == org.id).all()
def delete_file(self, file: File) -> File: @staticmethod
file_path = os.path.join(self.repos, file.document_handle) def delete_file(file: File) -> File:
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.document_handle)
os.remove(file_path) os.remove(file_path)
file.document_handle = None file.document_handle = None
self.db.commit() db.commit()
self.db.refresh(file) db.refresh(file)
return file return file

View File

@ -1,18 +1,16 @@
import os.path import os.path
from sqlalchemy.orm import Session from database import db
from models import Organization from models import Organization
class OrganizationService: class OrganizationService:
def __init__(self, db: Session): @staticmethod
self.db = db def create_organization(name: str, username: str, full_name: str, email: str, public_key: str) -> Organization:
def create_organization(self, name: str, username: str, full_name: str, email: str, public_key: str) -> Organization:
from services import UserService from services import UserService
user = UserService(self.db).get_user_by_username(username) user = UserService().get_user_by_username(username)
if not user: if not user:
user = UserService(self.db).create_user(username, full_name, email, public_key) user = UserService().create_user(username, full_name, email, public_key)
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
repos = os.path.join(project_root, "repository") repos = os.path.join(project_root, "repository")
@ -25,19 +23,22 @@ class OrganizationService:
owner=user owner=user
) )
self.db.add(organization) db.add(organization)
self.db.commit() db.commit()
self.db.refresh(organization) db.refresh(organization)
UserService(self.db).add_org_to_user(user, organization) UserService().add_org_to_user(user, organization)
return organization return organization
def get_organization(self, org_id: int) -> Organization | None: @staticmethod
return self.db.query(Organization).filter(Organization.id == org_id).first() def get_organization(org_id: int) -> Organization | None:
return db.query(Organization).filter(Organization.id == org_id).first()
def get_organization_by_name(self, name: str) -> Organization | None: @staticmethod
return self.db.query(Organization).filter(Organization.name == name).first() def get_organization_by_name(name: str) -> Organization | None:
return db.query(Organization).filter(Organization.name == name).first()
def list_organizations(self): @staticmethod
return self.db.query(Organization).all() def list_organizations():
return db.query(Organization).all()

View File

@ -1,12 +1,10 @@
from sqlalchemy.orm import Session from database import db
from models import User, Organization from models import User, Organization
class UserService: class UserService:
def __init__(self, db: Session): @staticmethod
self.db = db def create_user(username: str, full_name: str, email: str, public_key: str, org: Organization = None) -> User:
def create_user(self, username: str, full_name: str, email: str, public_key: str, org: Organization = None) -> User:
user = User( user = User(
username=username, username=username,
full_name=full_name, full_name=full_name,
@ -14,19 +12,22 @@ class UserService:
public_key=public_key, public_key=public_key,
orgs=[org] if org else [] orgs=[org] if org else []
) )
self.db.add(user) db.add(user)
self.db.commit() db.commit()
self.db.refresh(user) db.refresh(user)
return user return user
def get_user(self, user_id: int) -> User | None: @staticmethod
return self.db.query(User).filter(User.id == user_id).first() def get_user(user_id: int) -> User | None:
return db.query(User).filter(User.id == user_id).first()
def get_user_by_username(self, username: str) -> User | None: @staticmethod
return self.db.query(User).filter(User.username == username).first() def get_user_by_username(username: str) -> User | None:
return db.query(User).filter(User.username == username).first()
def add_org_to_user(self, user: User, org: Organization) -> User: @staticmethod
def add_org_to_user(user: User, org: Organization) -> User:
user.orgs.append(org) user.orgs.append(org)
self.db.commit() db.commit()
self.db.refresh(user) db.refresh(user)
return user return user

View File

@ -1,212 +0,0 @@
import os
from pprint import PrettyPrinter
import database.setup_db
from database import SessionLocal
from services import OrganizationService, UserService, FileService
database.setup_db.setup_db()
db = SessionLocal()
org_service = OrganizationService(db)
user_service = UserService(db)
file_service = FileService(db)
def test_create_organization():
org = org_service.create_organization(
name="Test Org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
assert org.name == "Test Org"
assert org.owner.username == "testuser"
assert org.owner.full_name == "Test User"
assert org.owner.email == "test@mail.com"
assert org.owner.public_key == "abc123"
assert org.owner.orgs[0].name == "Test Org"
db.delete(org.owner)
db.delete(org)
db.commit()
def test_create_user():
user = user_service.create_user(
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
assert user.username == "testuser"
assert user.full_name == "Test User"
assert user.email == "test@mail.com"
assert user.public_key == "abc123"
db.delete(user)
db.commit()
def test_get_organization():
org = org_service.create_organization(
name="Test Org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
org2 = org_service.get_organization(org.id)
org3 = org_service.get_organization_by_name("Test Org")
assert org2.name == "Test Org"
assert org3.name == "Test Org"
assert org2.owner.username == "testuser"
assert org3.owner.username == "testuser"
db.delete(org.owner)
db.delete(org)
db.commit()
def test_list_organizations():
org = org_service.create_organization(
name="Test Org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
org2 = org_service.create_organization(
name="Test Org2",
username="testuser2",
full_name="Test User2",
email="mail2@test.com",
public_key="def456"
)
orgs = org_service.list_organizations()
assert len(orgs) == 2
assert orgs[0].name == "Test Org"
assert orgs[1].name == "Test Org2"
assert orgs == [org, org2]
db.delete(org.owner)
db.delete(org)
db.delete(org2.owner)
db.delete(org2)
db.commit()
def test_get_user():
user = user_service.create_user(
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
user2 = user_service.get_user(user.id)
user3 = user_service.get_user_by_username("testuser")
assert user2.username == "testuser"
assert user3.username == "testuser"
assert user2.full_name == "Test User"
assert user3.full_name == "Test User"
assert user2.email == "test@mail.com"
assert user3.email == "test@mail.com"
assert user2.public_key == "abc123"
assert user3.public_key == "abc123"
org = org_service.create_organization(
name="Test Org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
user4 = user_service.get_user(user.id)
assert user4.orgs[0].name == "Test Org"
db.delete(org)
db.delete(user)
db.commit()
def test_create_dummy_file():
org = org_service.create_organization(
name="org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
file = file_service.create_dummy_file(org, org.owner)
assert file.file_handle == "dummy_file"
assert file.document_handle == "org/dummy_file.txt"
assert file.name == "dummy_file"
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
repos = os.path.join(project_root, "repository")
with open(os.path.join(repos, file.document_handle), "r") as f:
assert f.read() == "Dummy file content"
os.remove(os.path.join(repos, file.document_handle))
db.delete(file)
db.delete(org.owner)
db.delete(org)
db.commit()
def test_get_file():
org = org_service.create_organization(
name="org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
file = file_service.create_dummy_file(org, org.owner)
file2 = file_service.get_file_by_file_handle("dummy_file")
assert file2.file_handle == "dummy_file"
assert file2.document_handle == "org/dummy_file.txt"
assert file2.name == "dummy_file"
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
repos = os.path.join(project_root, "repository")
os.remove(os.path.join(repos, file.document_handle))
db.delete(file)
db.delete(org.owner)
db.delete(org)
db.commit()
def test_delete_file():
org = org_service.create_organization(
name="org",
username="testuser",
full_name="Test User",
email="test@mail.com",
public_key="abc123"
)
file = file_service.create_dummy_file(org, org.owner)
file2 = file_service.delete_file(file)
assert file2.file_handle == "dummy_file"
assert file2.document_handle is None
assert os.path.exists(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository", "org", "dummy_file.txt")) == False
db.delete(file)
db.delete(org.owner)
db.delete(org)
db.commit()