import base64 import json from cryptography.hazmat.primitives import serialization from cryptography.exceptions import InvalidSignature from flask import Blueprint, request, jsonify from services import UserService, SessionService, OrganizationService, RoleService from utils import Perm, generate_parameters, generate_key_pair, derive_keys, decrypt from utils.comms_encryption import encrypt_response, decrypt_request from database import security_service from utils.exceptions import SessionException user_bp = Blueprint("user", __name__) @user_bp.route("/login", methods=["POST"]) def user_login(): if request.headers.get("Content-Type") == "application/octet-stream": if session_token := request.headers.get("Authorization"): data = request.data session = SessionService.get_session(session_token) if not session: return jsonify({"error": "Not authenticated"}), 401 if session.verified: return encrypt_response(session.to_dict(), 201, security_service.get_key(session)) try: data = decrypt_request(data.decode(), security_service.get_key(session)) except Exception: return jsonify({"error": "Invalid encryption"}), 400 if not "signature" in data: return jsonify({"error": "Missing required fields"}), 400 signature = data["signature"] signature = base64.b64decode(signature) session = SessionService.verify_session(session_token, signature) if isinstance(session, tuple): return session return encrypt_response(session.to_dict(), 201, security_service.get_key(session)) return jsonify({"error": "No session token"}), 400 if request.json and "username" in request.json and "org" in request.json: data = request.json if type(data) is str: data = json.loads(data) user = UserService.get_user_by_username(data["username"]) if not user: return jsonify({"error": "User not found"}), 404 org = OrganizationService.get_organization_by_name(data["org"]) if not org: return jsonify({"error": "Organization not found"}), 404 session = SessionService.create_session(user, org) payload = { "token": session.token, "challenge": session.challenge } return encrypt_response(payload, 201, security_service.get_key(session)) return jsonify({"error": "Missing required fields"}), 400 @user_bp.route("/dh", methods=["POST"]) def user_dh(): data = request.json if type(data) is str: data = json.loads(data) if "user" not in data or "org" not in data: return jsonify({"error": "Missing required fields"}), 400 if "parameters" in data: parameters_bytes = bytes.fromhex(data["parameters"]) parameters = serialization.load_pem_parameters(parameters_bytes) private_key, public_key = generate_key_pair(parameters) security_service.register_key(data['user'], data['org'], private_key) public_key_content = public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) return jsonify({ "public_key": public_key_content.hex() }), 200 elif "public_key" in data: client_public_key = data["public_key"] client_public_key = serialization.load_pem_public_key(bytes.fromhex(client_public_key)) private_key = security_service.keys[f'{data["org"]}/{data["user"]}'] derived_key = derive_keys(private_key, client_public_key) security_service.register_key(data['user'], data['org'], derived_key) return jsonify({"message": "Keys exchanged"}), 200 else: return jsonify({"error": "Missing required fields"}), 400 @user_bp.route("/logout", methods=["POST"]) def user_logout(): session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 session = SessionService.get_session(session_token) if not session: return jsonify({"error": "Not authenticated"}), 401 SessionService.delete_session(session) return jsonify({"message": "Logged out"}), 200 @user_bp.route("/list", methods=["GET"]) def user_list(): if request.headers.get("Content-Type") != "application/octet-stream": return jsonify({"error": "Invalid request"}), 400 session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 try: session, data = SessionService.validate_session(session_token, data=request.data) except SessionException as e: return jsonify({"error": e.message}), e.code org = OrganizationService.get_organization(session.org_id) if not org: return jsonify({"error": "Organization not found"}), 404 if "username" in data: user = UserService.get_user_by_username(data["username"]) if not user: return jsonify({"error": "User not found"}), 404 return encrypt_response(user.to_dict(), 200, security_service.get_key(session)) users = OrganizationService.get_users_in_organization(org) return encrypt_response(users, 200, security_service.get_key(session)) @user_bp.route("/create", methods=["POST"]) def user_create(): if request.headers.get("Content-Type") != "application/octet-stream": return jsonify({"error": "Invalid request"}), 400 session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 try: session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.SUBJECT_NEW]) except SessionException as e: return jsonify({"error": e.message}), e.code if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data: return jsonify({"error": "Missing required fields"}), 400 org = OrganizationService.get_organization(session.org_id) if not org: return jsonify({"error": "Organization not found"}), 404 user = UserService.get_user_by_username(data["username"]) if not user: user = UserService.create_user( username=data["username"], full_name=data["full_name"], email=data["email"], public_key=data["public_key"], org=org ) return encrypt_response(user.to_dict(), 201, security_service.get_key(session)) @user_bp.route("//roles", methods=["GET"]) def user_roles(username): session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 try: session = SessionService.validate_session(session_token) except SessionException as e: return jsonify({"error": e.message}), e.code org = OrganizationService.get_organization(session.org_id) if not org: return jsonify({"error": "Organization not found"}), 404 user = UserService.get_user_by_username(username) if not user: return jsonify({"error": "User not found"}), 404 roles = RoleService.get_roles_for_user(user, org) return encrypt_response(roles, 200, security_service.get_key(session)) @user_bp.route("//suspend", methods=["POST"]) def user_suspend(username): session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 try: session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_DOWN]) except SessionException as e: return jsonify({"error": e.message}), e.code org = OrganizationService.get_organization(session.org_id) if not org: return jsonify({"error": "Organization not found"}), 404 user = UserService.get_user_by_username(username) if not user: return jsonify({"error": "User not found"}), 404 return OrganizationService.suspend_user(org, user) @user_bp.route("//activate", methods=["POST"]) def user_unsuspend(username): session_token = request.headers.get("Authorization") if not session_token: return jsonify({"error": "No session token"}), 400 try: session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_UP]) except SessionException as e: return jsonify({"error": e.message}), e.code org = OrganizationService.get_organization(session.org_id) if not org: return jsonify({"error": "Organization not found"}), 404 user = UserService.get_user_by_username(username) if not user: return jsonify({"error": "User not found"}), 404 return OrganizationService.activate_user(org, user)