
import base64
import json
from cryptography.exceptions import InvalidSignature
from flask import Blueprint, request, jsonify
from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm
# Blueprint collecting the /login, /logout, /list, /create and per-user
# routes below; the URL prefix it is mounted under is chosen by whoever
# registers it on the app, not here.
user_bp = Blueprint("user", __name__)
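@user_bp.route("/login", methods=["POST"])
def user_login():
    """Two-step login: open an unverified session, then prove it with a signature.

    Step 1 -- body carries ``username`` and ``org``: the user and organization
    are looked up and ``SessionService.create_session`` opens a session, whose
    dict is returned with 201. Nothing has been proved yet; the session is not
    ``verified`` at this point.

    Step 2 -- ``Authorization`` header carries the session token and the body a
    base64 ``signature``: the decoded signature goes to
    ``SessionService.verify_session``; ``InvalidSignature`` maps to 400.

    Returns:
        A ``(response, status)`` tuple: 201/200 on success, 404 for an unknown
        user or organization, 401 for an unknown token, 400 when fields are
        missing, the body is not JSON, the signature is not base64, or the
        signature does not verify.
    """
    data = request.json
    if isinstance(data, str):
        # Some clients double-encode and send the object as a JSON string.
        try:
            data = json.loads(data)
        except ValueError:
            return jsonify({"error": "Missing required fields"}), 400
    if not isinstance(data, dict):
        # No body, or a body that is not an object: treat as empty so the
        # membership tests below answer 400 instead of raising (HTTP 500).
        data = {}

    if "username" in data and "org" in data:
        user = UserService.get_user_by_username(data["username"])
        if not user:
            # NOTE(review): distinct "User not found" / "Organization not
            # found" answers let an unauthenticated caller enumerate both;
            # kept as-is because clients may rely on the messages.
            return jsonify({"error": "User not found"}), 404
        org = OrganizationService.get_organization_by_name(data["org"])
        if not org:
            return jsonify({"error": "Organization not found"}), 404
        # Nothing checks here that ``user`` belongs to ``org``; presumably
        # create_session (or verify_session) enforces that -- TODO confirm.
        # Sessions are also created before any proof of identity, so the
        # service should expire unverified ones promptly.
        session = SessionService.create_session(user, org)
        return jsonify(session.to_dict()), 201

    if session_token := request.headers.get("Authorization"):
        session = SessionService.get_session(session_token)
        if not session:
            return jsonify({"error": "Not authenticated"}), 401
        if session.verified:
            # Already verified: idempotent, just echo the session.
            return jsonify(session.to_dict()), 200
        if "signature" not in data:
            return jsonify({"error": "Missing required fields"}), 400
        try:
            signature = base64.b64decode(data["signature"])
        except (TypeError, ValueError):
            # Not a string, or not base64 (binascii.Error is a ValueError);
            # previously this surfaced as a 500.
            return jsonify({"error": "Invalid signature"}), 400
        try:
            SessionService.verify_session(session_token, signature)
        except InvalidSignature:
            return jsonify({"error": "Invalid signature"}), 400
        # ``session`` was fetched before verification; whether its ``verified``
        # flag reflects the update depends on verify_session -- TODO confirm.
        return jsonify(session.to_dict()), 200

    return jsonify({"error": "Missing required fields"}), 400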
@user_bp.route("/logout", methods=["POST"])
def user_logout():
    """Delete the session named by the ``Authorization`` header.

    Returns:
        400 when the header is absent, 401 when the token does not map to a
        session, otherwise 200 with ``{"message": "Logged out"}`` after
        ``SessionService.delete_session`` has run.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    current = SessionService.get_session(token)
    if not current:
        return jsonify({"error": "Not authenticated"}), 401

    # No permission check: holding the token is enough to end its own session.
    SessionService.delete_session(current)
    return jsonify({"message": "Logged out"}), 200
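@user_bp.route("/list", methods=["GET"])
def user_list():
    """List the users of the caller's organization, or show one user by name.

    Requires a session (``Authorization`` header) accepted by
    ``SessionService.validate_session``. An optional ``username`` selects a
    single user; it may come from a JSON body (the original contract) or, for a
    plain GET, from the query string.

    Returns:
        200 with a user dict or with the organization's user list; 400 without a
        token; whatever ``validate_session`` returns on a bad token; 404 when
        the organization or the named user does not exist.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400
    session = SessionService.validate_session(session_token)
    if isinstance(session, tuple):
        # validate_session hands back a ready-made (response, status) on failure.
        return session

    # A GET normally carries no body. ``request.json`` on a non-JSON request
    # either returns None or, on newer Werkzeug, raises 415 -- both broke the
    # plain "list everyone" case. ``silent=True`` gives None without raising.
    data = request.get_json(silent=True)
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except ValueError:
            data = {}
    if not isinstance(data, dict):
        data = {}

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    username = data["username"] if "username" in data else request.args.get("username")
    if username is not None:
        user = UserService.get_user_by_username(username)
        if not user:
            return jsonify({"error": "User not found"}), 404
        # NOTE(review): this lookup is by username only -- ``org`` is not
        # consulted. If users are organization-scoped (create_user takes org=),
        # a session in one organization can read a record from another; scope
        # the lookup to ``org`` once the service offers a way. TODO confirm.
        return jsonify(user.to_dict()), 200

    users = OrganizationService.get_users_in_organization(org)
    return jsonify(users), 200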
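@user_bp.route("/create", methods=["POST"])
def user_create():
    """Create a user in the caller's organization.

    Requires a session holding ``Perm.SUBJECT_NEW``. The JSON body must carry
    ``username``, ``full_name``, ``email`` and ``public_key`` as non-empty
    strings.

    Returns:
        201 with the new user's dict; 400 when the body is missing/not JSON, a
        field is absent, or a field is not a non-empty string; whatever
        ``validate_session`` returns on a bad token; 404 when the session's
        organization no longer exists; 409 when the username is already taken.
    """
    session_token = request.headers.get("Authorization")
    if not session_token:
        return jsonify({"error": "No session token"}), 400
    session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_NEW])
    if isinstance(session, tuple):
        # validate_session hands back a ready-made (response, status) on failure.
        return session

    data = request.json
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except ValueError:
            return jsonify({"error": "Missing required fields"}), 400
    if not isinstance(data, dict):
        # No body or a non-object body used to raise TypeError (HTTP 500).
        return jsonify({"error": "Missing required fields"}), 400

    required = ("username", "full_name", "email", "public_key")
    if any(field not in data for field in required):
        return jsonify({"error": "Missing required fields"}), 400
    # Everything here is attacker-controlled text that ends up persisted (and
    # ``public_key`` presumably drives signature checks later); refuse
    # non-string or empty values before they reach the service layer.
    if any(not isinstance(data[field], str) or not data[field] for field in required):
        return jsonify({"error": "Invalid field values"}), 400

    org = OrganizationService.get_organization(session.org_id)
    if not org:
        return jsonify({"error": "Organization not found"}), 404

    if UserService.get_user_by_username(data["username"]):
        # Previously this answered 201 with the *existing* record, which both
        # misreported the outcome and let any SUBJECT_NEW holder read another
        # user's record (possibly from a different organization) by "creating"
        # its username. If the intended semantics were "attach an existing user
        # to this organization", that needs an explicit service call, not this.
        return jsonify({"error": "User already exists"}), 409

    user = UserService.create_user(
        username=data["username"],
        full_name=data["full_name"],
        email=data["email"],
        public_key=data["public_key"],
        org=org,
    )
    return jsonify(user.to_dict()), 201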
@user_bp.route("/<string:username>/roles", methods=["GET"])
def user_roles(username):
    """Return the roles ``username`` holds in the caller's organization.

    Requires a session accepted by ``SessionService.validate_session``; the
    organization is the session's, the user comes from the URL.

    Returns:
        200 with ``RoleService.get_roles_for_user(user, org)``; 400 without a
        token; the validate_session failure response; 404 when the organization
        or the user does not exist.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    current = SessionService.validate_session(token)
    if isinstance(current, tuple):
        # Failure path: validate_session already built the (response, status).
        return current

    organization = OrganizationService.get_organization(current.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    # Looked up by name only; the org-scoping happens in get_roles_for_user,
    # so a user from another organization yields an (empty?) role list rather
    # than a 404 -- presumably intended, verify against RoleService.
    subject = UserService.get_user_by_username(username)
    if not subject:
        return jsonify({"error": "User not found"}), 404

    role_list = RoleService.get_roles_for_user(subject, organization)
    return jsonify(role_list), 200
@user_bp.route("/<string:username>/suspend", methods=["POST"])
def user_suspend(username):
    """Suspend ``username`` in the caller's organization.

    Requires a session holding ``Perm.SUBJECT_DOWN``. The actual state change
    -- and its response -- is delegated to ``OrganizationService.suspend_user``.

    Returns:
        Whatever ``suspend_user(org, user)`` returns; 400 without a token; the
        validate_session failure response; 404 when the organization or the
        user does not exist.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    current = SessionService.validate_session(token, required_perms=[Perm.SUBJECT_DOWN])
    if isinstance(current, tuple):
        # Failure path: validate_session already built the (response, status).
        return current

    organization = OrganizationService.get_organization(current.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    subject = UserService.get_user_by_username(username)
    if not subject:
        return jsonify({"error": "User not found"}), 404

    # Membership of ``subject`` in ``organization`` is not checked here;
    # presumably suspend_user does so -- verify.
    return OrganizationService.suspend_user(organization, subject)
@user_bp.route("/<string:username>/activate", methods=["POST"])
def user_unsuspend(username):
    """Re-activate a suspended ``username`` in the caller's organization.

    Requires a session holding ``Perm.SUBJECT_UP``. The state change -- and its
    response -- is delegated to ``OrganizationService.activate_user``.

    Returns:
        Whatever ``activate_user(org, user)`` returns; 400 without a token; the
        validate_session failure response; 404 when the organization or the
        user does not exist.
    """
    token = request.headers.get("Authorization")
    if not token:
        return jsonify({"error": "No session token"}), 400

    current = SessionService.validate_session(token, required_perms=[Perm.SUBJECT_UP])
    if isinstance(current, tuple):
        # Failure path: validate_session already built the (response, status).
        return current

    organization = OrganizationService.get_organization(current.org_id)
    if not organization:
        return jsonify({"error": "Organization not found"}), 404

    subject = UserService.get_user_by_username(username)
    if not subject:
        return jsonify({"error": "User not found"}), 404

    # Membership of ``subject`` in ``organization`` is not checked here;
    # presumably activate_user does so -- verify.
    return OrganizationService.activate_user(organization, subject)