243 lines
8.9 KiB
Python
243 lines
8.9 KiB
Python
import base64
|
|
import json
|
|
from cryptography.hazmat.primitives import serialization
|
|
|
|
from cryptography.exceptions import InvalidSignature
|
|
from flask import Blueprint, request, jsonify
|
|
from services import UserService, SessionService, OrganizationService, RoleService
|
|
from utils import Perm, generate_parameters, generate_key_pair, derive_keys, decrypt
|
|
from utils.comms_encryption import encrypt_response, decrypt_request
|
|
from database import security_service
|
|
from utils.exceptions import SessionException
|
|
|
|
user_bp = Blueprint("user", __name__)
|
|
|
|
@user_bp.route("/login", methods=["POST"])
|
|
def user_login():
|
|
if request.headers.get("Content-Type") == "application/octet-stream":
|
|
if session_token := request.headers.get("Authorization"):
|
|
data = request.data
|
|
|
|
session = SessionService.get_session(session_token)
|
|
if not session:
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
if session.verified:
|
|
return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
|
|
|
|
try:
|
|
data = decrypt_request(data.decode(), security_service.get_key(session))
|
|
except Exception:
|
|
return jsonify({"error": "Invalid encryption"}), 400
|
|
|
|
if not "signature" in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
signature = data["signature"]
|
|
signature = base64.b64decode(signature)
|
|
|
|
session = SessionService.verify_session(session_token, signature)
|
|
if isinstance(session, tuple):
|
|
return session
|
|
return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
if request.json and "username" in request.json and "org" in request.json:
|
|
data = request.json
|
|
if type(data) is str:
|
|
data = json.loads(data)
|
|
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
org = OrganizationService.get_organization_by_name(data["org"])
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
session = SessionService.create_session(user, org)
|
|
payload = {
|
|
"token": session.token,
|
|
"challenge": session.challenge
|
|
}
|
|
return encrypt_response(payload, 201, security_service.get_key(session))
|
|
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
|
|
@user_bp.route("/dh", methods=["POST"])
|
|
def user_dh():
|
|
data = request.json
|
|
if type(data) is str:
|
|
data = json.loads(data)
|
|
|
|
if "user" not in data or "org" not in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
if "parameters" in data:
|
|
parameters_bytes = bytes.fromhex(data["parameters"])
|
|
parameters = serialization.load_pem_parameters(parameters_bytes)
|
|
|
|
private_key, public_key = generate_key_pair(parameters)
|
|
|
|
security_service.register_key(data['user'], data['org'], private_key)
|
|
|
|
public_key_content = public_key.public_bytes(encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
|
return jsonify({
|
|
"public_key": public_key_content.hex()
|
|
}), 200
|
|
|
|
elif "public_key" in data:
|
|
client_public_key = data["public_key"]
|
|
client_public_key = serialization.load_pem_public_key(bytes.fromhex(client_public_key))
|
|
private_key = security_service.keys[f'{data["org"]}/{data["user"]}']
|
|
derived_key = derive_keys(private_key, client_public_key)
|
|
security_service.register_key(data['user'], data['org'], derived_key)
|
|
return jsonify({"message": "Keys exchanged"}), 200
|
|
|
|
else:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
@user_bp.route("/logout", methods=["POST"])
|
|
def user_logout():
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
session = SessionService.get_session(session_token)
|
|
if not session:
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
SessionService.delete_session(session)
|
|
return jsonify({"message": "Logged out"}), 200
|
|
|
|
|
|
@user_bp.route("/list", methods=["GET"])
|
|
def user_list():
|
|
if request.headers.get("Content-Type") != "application/octet-stream":
|
|
return jsonify({"error": "Invalid request"}), 400
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
try:
|
|
session, data = SessionService.validate_session(session_token, data=request.data)
|
|
except SessionException as e:
|
|
return jsonify({"error": e.message}), e.code
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
if "username" in data:
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
return encrypt_response(user.to_dict(), 200, security_service.get_key(session))
|
|
|
|
users = OrganizationService.get_users_in_organization(org)
|
|
return encrypt_response(users, 200, security_service.get_key(session))
|
|
|
|
|
|
@user_bp.route("/create", methods=["POST"])
|
|
def user_create():
|
|
if request.headers.get("Content-Type") != "application/octet-stream":
|
|
return jsonify({"error": "Invalid request"}), 400
|
|
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
try:
|
|
session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.SUBJECT_NEW])
|
|
except SessionException as e:
|
|
return jsonify({"error": e.message}), e.code
|
|
|
|
if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
user = UserService.create_user(
|
|
username=data["username"],
|
|
full_name=data["full_name"],
|
|
email=data["email"],
|
|
public_key=data["public_key"],
|
|
org=org
|
|
)
|
|
|
|
return encrypt_response(user.to_dict(), 201, security_service.get_key(session))
|
|
|
|
|
|
@user_bp.route("/<string:username>/roles", methods=["GET"])
|
|
def user_roles(username):
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
try:
|
|
session = SessionService.validate_session(session_token)
|
|
except SessionException as e:
|
|
return jsonify({"error": e.message}), e.code
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
user = UserService.get_user_by_username(username)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
roles = RoleService.get_roles_for_user(user, org)
|
|
return encrypt_response(roles, 200, security_service.get_key(session))
|
|
|
|
@user_bp.route("/<string:username>/suspend", methods=["POST"])
|
|
def user_suspend(username):
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
try:
|
|
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_DOWN])
|
|
except SessionException as e:
|
|
return jsonify({"error": e.message}), e.code
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
user = UserService.get_user_by_username(username)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
return OrganizationService.suspend_user(org, user)
|
|
|
|
|
|
@user_bp.route("/<string:username>/activate", methods=["POST"])
|
|
def user_unsuspend(username):
|
|
session_token = request.headers.get("Authorization")
|
|
if not session_token:
|
|
return jsonify({"error": "No session token"}), 400
|
|
|
|
try:
|
|
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_UP])
|
|
except SessionException as e:
|
|
return jsonify({"error": e.message}), e.code
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
user = UserService.get_user_by_username(username)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
return OrganizationService.activate_user(org, user)
|
|
|