156 lines
5.0 KiB
Python
156 lines
5.0 KiB
Python
import json
|
|
from flask import Blueprint, request, jsonify
|
|
from services import UserService, SessionService, OrganizationService
|
|
from utils import data_checks
|
|
|
|
user_bp = Blueprint("user", __name__)
|
|
|
|
@user_bp.route("/login", methods=["POST"])
|
|
def user_login():
|
|
data = request.json
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
org = OrganizationService.get_organization_by_name(data["org"])
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
id_str = str(org.id)
|
|
if id_str not in user.public_keys:
|
|
return jsonify({"error": "User not associated with organization"}), 403
|
|
|
|
if user.public_keys[id_str] != data["public_key"]:
|
|
return jsonify({"error": "Invalid public key"}), 403
|
|
|
|
session = SessionService.create_session(user, org)
|
|
return jsonify(session.to_dict()), 201
|
|
|
|
|
|
@user_bp.route("/logout", methods=["POST"])
|
|
def user_logout():
|
|
data = request.json
|
|
session_file = data["session_file"]
|
|
session_data = json.loads(session_file)
|
|
session_token = session_data["token"]
|
|
session = SessionService.get_session(session_token)
|
|
|
|
if not session:
|
|
return jsonify({"error": "Not authenticated"}), 401
|
|
|
|
SessionService.delete_session(session)
|
|
return jsonify({"message": "Logged out"}), 200
|
|
|
|
|
|
@user_bp.route("/list", methods=["GET"])
|
|
def user_list():
|
|
data = request.json
|
|
if "session_file" not in data:
|
|
return jsonify({"error": "No session file"}), 400
|
|
|
|
session_file = data["session_file"]
|
|
session_data = json.loads(session_file)
|
|
|
|
session = data_checks.validate_session_file(session_data)
|
|
if isinstance(session, tuple):
|
|
return session
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
if "username" in data:
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
return jsonify(user.to_dict()), 200
|
|
|
|
users = OrganizationService.get_users_in_organization(org)
|
|
return jsonify(users), 200
|
|
|
|
|
|
@user_bp.route("/create", methods=["POST"])
|
|
def user_create():
|
|
data = request.json
|
|
if "session_file" not in data or "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
session_file = data["session_file"]
|
|
session_data = json.loads(session_file)
|
|
|
|
session = data_checks.validate_session_file(session_data)
|
|
if isinstance(session, tuple):
|
|
return session
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
|
|
if org.owner.id != session.user_id:
|
|
return jsonify({"error": "Not authorized to create users"}), 403
|
|
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
user = UserService.create_user(
|
|
username=data["username"],
|
|
full_name=data["full_name"],
|
|
email=data["email"],
|
|
public_key=data["public_key"],
|
|
org=org
|
|
)
|
|
|
|
return jsonify(user.to_dict()), 201
|
|
|
|
|
|
@user_bp.route("/suspend", methods=["POST"])
|
|
def user_suspend():
|
|
data = request.json
|
|
if "session_file" not in data or "username" not in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
session_file = data["session_file"]
|
|
session_data = json.loads(session_file)
|
|
|
|
session = data_checks.validate_session_file(session_data)
|
|
if isinstance(session, tuple):
|
|
return session
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
if org.owner.id != session.user_id:
|
|
return jsonify({"error": "Not authorized to suspend users"}), 403
|
|
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
return OrganizationService.suspend_user(org, user)
|
|
|
|
|
|
@user_bp.route("/activate", methods=["POST"])
|
|
def user_unsuspend():
|
|
data = request.json
|
|
if "session_file" not in data or "username" not in data:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
session_file = data["session_file"]
|
|
session_data = json.loads(session_file)
|
|
|
|
session = data_checks.validate_session_file(session_data)
|
|
if isinstance(session, tuple):
|
|
return session
|
|
|
|
org = OrganizationService.get_organization(session.org_id)
|
|
if not org:
|
|
return jsonify({"error": "Organization not found"}), 404
|
|
if org.owner.id != session.user_id:
|
|
return jsonify({"error": "Not authorized to unsuspend users"}), 403
|
|
|
|
user = UserService.get_user_by_username(data["username"])
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
return OrganizationService.activate_user(org, user)
|
|
|