sio-2425/delivery2/server/routes/user.py

243 lines
8.9 KiB
Python

import base64
import json
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
from flask import Blueprint, request, jsonify
from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm, generate_parameters, generate_key_pair, derive_keys, decrypt
from utils.comms_encryption import encrypt_response, decrypt_request
from database import security_service
from utils.exceptions import SessionException
user_bp = Blueprint("user", __name__)
@user_bp.route("/login", methods=["POST"])
def user_login():
if request.headers.get("Content-Type") == "application/octet-stream":
if session_token := request.headers.get("Authorization"):
data = request.data
session = SessionService.get_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
if session.verified:
return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
try:
data = decrypt_request(data.decode(), security_service.get_key(session))
except Exception:
return jsonify({"error": "Invalid encryption"}), 400
if not "signature" in data:
return jsonify({"error": "Missing required fields"}), 400
signature = data["signature"]
signature = base64.b64decode(signature)
session = SessionService.verify_session(session_token, signature)
if isinstance(session, tuple):
return session
return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
return jsonify({"error": "No session token"}), 400
if request.json and "username" in request.json and "org" in request.json:
data = request.json
if type(data) is str:
data = json.loads(data)
user = UserService.get_user_by_username(data["username"])
if not user:
return jsonify({"error": "User not found"}), 404
org = OrganizationService.get_organization_by_name(data["org"])
if not org:
return jsonify({"error": "Organization not found"}), 404
session = SessionService.create_session(user, org)
payload = {
"token": session.token,
"challenge": session.challenge
}
return encrypt_response(payload, 201, security_service.get_key(session))
return jsonify({"error": "Missing required fields"}), 400
@user_bp.route("/dh", methods=["POST"])
def user_dh():
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data or "org" not in data:
return jsonify({"error": "Missing required fields"}), 400
if "parameters" in data:
parameters_bytes = bytes.fromhex(data["parameters"])
parameters = serialization.load_pem_parameters(parameters_bytes)
private_key, public_key = generate_key_pair(parameters)
security_service.register_key(data['user'], data['org'], private_key)
public_key_content = public_key.public_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
return jsonify({
"public_key": public_key_content.hex()
}), 200
elif "public_key" in data:
client_public_key = data["public_key"]
client_public_key = serialization.load_pem_public_key(bytes.fromhex(client_public_key))
private_key = security_service.keys[f'{data["org"]}/{data["user"]}']
derived_key = derive_keys(private_key, client_public_key)
security_service.register_key(data['user'], data['org'], derived_key)
return jsonify({"message": "Keys exchanged"}), 200
else:
return jsonify({"error": "Missing required fields"}), 400
@user_bp.route("/logout", methods=["POST"])
def user_logout():
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
session = SessionService.get_session(session_token)
if not session:
return jsonify({"error": "Not authenticated"}), 401
SessionService.delete_session(session)
return jsonify({"message": "Logged out"}), 200
@user_bp.route("/list", methods=["GET"])
def user_list():
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
try:
session, data = SessionService.validate_session(session_token, data=request.data)
except SessionException as e:
return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
if "username" in data:
user = UserService.get_user_by_username(data["username"])
if not user:
return jsonify({"error": "User not found"}), 404
return encrypt_response(user.to_dict(), 200, security_service.get_key(session))
users = OrganizationService.get_users_in_organization(org)
return encrypt_response(users, 200, security_service.get_key(session))
@user_bp.route("/create", methods=["POST"])
def user_create():
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
try:
session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.SUBJECT_NEW])
except SessionException as e:
return jsonify({"error": e.message}), e.code
if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
return jsonify({"error": "Missing required fields"}), 400
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(data["username"])
if not user:
user = UserService.create_user(
username=data["username"],
full_name=data["full_name"],
email=data["email"],
public_key=data["public_key"],
org=org
)
return encrypt_response(user.to_dict(), 201, security_service.get_key(session))
@user_bp.route("/<string:username>/roles", methods=["GET"])
def user_roles(username):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token)
except SessionException as e:
return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(username)
if not user:
return jsonify({"error": "User not found"}), 404
roles = RoleService.get_roles_for_user(user, org)
return encrypt_response(roles, 200, security_service.get_key(session))
@user_bp.route("/<string:username>/suspend", methods=["POST"])
def user_suspend(username):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_DOWN])
except SessionException as e:
return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(username)
if not user:
return jsonify({"error": "User not found"}), 404
return OrganizationService.suspend_user(org, user)
@user_bp.route("/<string:username>/activate", methods=["POST"])
def user_unsuspend(username):
session_token = request.headers.get("Authorization")
if not session_token:
return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_UP])
except SessionException as e:
return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id)
if not org:
return jsonify({"error": "Organization not found"}), 404
user = UserService.get_user_by_username(username)
if not user:
return jsonify({"error": "User not found"}), 404
return OrganizationService.activate_user(org, user)