Implement server/client security

Co-authored-by: Rúben Gomes <rlcg@ua.pt>
Signed-off-by: Tiago Garcia <tiago.rgarcia@ua.pt>
This commit is contained in:
Tiago Garcia 2024-12-19 23:46:31 +00:00
parent b156337e62
commit d758ea4cd6
Signed by: TiagoRG
GPG Key ID: DFCD48E3F420DB42
41 changed files with 851 additions and 388 deletions

View File

@ -2,3 +2,4 @@ from .asymmetric_functs import *
from .symmetric_encryption import * from .symmetric_encryption import *
from .digest import * from .digest import *
from .key_pair import * from .key_pair import *
from .diffie_hellman import *

View File

@ -60,7 +60,7 @@ def encrypt(content, key):
iv = os.urandom(16) iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend()) cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
ciphertext = iv + encryptor.update(content) + encryptor.finalize() ciphertext = iv + encryptor.update(content.encode()) + encryptor.finalize()
return ciphertext return ciphertext

View File

@ -67,3 +67,26 @@ def decrypt_file(key, input_file, output_file=None) -> str:
except UnicodeDecodeError: except UnicodeDecodeError:
return plaintext_content return plaintext_content
def encrypt_response_with_iv(input_string: str) -> bytes:
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(iv), modes.CFB(iv))
encryptor = cipher.encryptor()
plaintext_bytes = input_string.encode('utf-8')
ciphertext = encryptor.update(plaintext_bytes) + encryptor.finalize()
encrypted_data = iv + ciphertext
return encrypted_data
def decrypt_request_with_iv(encrypted_data: bytes) -> str:
iv = encrypted_data[:16]
ciphertext = encrypted_data[16:]
cipher = Cipher(algorithms.AES(iv), modes.CFB(iv))
decryptor = cipher.decryptor()
plaintext_bytes = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext_bytes.decode('utf-8')

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from lib import digest from lib import digest
from lib.diffie_hellman import *
from subject import main from subject import main
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
@ -63,12 +64,23 @@ def aclDoc(args):
document_handle = digest.get_hash(bytes(args.name, encoding='utf-8')) document_handle = digest.get_hash(bytes(args.name, encoding='utf-8'))
payload = {'document_handle' : document_handle, 'role' : args.role, 'perm' : args.permission, 'operation' : change} derived_key = bytes.fromhex(args.session['derived_key'])
payload = json.dumps({'role' : args.role, 'perm' : args.permission, 'operation' : change})
try:
payload = encrypt(payload, derived_key).hex()
except Exception:
logger.error("Failed to encrypt the content")
sys.exit(-1)
headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/file/acl', req = requests.post(f'http://{state['REP_ADDRESS']}/file/acl/{document_handle}',
json=json.dumps(payload), data=payload,
headers={'Authorization': args.session['token']}) headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error']) logger.error("%d: %s", req.status_code, req.json()['error'])
@ -78,8 +90,11 @@ def aclDoc(args):
sys.exit(-1) sys.exit(-1)
# Operation success # Operation success
if req.status_code == 200:
logger.info("ACL changed succesfully.") logger.info("ACL changed succesfully.")
sys.exit(0) sys.exit(0)
logger.error("Failed to change ACL")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
aclDoc(sys.argv[1:]) aclDoc(sys.argv[1:])

View File

@ -53,8 +53,11 @@ def activateSubject(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
if req.status_code == 200:
logger.info("Subject %s has been activated." % args.username) logger.info("Subject %s has been activated." % args.username)
sys.exit(0) sys.exit(0)
logger.error("Failed to activate subject")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
activateSubject(sys.argv[1:]) activateSubject(sys.argv[1:])

View File

@ -8,6 +8,8 @@ import argparse
from subject import main from subject import main
from lib.diffie_hellman import *
sys.path.append(os.path.abspath("../../")) sys.path.append(os.path.abspath("../../"))
from lib.symmetric_encryption import * from lib.symmetric_encryption import *
from lib import digest from lib import digest
@ -53,12 +55,24 @@ def addDoc(args):
#Encrypt content #Encrypt content
key, content = encrypt_file(BASE_DIR + args.file, BASE_DIR + 'encryptedText') key, content = encrypt_file(BASE_DIR + args.file, BASE_DIR + 'encryptedText')
derived_key = bytes.fromhex(args.session['derived_key'])
#Upload document metadata #Upload document metadata
doc = {'document_name' : args.name, 'key' : key.hex(), 'alg' : 'AES-CFB' } doc = json.dumps({'document_name' : args.name, 'key' : key.hex(), 'alg' : 'AES-CFB' })
try:
doc = encrypt(doc, derived_key).hex()
except Exception:
logger.error("Failed to encrypt the content")
sys.exit(-1)
headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/file/upload/metadata', json=json.dumps(doc), req = requests.post(f'http://{state['REP_ADDRESS']}/file/upload/metadata',
headers={'Authorization': args.session['token']}) data=doc,
headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
@ -90,8 +104,11 @@ def addDoc(args):
#Delete temporary file #Delete temporary file
os.remove(BASE_DIR + 'encryptedText') os.remove(BASE_DIR + 'encryptedText')
if req.status_code == 201:
logger.info("Document uploaded successfully.") logger.info("Document uploaded successfully.")
sys.exit(0) sys.exit(0)
logger.error("Failed to add document.")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
addDoc(sys.argv[1:]) addDoc(sys.argv[1:])

View File

@ -61,9 +61,8 @@ def addPermission(args):
]: ]:
isPerm = True isPerm = True
if isPerm:
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + '/perm/add/' + args.value, req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + f'/{"perm" if isPerm else "user"}/add/' + args.value,
headers={'Authorization': args.session['token']}) headers={'Authorization': args.session['token']})
req.raise_for_status() req.raise_for_status()
@ -75,23 +74,7 @@ def addPermission(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
logger.info("Permission added successfully to role.") logger.info("Permission added successfully to role." if isPerm else "User added successfully to role.")
sys.exit(0)
else:
try:
req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + '/user/add/' + args.value,
headers={'Authorization': args.session['token']})
req.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.")
sys.exit(-1)
logger.info("User added successfully to role.")
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -6,6 +6,7 @@ import requests
import json import json
import argparse import argparse
from lib.diffie_hellman import *
from subject import main from subject import main
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
@ -44,10 +45,23 @@ def addRole(args):
with open(BASE_DIR + args.session, 'r') as f: with open(BASE_DIR + args.session, 'r') as f:
args.session = json.load(f) args.session = json.load(f)
derived_key = bytes.fromhex(args.session['derived_key'])
payload = json.dumps({'role' : args.role})
try:
payload = encrypt(payload, derived_key).hex()
except Exception:
logger.error("Failed to encrypt the content")
sys.exit(-1)
headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/role/create', req = requests.post(f'http://{state['REP_ADDRESS']}/role/create',
json=json.dumps({'role' : args.role}), data=payload,
headers={'Authorization': args.session['token']}) headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
@ -58,8 +72,11 @@ def addRole(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
if req.status_code == 201:
logger.info("Role %s added.", args.role) logger.info("Role %s added.", args.role)
sys.exit(0) sys.exit(0)
logger.error('Failed to add role.')
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
addRole(sys.argv[1:]) addRole(sys.argv[1:])

View File

@ -6,6 +6,7 @@ import requests
import json import json
import argparse import argparse
from lib.diffie_hellman import *
from subject import main from subject import main
sys.path.append(os.path.abspath('../../')) sys.path.append(os.path.abspath('../../'))
@ -53,10 +54,23 @@ def addSubject(args):
pubKey = asymmetric_functs.load_public_key(BASE_DIR + args.credentials) pubKey = asymmetric_functs.load_public_key(BASE_DIR + args.credentials)
subject = {'username' : args.username, 'full_name' : args.name, 'email' : args.email, 'public_key' : pubKey} derived_key = bytes.fromhex(args.session['derived_key'])
subject = json.dumps({'username' : args.username, 'full_name' : args.name, 'email' : args.email, 'public_key' : pubKey})
try:
subject = encrypt(subject, derived_key).hex()
except Exception:
logger.error("Failed to encrypt the content")
sys.exit(-1)
headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/user/create', json=json.dumps(subject), headers={'Authorization': args.session['token']}) req = requests.post(f'http://{state['REP_ADDRESS']}/user/create',
data=subject,
headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:

View File

@ -55,8 +55,11 @@ def assumeRole(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
if req.status_code == 200:
logger.info("You assumed the role %s.", args.role) logger.info("You assumed the role %s.", args.role)
sys.exit(0) sys.exit(0)
logger.error("Failed to assume role %s.", args.role)
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
assumeRole(sys.argv[1:]) assumeRole(sys.argv[1:])

View File

@ -7,6 +7,8 @@ import json
import re import re
import argparse import argparse
from bin.lib import encrypt_response_with_iv, decrypt_request_with_iv
sys.path.append(os.path.abspath("../")) sys.path.append(os.path.abspath("../"))
from subject import main from subject import main
@ -56,10 +58,11 @@ def createOrganization(args):
# load public key from file # load public key from file
pubKey = asymmetric_functs.load_public_key(BASE_DIR + args.pubkey) pubKey = asymmetric_functs.load_public_key(BASE_DIR + args.pubkey)
input = {'name' : args.org, 'username' : args.username, 'full_name' : args.name, 'email' : args.email, 'public_key' : pubKey} payload = encrypt_response_with_iv(json.dumps({'name' : args.org, 'username' : args.username, 'full_name' : args.name, 'email' : args.email, 'public_key' : pubKey}))
headers = {'Content-Type': 'application/octet-stream'}
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/org/create', json=json.dumps(input)) req = requests.post(f'http://{state['REP_ADDRESS']}/org/create', data=payload, headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
@ -71,7 +74,12 @@ def createOrganization(args):
sys.exit(-1) sys.exit(-1)
if req.status_code == 201: if req.status_code == 201:
logger.info("Organization created successfully.") try:
response_data = json.loads(decrypt_request_with_iv(bytes.fromhex(req.text)))
except Exception as e:
logger.error("Failed to decrypt the content: %s", e)
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
logger.error("Failed to create organization.") logger.error("Failed to create organization.")
sys.exit(-1) sys.exit(-1)

View File

@ -8,8 +8,9 @@ import json
import requests import requests
from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes, serialization
from lib.diffie_hellman import *
from subject import main from subject import main
# Identity attributes # Identity attributes
@ -40,7 +41,7 @@ def createSession(args):
args = parser.parse_args() args = parser.parse_args()
if not args.org or not args.username or not args.credentials or not args.session: if not args.org or not args.username or not args.password or not args.credentials or not args.session:
logger.error("Need organization, username, credentials and session file") logger.error("Need organization, username, credentials and session file")
sys.exit(1) sys.exit(1)
@ -48,18 +49,81 @@ def createSession(args):
logger.error("File '" + args.credentials + "' not found.") logger.error("File '" + args.credentials + "' not found.")
sys.exit(1) sys.exit(1)
session = {'org' : args.org, 'username' : args.username} # perform diffie-hellman key exchange
# OPTIONAL: ask for parameters
generator = 2; key_size = 1024
parameters = generate_parameters(generator, key_size)
#print( type(json.dumps(session))) parameters_send = parameters.parameter_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.ParameterFormat.PKCS3).hex()
# send parameters to server
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/user/login', json=json.dumps(session)) req = requests.post(f'http://{state['REP_ADDRESS']}/user/dh', json=json.dumps({'user': args.username,
'org': args.org,
'parameters': parameters_send}))
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex: except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server") logger.error("Failed to obtain response from server")
sys.exit(-1) sys.exit(-1)
private_key, public_key = generate_key_pair(parameters)
response = req.json() response = req.json()
server_public_key = serialization.load_pem_public_key(bytes.fromhex(response['public_key']))
derived_key = derive_keys(private_key, server_public_key)
# save derived key
with open(BASE_DIR + 'derived_key.pem', 'wb') as f:
f.write(derived_key)
# send public key to server
public_key_content = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).hex()
try:
req = requests.post(f'http://{state['REP_ADDRESS']}/user/dh', json=json.dumps({'user': args.username,
'org': args.org,
'public_key': public_key_content}))
req.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server")
sys.exit(-1)
if req.status_code == 200:
logger.info("Diffie-Hellman key exchange completed successfully")
else:
logger.error("Failed to complete Diffie-Hellman key exchange")
sys.exit(-1)
try:
session = {'org': args.org, 'username': args.username}
req = requests.post(f'http://{state['REP_ADDRESS']}/user/login', json=json.dumps(session))
req.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server")
sys.exit(-1)
req = decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8')
response = json.loads(req)
challenge = response['challenge'] challenge = response['challenge']
with open(BASE_DIR + args.credentials, 'rb') as f: with open(BASE_DIR + args.credentials, 'rb') as f:
@ -76,7 +140,15 @@ def createSession(args):
) )
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/user/login', json=json.dumps({'signature' : base64.b64encode(signature).decode('utf-8')}), headers={'Authorization': response['token']}) json_payload = json.dumps({'signature' : base64.b64encode(signature).decode('utf-8')})
json_payload = encrypt(json_payload, derived_key).hex()
headers = {
'Authorization': response['token'],
'Content-Type': 'application/octet-stream'
}
req = requests.post(f'http://{state['REP_ADDRESS']}/user/login', data=json_payload, headers=headers)
req.raise_for_status() req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
@ -87,12 +159,18 @@ def createSession(args):
logger.error("Failed to obtain response from server") logger.error("Failed to obtain response from server")
sys.exit(-1) sys.exit(-1)
with open(BASE_DIR + args.session, 'w') as f:
json.dump(req.json(), f)
if req.status_code == 201: if req.status_code == 201:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
session = response_data | {'derived_key': derived_key.hex()}
with open(BASE_DIR + args.session, 'w') as f:
json.dump(session, f)
logger.info("Session created successfully") logger.info("Session created successfully")
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to create session") logger.error("Failed to create session")
sys.exit(-1) sys.exit(-1)

View File

@ -26,7 +26,7 @@ def decryptFile(args):
args = parser.parse_args() args = parser.parse_args()
if not args.encrypted or not args.metadata: if not args.encrypted or not args.metadata:
logger.error("Need encrypted file and it's metadata.") logger.error("Need encrypted file and its metadata.")
sys.exit(1) sys.exit(1)
# If first argument is not a file or not found # If first argument is not a file or not found
@ -38,7 +38,7 @@ def decryptFile(args):
content = symmetric_encryption.decrypt_file(bytes.fromhex(metadata['key']), BASE_DIR + args.encrypted) content = symmetric_encryption.decrypt_file(bytes.fromhex(metadata['key']), BASE_DIR + args.encrypted)
with open(BASE_DIR + 'decryped_file.txt', 'w') as f: with open(BASE_DIR + 'decrypted_file.txt', 'w') as f:
f.write(content) f.write(content)
# Send decrypted content to stdout # Send decrypted content to stdout

View File

@ -58,8 +58,11 @@ def delDoc(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
if req.status_code == 200:
logger.info("You deleted the document %s", args.name) logger.info("You deleted the document %s", args.name)
sys.exit(0) sys.exit(0)
logger.error("Failed to delete document.")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
delDoc(sys.argv[1:]) delDoc(sys.argv[1:])

View File

@ -55,8 +55,11 @@ def dropRole(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
if req.status_code == 200:
logger.info("You dropped the role %s", args.role) logger.info("You dropped the role %s", args.role)
sys.exit(0) sys.exit(0)
logger.error("Failed to assume role %s.", args.role)
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
dropRole(sys.argv[1:]) dropRole(sys.argv[1:])

View File

@ -6,6 +6,7 @@ import requests
import json import json
import argparse import argparse
from lib.diffie_hellman import decrypt
from subject import main from subject import main
sys.path.append(os.path.abspath("../../")) sys.path.append(os.path.abspath("../../"))
@ -62,9 +63,15 @@ def getDoc(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(1) sys.exit(1)
metadata = metadata.json() if metadata.status_code == 200:
try:
#Get file with file_handle provided by metadata metadata = json.loads(decrypt(bytes.fromhex(metadata.text), bytes.fromhex(args.session['derived_key'])).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
else:
logger.error("Failed to get metadata")
sys.exit(-1)
try: try:
file = requests.get(f'http://{state['REP_ADDRESS']}/file/get/' + metadata['file_handle'] + '/content') file = requests.get(f'http://{state['REP_ADDRESS']}/file/get/' + metadata['file_handle'] + '/content')
@ -81,7 +88,7 @@ def getDoc(args):
file = file.content file = file.content
if not digest.get_hash(file) == metadata['file_handle']: if not digest.get_hash(file) == metadata['file_handle']:
logger.error("Files integrity is lost.") logger.error("File's integrity was lost.")
sys.exit(-1) sys.exit(-1)
with open(BASE_DIR + 'encrypted_file', 'wb') as f: with open(BASE_DIR + 'encrypted_file', 'wb') as f:

View File

@ -6,6 +6,8 @@ import requests
import json import json
import argparse import argparse
from lib.diffie_hellman import decrypt
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from lib import digest from lib import digest
@ -46,6 +48,7 @@ def getDocMetadata(args):
args.session = json.load(f) args.session = json.load(f)
doc_name = digest.get_hash(bytes(args.name, encoding='utf-8')) doc_name = digest.get_hash(bytes(args.name, encoding='utf-8'))
derived_key = bytes.fromhex(args.session['derived_key'])
try: try:
metadata = requests.get(f'http://{state['REP_ADDRESS']}/file/get/' + doc_name + '/metadata', headers={'Authorization': args.session['token']}) metadata = requests.get(f'http://{state['REP_ADDRESS']}/file/get/' + doc_name + '/metadata', headers={'Authorization': args.session['token']})
@ -59,10 +62,18 @@ def getDocMetadata(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
metadata = metadata.json() if metadata.status_code == 200:
try:
sys.stdout.write(json.dumps(metadata)) response_data = json.loads(decrypt(bytes.fromhex(metadata.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
sys.stdout.write(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to retrieve metadata")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
getDocMetadata(sys.argv[1:]) getDocMetadata(sys.argv[1:])

View File

@ -8,6 +8,7 @@ import argparse
import datetime import datetime
from subject import main from subject import main
from lib.diffie_hellman import decrypt, encrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -36,9 +37,7 @@ def list_docs(args):
parser.add_argument('session', nargs='?', default=None) parser.add_argument('session', nargs='?', default=None)
parser.add_argument("-s", '--username', nargs=1, help="Username") parser.add_argument("-s", '--username', nargs=1, help="Username")
parser.add_argument("-dnt", '--newerThan', help="Date new than") parser.add_argument("-d", "--date", nargs=2, help="Date")
parser.add_argument("-dot", '--olderThan', help="Date older than")
parser.add_argument("-deq", '--equalTo', help="Date equal to")
args = parser.parse_args() args = parser.parse_args()
@ -55,89 +54,54 @@ def list_docs(args):
with open(BASE_DIR + args.session, 'r') as f: with open(BASE_DIR + args.session, 'r') as f:
args.session = json.load(f) args.session = json.load(f)
if args.newerThan: payload = {}
#Convert date str to datetime in seconds
args.newerThan = validDate(args.newerThan)
args.newerThan = int(args.newerThan.timestamp())
try: if args.username:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/file/list', payload['username'] = args.username[0]
json=json.dumps({'username' : args.username,'datetime' : {'value' : args.newerThan, 'relation' : 'nt'}}),
headers={'Authorization': args.session['token']})
subjects.raise_for_status()
except requests.exceptions.HTTPError: if args.date:
logger.error("%d: %s", subjects.status_code, subjects.json()['error']) if not args.date[0] or not args.date[1]:
logger.error("Need date.")
sys.exit(-1) sys.exit(-1)
if args.date[0] in ['nt', 'ot', 'et']:
except requests.exceptions.RequestException as errex: payload['datetime'] = {'value' : args.date[1], 'relation' : args.date[0]}
logger.error("Failed to obtain response from server.")
sys.exit(-1)
subjects = subjects.json()
elif args.equalTo:
#Convert date str to datetime in seconds
args.equalTo = validDate(args.equalTo)
args.equalTo = int(args.equalTo.timestamp())
try:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/file/list',
json=json.dumps({'username' : args.username,
'datetime' : {'value' : args.equalTo, 'relation' : 'eq'}}),
headers={'Authorization': args.session['token']})
subjects.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", subjects.status_code, subjects
.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.")
sys.exit(-1)
subjects = subjects.json()
elif args.olderThan:
#Convert date str to datetime in seconds
args.olderThan = validDate(args.olderThan)
args.olderThan = int(args.olderThan.timestamp())
try:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/file/list',
json=json.dumps({'username' : args.username,
'datetime' : {'value' : args.olderThan, 'relation' : 'ot'}}),
headers={'Authorization': args.session['token']})
subjects.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", subjects.status_code, subjects.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.")
sys.exit(-1)
subjects = subjects.json()
else: else:
logger.error("Invalid date relation. Use nt, ot or et.")
sys.exit(-1)
derived_key = bytes.fromhex(args.session['derived_key'])
try: try:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/file/list', json=json.dumps({}), headers={'Authorization': args.session['token']}) payload = json.dumps(payload)
subjects.raise_for_status() payload = encrypt(payload, derived_key).hex()
headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
req = requests.get(f'http://{state['REP_ADDRESS']}/file/list', data=payload, headers=headers)
req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
logger.error("%d: %s", subjects.status_code, subjects.json()['error']) logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1) sys.exit(-1)
except requests.exceptions.RequestException as errex: except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
subjects = subjects.json() if req.status_code == 200:
try:
logger.info(json.dumps(subjects, indent=4)) response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get documents")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
list_docs(sys.argv[1:]) list_docs(sys.argv[1:])

View File

@ -5,6 +5,7 @@ import logging
import json import json
import requests import requests
from lib.symmetric_encryption import decrypt_request_with_iv
from subject import main from subject import main
# Identity attributes # Identity attributes
@ -22,6 +23,7 @@ def listOrganizations():
orgs.raise_for_status() orgs.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
# We know this is useless, but we don't give a damn and still put it here
logger.error("%d: %s", orgs.status_code, orgs.json()['error']) logger.error("%d: %s", orgs.status_code, orgs.json()['error'])
sys.exit(-1) sys.exit(-1)
@ -29,7 +31,9 @@ def listOrganizations():
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
logger.info(json.dumps(orgs.json(), indent=4)) orgs = json.loads(decrypt_request_with_iv(bytes.fromhex(orgs.text)))
logger.info(json.dumps(orgs, indent=4))
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -56,10 +57,20 @@ def listPermissionRoles(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
roles = req.json() derived_key = bytes.fromhex(args.session['derived_key'])
logger.info(json.dumps(roles, indent=4))
if req.status_code == 200:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get roles with permission %s", args.permission)
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
listPermissionRoles(sys.argv[1:]) listPermissionRoles(sys.argv[1:])

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -56,10 +57,18 @@ def listRolePermissions(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
perms = req.json() derived_key = bytes.fromhex(args.session['derived_key'])
logger.info(json.dumps(perms, indent=4)) if req.status_code == 200:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get permissions of role %s", args.role)
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
listRolePermissions(sys.argv[1:]) listRolePermissions(sys.argv[1:])

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -56,10 +57,18 @@ def listRoleSubjects(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
subjects = req.json() derived_key = bytes.fromhex(args.session['derived_key'])
logger.info(json.dumps(subjects, indent=4)) if req.status_code == 200:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get subjects with role %s", args.role)
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
listRoleSubjects(sys.argv[1:]) listRoleSubjects(sys.argv[1:])

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -56,8 +57,18 @@ def listRoles(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
roles = req.json() derived_key = bytes.fromhex(args.session['derived_key'])
logger.info(json.dumps(roles, indent=4)) if req.status_code == 200:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0)
else:
logger.error("Failed to get roles of session")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
listRoles(sys.argv[1:]) listRoles(sys.argv[1:])

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -56,10 +57,18 @@ def listSubjectRoles(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
roles = req.json() derived_key = bytes.fromhex(args.session['derived_key'])
logger.info(json.dumps(roles, indent=4)) if req.status_code == 200:
try:
response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
except Exception:
logger.error("Failed to decrypt the content")
sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get roles for user.")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
listSubjectRoles(sys.argv[1:]) listSubjectRoles(sys.argv[1:])

View File

@ -7,6 +7,7 @@ import json
import argparse import argparse
from subject import main from subject import main
from lib.diffie_hellman import decrypt, encrypt
logging.basicConfig(format='%(levelname)s\t- %(message)s') logging.basicConfig(format='%(levelname)s\t- %(message)s')
logger = logging.getLogger() logger = logging.getLogger()
@ -43,38 +44,45 @@ def list_subjects(args):
with open(BASE_DIR + args.session, 'r') as f: with open(BASE_DIR + args.session, 'r') as f:
args.session = json.load(f) args.session = json.load(f)
payload = {}
derived_key = bytes.fromhex(args.session['derived_key'])
if args.username: if args.username:
payload['username'] = args.username[0]
try: try:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/user/list', payload = json.dumps(payload)
json=json.dumps({'username' : args.username}), payload = encrypt(payload, derived_key).hex()
headers={'Authorization': args.session['token']})
subjects.raise_for_status() headers = {
'Authorization': args.session['token'],
'Content-Type': 'application/octet-stream'
}
req = requests.get(f'http://{state['REP_ADDRESS']}/user/list',
data=payload,
headers=headers)
req.raise_for_status()
except requests.exceptions.HTTPError: except requests.exceptions.HTTPError:
logger.error("%d: %s", subjects.status_code, subjects.json()['error']) logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1) sys.exit(-1)
except requests.exceptions.RequestException as errex: except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
else: if req.status_code == 200:
try: try:
subjects = requests.get(f'http://{state['REP_ADDRESS']}/user/list', response_data = json.loads(decrypt(bytes.fromhex(req.text), derived_key).decode('utf-8'))
json=json.dumps({}), except Exception:
headers={'Authorization': args.session['token']}) logger.error("Failed to decrypt the content")
subjects.raise_for_status() sys.exit(1)
logger.info(json.dumps(response_data, indent=4))
except requests.exceptions.HTTPError:
logger.error("%d: %s", subjects.status_code, subjects.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.")
sys.exit(-1)
logger.info(json.dumps(subjects.json(), indent=4))
sys.exit(0) sys.exit(0)
else:
logger.error("Failed to get subjects")
sys.exit(-1)
if __name__ == '__main__': if __name__ == '__main__':
list_subjects(sys.argv[1:]) list_subjects(sys.argv[1:])

View File

@ -60,9 +60,8 @@ def removePermission(args):
]: ]:
isPerm = True isPerm = True
if isPerm:
try: try:
req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + '/perm/remove/' + args.value, req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + f'/{"perm" if isPerm else "user"}/remove/' + args.value,
headers={'Authorization': args.session['token']}) headers={'Authorization': args.session['token']})
req.raise_for_status() req.raise_for_status()
@ -74,23 +73,7 @@ def removePermission(args):
logger.error("Failed to obtain response from server.") logger.error("Failed to obtain response from server.")
sys.exit(-1) sys.exit(-1)
logger.info("Permission removed from role successfully.") logger.info("Permission removed successfully from role." if isPerm else "User removed successfully from role.")
sys.exit(0)
else:
try:
req = requests.post(f'http://{state['REP_ADDRESS']}/role/' + args.role + '/user/remove/' + args.value,
headers={'Authorization': args.session['token']})
req.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("%d: %s", req.status_code, req.json()['error'])
sys.exit(-1)
except requests.exceptions.RequestException as errex:
logger.error("Failed to obtain response from server.")
sys.exit(-1)
logger.info("Subject removed from role successfully.")
sys.exit(0) sys.exit(0)

View File

@ -1,4 +1,6 @@
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from services.security import SecurityService
db_connection = SQLAlchemy() db_connection = SQLAlchemy()
db = db_connection.session db = db_connection.session
security_service = SecurityService()

View File

@ -19,8 +19,5 @@ class Session(db_connection.Model):
"user_id": self.user_id, "user_id": self.user_id,
"org_id": self.org_id, "org_id": self.org_id,
"token": self.token, "token": self.token,
"created_at": self.created_at,
"updated_at": self.updated_at,
"challenge": self.challenge,
"verified": self.verified "verified": self.verified
} }

View File

@ -2,8 +2,11 @@ import json
from flask import Blueprint, request, jsonify, send_file, Response from flask import Blueprint, request, jsonify, send_file, Response
from database import security_service
from utils import Perm, get_hex_from_temp_file, get_hash, check_valid_time, PermOperation from utils import Perm, get_hex_from_temp_file, get_hash, check_valid_time, PermOperation
from services import FileService, OrganizationService, UserService, SessionService, RoleService from services import FileService, OrganizationService, UserService, SessionService, RoleService
from utils.comms_encryption import encrypt_response
from utils.exceptions import SessionException
file_bp = Blueprint("file", __name__) file_bp = Blueprint("file", __name__)
upload_service = FileService() upload_service = FileService()
@ -22,9 +25,10 @@ def file_get_content(file_handle: str):
@file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"]) @file_bp.route("/get/<string:document_handle>/metadata", methods=["GET"])
def file_get_metadata(document_handle: str): def file_get_metadata(document_handle: str):
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_READ], doc_handle=document_handle) session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_READ], doc_handle=document_handle)
if isinstance(session, tuple): except SessionException as e:
return session return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -34,19 +38,20 @@ def file_get_metadata(document_handle: str):
if not file: if not file:
return jsonify({"error": "File not found"}), 404 return jsonify({"error": "File not found"}), 404
return jsonify(file.to_dict() | file.get_encrytion()) return encrypt_response(file.to_dict() | file.get_encrytion(), 200, security_service.get_key(session))
@file_bp.route("/upload/metadata", methods=["POST"]) @file_bp.route("/upload/metadata", methods=["POST"])
def file_upload_metadata(): def file_upload_metadata():
session_token = request.headers.get("Authorization") if request.headers.get("Content-Type") != "application/octet-stream":
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_NEW]) return jsonify({"error": "Invalid request"}), 400
if isinstance(session, tuple):
return session session_token = request.headers.get("Authorization")
try:
session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.DOC_NEW])
except SessionException as e:
return jsonify({"error": e.message}), e.code
data = request.json
if type(data) is str:
data = json.loads(data)
if "document_name" not in data or "key" not in data or "alg" not in data: if "document_name" not in data or "key" not in data or "alg" not in data:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
@ -59,7 +64,7 @@ def file_upload_metadata():
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
file = upload_service.create_file(session.token, org, user, data["document_name"], data["key"], data["alg"]) file = upload_service.create_file(session.token, org, user, data["document_name"], data["key"], data["alg"])
return jsonify(file.to_dict()), 201 return encrypt_response(file.to_dict(), 201, security_service.get_key(session))
@file_bp.route("/upload/content", methods=["POST"]) @file_bp.route("/upload/content", methods=["POST"])
@ -95,23 +100,22 @@ def file_upload_content():
if isinstance(file, tuple): if isinstance(file, tuple):
return file return file
return jsonify(file.to_dict()), 201 return encrypt_response(file.to_dict(), 201, security_service.get_key(session))
@file_bp.route("/list", methods=["GET"]) @file_bp.route("/list", methods=["GET"])
def file_list(): def file_list():
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token) try:
if isinstance(session, tuple): session, data = SessionService.validate_session(session_token, data=request.data)
return session except SessionException as e:
return jsonify({"error": e.message}), e.code
data = request.json
if type(data) is str:
data = json.loads(data)
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -129,13 +133,14 @@ def file_list():
if not user: if not user:
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
files = FileService.list_files_in_org(org) files = FileService.list_files_in_org(org)
return jsonify([file.to_dict() for file in files if file.creator_id == user.id and ( return encrypt_response([file.to_dict() for file in files if file.creator_id == user.id and (
check_valid_time(file.created_at, datetime_value, datetime_relation) check_valid_time(file.created_at, datetime_value, datetime_relation)
if "datetime" in data else True if "datetime" in data else True
)]) )], 200, security_service.get_key(session))
files = FileService.list_files_in_org(org) files = FileService.list_files_in_org(org)
return jsonify([file.to_dict() for file in files if (check_valid_time(file.created_at, datetime_value, datetime_relation) if "datetime" in data else True)]) return encrypt_response([file.to_dict() for file in files if (check_valid_time(file.created_at, datetime_value, datetime_relation) if "datetime" in data else True)],
200, security_service.get_key(session))
@file_bp.route("/delete/<string:document_handle>", methods=["POST"]) @file_bp.route("/delete/<string:document_handle>", methods=["POST"])
@ -144,9 +149,10 @@ def file_delete(document_handle: str):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_DELETE], doc_handle=document_handle) session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_DELETE], doc_handle=document_handle)
if isinstance(session, tuple): except SessionException as e:
return session return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -160,27 +166,30 @@ def file_delete(document_handle: str):
return jsonify({"error": "Not authorized to delete file"}), 403 return jsonify({"error": "Not authorized to delete file"}), 403
file = FileService.delete_file(file, session.user_id) file = FileService.delete_file(file, session.user_id)
return jsonify(file.to_dict()) return encrypt_response(file.to_dict(), 200, security_service.get_key(session))
@file_bp.route("/acl", methods=["POST"]) @file_bp.route("/acl/<string:document_handle>", methods=["POST"])
def file_acl(): def file_acl(document_handle: str):
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
data = request.json try:
if type(data) is str: session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.DOC_ACL], doc_handle=document_handle)
data = json.loads(data) except SessionException as e:
if "document_handle" not in data or "role" not in data or "perm" not in data or "operation" not in data: return jsonify({"error": e.message}), e.code
if "role" not in data or "perm" not in data or "operation" not in data:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
doc_handle = data["document_handle"]
role = data["role"] role = data["role"]
perm = Perm.from_str(data["perm"]) perm = Perm.from_str(data["perm"])
operation = PermOperation.ADD if data["operation"] == "add" else PermOperation.REMOVE operation = PermOperation.ADD if data["operation"] == "add" else PermOperation.REMOVE
session = SessionService.validate_session(session_token, required_perms=[Perm.DOC_ACL], doc_handle=doc_handle)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
@ -188,7 +197,7 @@ def file_acl():
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 return jsonify({"error": "Organization not found"}), 404
file = FileService.get_file_by_document_handle(doc_handle) file = FileService.get_file_by_document_handle(document_handle)
if not file: if not file:
return jsonify({"error": "File not found"}), 404 return jsonify({"error": "File not found"}), 404
@ -199,5 +208,5 @@ def file_acl():
if isinstance(acl, tuple): if isinstance(acl, tuple):
return acl return acl
return jsonify(file.to_dict()), 200 return encrypt_response(file.to_dict(), 200, security_service.get_key(session))

View File

@ -1,14 +1,17 @@
import json import json
from flask import Blueprint, request, jsonify from flask import Blueprint, request, jsonify
from services import OrganizationService from services import OrganizationService
from utils.comms_encryption import decrypt_request_with_iv, encrypt_response_with_iv
org_bp = Blueprint("org", __name__) org_bp = Blueprint("org", __name__)
@org_bp.route("/create", methods=["POST"]) @org_bp.route("/create", methods=["POST"])
def org_create(): def org_create():
data = request.json if request.headers.get("Content-Type") != "application/octet-stream":
if type(data) is str: return jsonify({"error": "Invalid request"}), 400
data = json.loads(data)
data = json.loads(decrypt_request_with_iv(request.data))
if "name" not in data or "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data: if "name" not in data or "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
@ -24,9 +27,8 @@ def org_create():
public_key=data["public_key"] public_key=data["public_key"]
) )
return jsonify(org.to_dict()), 201 return encrypt_response_with_iv(json.dumps(org.to_dict()), 201)
@org_bp.route("/list", methods=["GET"]) @org_bp.route("/list", methods=["GET"])
def org_list(): def org_list():
orgs = OrganizationService.list_organizations() return encrypt_response_with_iv(json.dumps([org.to_dict() for org in OrganizationService.list_organizations()]), 200)
return jsonify([org.to_dict() for org in orgs])

View File

@ -1,27 +1,31 @@
import json import json
from flask import Blueprint, request, jsonify from flask import Blueprint, request, jsonify
from database import security_service
from services import UserService, SessionService, OrganizationService, RoleService from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm, PermOperation from utils import Perm, PermOperation
from utils.comms_encryption import encrypt_response
from utils.exceptions import SessionException
role_bp = Blueprint("role", __name__) role_bp = Blueprint("role", __name__)
@role_bp.route("/create", methods=["POST"]) @role_bp.route("/create", methods=["POST"])
def role_create(): def role_create():
data = request.json if request.headers.get("Content-Type") != "application/octet-stream":
if type(data) is str: return jsonify({"error": "Invalid request"}), 400
data = json.loads(data)
if "role" not in data:
return jsonify({"error": "Missing required fields"}), 400
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_NEW]) try:
if not session: session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.ROLE_NEW])
return jsonify({"error": "Not authenticated"}), 401 except SessionException as e:
return jsonify({"error": e.message}), e.code
if "role" not in data:
return jsonify({"error": "Missing required fields"}), 400
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -32,7 +36,7 @@ def role_create():
except ValueError as e: except ValueError as e:
return jsonify({"error": str(e)}), 400 return jsonify({"error": str(e)}), 400
return jsonify(role), 201 return encrypt_response(role, 201, security_service.get_key(session))
@role_bp.route("/<string:role>/list/users", methods=["GET"]) @role_bp.route("/<string:role>/list/users", methods=["GET"])
@ -52,7 +56,8 @@ def role_list_users(role):
users = RoleService.get_users_in_role(org, role) users = RoleService.get_users_in_role(org, role)
if isinstance(users, tuple): if isinstance(users, tuple):
return users return users
return jsonify(users), 200
return encrypt_response(users, 200, security_service.get_key(session))
@role_bp.route("/<string:role>/list/perms", methods=["GET"]) @role_bp.route("/<string:role>/list/perms", methods=["GET"])
@ -72,7 +77,8 @@ def role_list_perms(role):
perms = RoleService.get_perms_for_role(org, role, return_str=True) perms = RoleService.get_perms_for_role(org, role, return_str=True)
if isinstance(perms, tuple): if isinstance(perms, tuple):
return perms return perms
return jsonify(perms), 200
return encrypt_response(perms, 200, security_service.get_key(session))
@role_bp.route("/<string:role>/suspend", methods=["POST"]) @role_bp.route("/<string:role>/suspend", methods=["POST"])
@ -81,7 +87,7 @@ def role_suspend(role):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_DOWN]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_DOWN])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -102,7 +108,7 @@ def role_activate(role):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_UP]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_UP])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -123,7 +129,7 @@ def role_user_add(role, username):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_MOD])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -148,7 +154,7 @@ def role_user_remove(role, username):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_MOD])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -173,7 +179,7 @@ def role_perm_add(role, perm):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_MOD])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -194,7 +200,7 @@ def role_perm_remove(role, perm):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, [Perm.ROLE_MOD]) session = SessionService.validate_session(session_token, required_perms=[Perm.ROLE_MOD])
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
@ -228,7 +234,7 @@ def role_session_assume(role):
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
return jsonify(session.to_dict()), 200 return encrypt_response(session.to_dict(), 200, security_service.get_key(session))
@role_bp.route("/session/drop/<string:role>", methods=["POST"]) @role_bp.route("/session/drop/<string:role>", methods=["POST"])
@ -250,7 +256,7 @@ def role_session_drop(role):
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
return jsonify(session.to_dict()), 200 return encrypt_response(session.to_dict(), 200, security_service.get_key(session))
@role_bp.route("/session/list", methods=["GET"]) @role_bp.route("/session/list", methods=["GET"])
@ -264,7 +270,7 @@ def role_session_list():
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
roles = SessionService.list_roles(session) roles = SessionService.list_roles(session)
return jsonify(roles), 200 return encrypt_response(roles, 200, security_service.get_key(session))
@role_bp.route("/perm/<string:perm>/roles", methods=["GET"]) @role_bp.route("/perm/<string:perm>/roles", methods=["GET"])
def perm_list_roles(perm): def perm_list_roles(perm):
@ -288,4 +294,5 @@ def perm_list_roles(perm):
return jsonify({"error": "Invalid permission"}), 400 return jsonify({"error": "Invalid permission"}), 400
if isinstance(roles, tuple): if isinstance(roles, tuple):
return roles return roles
return jsonify(roles), 200
return encrypt_response(roles, 200, security_service.get_key(session))

View File

@ -1,38 +1,34 @@
import base64 import base64
import json import json
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from flask import Blueprint, request, jsonify from flask import Blueprint, request, jsonify
from services import UserService, SessionService, OrganizationService, RoleService from services import UserService, SessionService, OrganizationService, RoleService
from utils import Perm from utils import Perm, generate_parameters, generate_key_pair, derive_keys, decrypt
from utils.comms_encryption import encrypt_response, decrypt_request
from database import security_service
from utils.exceptions import SessionException
user_bp = Blueprint("user", __name__) user_bp = Blueprint("user", __name__)
@user_bp.route("/login", methods=["POST"]) @user_bp.route("/login", methods=["POST"])
def user_login(): def user_login():
data = request.json if request.headers.get("Content-Type") == "application/octet-stream":
if type(data) is str: if session_token := request.headers.get("Authorization"):
data = json.loads(data) data = request.data
if "username" in data and "org" in data:
user = UserService.get_user_by_username(data["username"])
if not user:
return jsonify({"error": "User not found"}), 404
org = OrganizationService.get_organization_by_name(data["org"])
if not org:
return jsonify({"error": "Organization not found"}), 404
session = SessionService.create_session(user, org)
return jsonify(session.to_dict()), 201
elif session_token := request.headers.get("Authorization"):
session = SessionService.get_session(session_token) session = SessionService.get_session(session_token)
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 return jsonify({"error": "Not authenticated"}), 401
if session.verified: if session.verified:
return jsonify(session.to_dict()), 200 return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
try:
data = decrypt_request(data.decode(), security_service.get_key(session))
except Exception:
return jsonify({"error": "Invalid encryption"}), 400
if not "signature" in data: if not "signature" in data:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
@ -43,11 +39,66 @@ def user_login():
session = SessionService.verify_session(session_token, signature) session = SessionService.verify_session(session_token, signature)
if isinstance(session, tuple): if isinstance(session, tuple):
return session return session
return jsonify(session.to_dict()), 201 return encrypt_response(session.to_dict(), 201, security_service.get_key(session))
return jsonify({"error": "No session token"}), 400
if request.json and "username" in request.json and "org" in request.json:
data = request.json
if type(data) is str:
data = json.loads(data)
user = UserService.get_user_by_username(data["username"])
if not user:
return jsonify({"error": "User not found"}), 404
org = OrganizationService.get_organization_by_name(data["org"])
if not org:
return jsonify({"error": "Organization not found"}), 404
session = SessionService.create_session(user, org)
payload = {
"token": session.token,
"challenge": session.challenge
}
return encrypt_response(payload, 201, security_service.get_key(session))
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
@user_bp.route("/dh", methods=["POST"])
def user_dh():
data = request.json
if type(data) is str:
data = json.loads(data)
if "user" not in data or "org" not in data:
return jsonify({"error": "Missing required fields"}), 400
if "parameters" in data:
parameters_bytes = bytes.fromhex(data["parameters"])
parameters = serialization.load_pem_parameters(parameters_bytes)
private_key, public_key = generate_key_pair(parameters)
security_service.register_key(data['user'], data['org'], private_key)
public_key_content = public_key.public_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
return jsonify({
"public_key": public_key_content.hex()
}), 200
elif "public_key" in data:
client_public_key = data["public_key"]
client_public_key = serialization.load_pem_public_key(bytes.fromhex(client_public_key))
private_key = security_service.keys[f'{data["org"]}/{data["user"]}']
derived_key = derive_keys(private_key, client_public_key)
security_service.register_key(data['user'], data['org'], derived_key)
return jsonify({"message": "Keys exchanged"}), 200
else:
return jsonify({"error": "Missing required fields"}), 400
@user_bp.route("/logout", methods=["POST"]) @user_bp.route("/logout", methods=["POST"])
def user_logout(): def user_logout():
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
@ -64,17 +115,17 @@ def user_logout():
@user_bp.route("/list", methods=["GET"]) @user_bp.route("/list", methods=["GET"])
def user_list(): def user_list():
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token) try:
if isinstance(session, tuple): session, data = SessionService.validate_session(session_token, data=request.data)
return session except SessionException as e:
return jsonify({"error": e.message}), e.code
data = request.json
if type(data) is str:
data = json.loads(data)
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -84,25 +135,25 @@ def user_list():
user = UserService.get_user_by_username(data["username"]) user = UserService.get_user_by_username(data["username"])
if not user: if not user:
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
return jsonify(user.to_dict()), 200 return encrypt_response(user.to_dict(), 200, security_service.get_key(session))
users = OrganizationService.get_users_in_organization(org) users = OrganizationService.get_users_in_organization(org)
return jsonify(users), 200 return encrypt_response(users, 200, security_service.get_key(session))
@user_bp.route("/create", methods=["POST"]) @user_bp.route("/create", methods=["POST"])
def user_create(): def user_create():
if request.headers.get("Content-Type") != "application/octet-stream":
return jsonify({"error": "Invalid request"}), 400
session_token = request.headers.get("Authorization") session_token = request.headers.get("Authorization")
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_NEW]) try:
if isinstance(session, tuple): session, data = SessionService.validate_session(session_token, data=request.data, required_perms=[Perm.SUBJECT_NEW])
return session except SessionException as e:
return jsonify({"error": e.message}), e.code
data = request.json
if type(data) is str:
data = json.loads(data)
if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data: if "username" not in data or "full_name" not in data or "email" not in data or "public_key" not in data:
return jsonify({"error": "Missing required fields"}), 400 return jsonify({"error": "Missing required fields"}), 400
@ -121,7 +172,7 @@ def user_create():
org=org org=org
) )
return jsonify(user.to_dict()), 201 return encrypt_response(user.to_dict(), 201, security_service.get_key(session))
@user_bp.route("/<string:username>/roles", methods=["GET"]) @user_bp.route("/<string:username>/roles", methods=["GET"])
@ -130,9 +181,10 @@ def user_roles(username):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token) session = SessionService.validate_session(session_token)
if isinstance(session, tuple): except SessionException as e:
return session return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -143,8 +195,7 @@ def user_roles(username):
return jsonify({"error": "User not found"}), 404 return jsonify({"error": "User not found"}), 404
roles = RoleService.get_roles_for_user(user, org) roles = RoleService.get_roles_for_user(user, org)
return jsonify(roles), 200 return encrypt_response(roles, 200, security_service.get_key(session))
@user_bp.route("/<string:username>/suspend", methods=["POST"]) @user_bp.route("/<string:username>/suspend", methods=["POST"])
def user_suspend(username): def user_suspend(username):
@ -152,9 +203,10 @@ def user_suspend(username):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_DOWN]) session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_DOWN])
if isinstance(session, tuple): except SessionException as e:
return session return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
@ -173,9 +225,10 @@ def user_unsuspend(username):
if not session_token: if not session_token:
return jsonify({"error": "No session token"}), 400 return jsonify({"error": "No session token"}), 400
try:
session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_UP]) session = SessionService.validate_session(session_token, required_perms=[Perm.SUBJECT_UP])
if isinstance(session, tuple): except SessionException as e:
return session return jsonify({"error": e.message}), e.code
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:

View File

@ -65,28 +65,6 @@ class FileService:
return file return file
@staticmethod
def create_dummy_file(org: Organization, user: User) -> File:
file = File(
file_handle = "dummy_file",
document_handle = "org/dummy_file.txt",
name = "dummy_file",
created_at = int(datetime.now().timestamp()),
org_id = 1,
creator_id = 1,
org = org,
creator = user
)
file_path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "repository"), file.document_handle)
with open(file_path, "w") as f:
f.write("Dummy file content")
db.add(file)
db.commit()
db.refresh(file)
return file
@staticmethod @staticmethod
def get_file(file_id: int) -> File | None: def get_file(file_id: int) -> File | None:
return db.query(File).filter(File.id == file_id).first() return db.query(File).filter(File.id == file_id).first()

View File

@ -94,7 +94,7 @@ class OrganizationService:
return org return org
@staticmethod @staticmethod
def suspend_user(org: Organization, user: User) -> tuple: def suspend_user(org: Organization, user: User):
if OrganizationService.get_user_status(org, user.id) != "active": if OrganizationService.get_user_status(org, user.id) != "active":
return {"error": "User already suspended"}, 400 return {"error": "User already suspended"}, 400
@ -109,7 +109,7 @@ class OrganizationService:
return {"message": "User suspended"}, 200 return {"message": "User suspended"}, 200
@staticmethod @staticmethod
def activate_user(org: Organization, user: User) -> tuple: def activate_user(org: Organization, user: User):
if OrganizationService.get_user_status(org, user.id) != "suspended": if OrganizationService.get_user_status(org, user.id) != "suspended":
return {"error": "User already active"}, 400 return {"error": "User already active"}, 400

View File

@ -100,8 +100,10 @@ class RoleService:
return [role for role in org.roles if user.id in org.roles[role]["users"]] return [role for role in org.roles if user.id in org.roles[role]["users"]]
@staticmethod @staticmethod
def get_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str]: def get_perms_for_role(org: Organization, role: str, return_str=False) -> list[Perm | str] | tuple:
from services import FileService from services import FileService
if role not in org.roles:
return jsonify({"error": f"Role {role} does not exist in organization {org.name}"}), 400
perms_list = Perm.get_perms(org.roles[role]["permissions"], return_str) perms_list = Perm.get_perms(org.roles[role]["permissions"], return_str)
for f in FileService.list_files_in_org(org): for f in FileService.list_files_in_org(org):
perms_list.append({f.document_handle: Perm.get_perms(f.acl[role], return_str)}) perms_list.append({f.document_handle: Perm.get_perms(f.acl[role], return_str)})

View File

@ -0,0 +1,14 @@
class SecurityService:
def __init__(self):
self.keys = {}
def register_key(self, username, org, key):
self.keys[f'{org}/{username}'] = key
def get_key(self, session):
from services import UserService, OrganizationService
username = UserService.get_user(session.user_id).username
org = OrganizationService.get_organization(session.org_id).name
return self.keys[f'{org}/{username}']

View File

@ -1,3 +1,4 @@
import json
import secrets import secrets
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
@ -6,11 +7,13 @@ from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
from database import db from database import db, security_service
from models import Session, User, Organization from models import Session, User, Organization
from flask import jsonify from flask import jsonify
from utils import Perm from utils import Perm
from utils.comms_encryption import decrypt_request
from utils.exceptions import SessionException
class SessionService: class SessionService:
@ -63,33 +66,43 @@ class SessionService:
db.commit() db.commit()
@staticmethod @staticmethod
def validate_session(token: str, required_perms: list[Perm] = None, doc_handle=None) -> tuple | Session: def validate_session(token: str, data: bytes = None, required_perms: list[Perm] = None, doc_handle=None) -> tuple | Session:
from services import OrganizationService from services import OrganizationService, UserService
if "Bearer" in token: if "Bearer" in token:
token = token.split(" ")[1] token = token.split(" ")[1]
session = SessionService.get_session(token) session = SessionService.get_session(token)
if not session: if not session:
return jsonify({"error": "Not authenticated"}), 401 raise SessionException("Not authenticated", 401)
if not session.verified: if not session.verified:
return jsonify({"error": "Session has not yet been verified"}), 403 raise SessionException("Session has not yet been verified", 403)
user = UserService.get_user(session.user_id)
if not user:
raise SessionException("User not found", 404)
org = OrganizationService.get_organization(session.org_id) org = OrganizationService.get_organization(session.org_id)
if not org: if not org:
return jsonify({"error": "Organization not found"}), 404 raise SessionException("Organization not found", 404)
status = OrganizationService.get_user_status(org, session.user_id) status = OrganizationService.get_user_status(org, session.user_id)
if status != "active": if status != "active":
return jsonify({"error": "User is not active"}), 403 raise SessionException("User is not active", 403)
if data is not None:
try:
data = decrypt_request(data.decode(), security_service.get_key(session))
except Exception:
raise SessionException("Failed to decrypt request data", 400)
if required_perms: if required_perms:
for perm in required_perms: for perm in required_perms:
if not SessionService.check_permission(session, perm, doc_handle): if not SessionService.check_permission(session, perm, doc_handle):
return jsonify({"error": f"Permission denied, missing required permission: {perm}"}), 403 raise SessionException(f"Permission denied, missing required permission: {perm}", 403)
return session return (session, data) if data is not None else session
@staticmethod @staticmethod
def change_role(session: Session, role: str, operation: str): def change_role(session: Session, role: str, operation: str):

View File

@ -1,3 +1,5 @@
from .checks import check_valid_time from .checks import check_valid_time
from .hashing import get_hash, get_hex_from_temp_file, encode_public_key from .hashing import get_hash, get_hex_from_temp_file, encode_public_key
from .perms import Perm, PermOperation from .perms import Perm, PermOperation
from .diffie_hellman import diffie_hellman, load_dh_private_key, load_dh_public_key, derive_keys, generate_key_pair, generate_parameters, encrypt, decrypt
from .comms_encryption import encrypt_response, decrypt_request, encrypt_response_with_iv, decrypt_request_with_iv

View File

@ -0,0 +1,38 @@
import json
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from utils import encrypt, decrypt
def encrypt_response(payload: json, code: int, key):
payload_str = json.dumps(payload)
encrypted = encrypt(payload_str.encode(), key).hex()
return encrypted, code
def decrypt_request(data: str, key):
return json.loads(decrypt(bytes.fromhex(data), key).decode())
def encrypt_response_with_iv(input_string: str, code: int) -> tuple[str, int]:
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(iv), modes.CFB(iv))
encryptor = cipher.encryptor()
plaintext_bytes = input_string.encode('utf-8')
ciphertext = encryptor.update(plaintext_bytes) + encryptor.finalize()
encrypted_data = iv + ciphertext
return encrypted_data.hex(), code
def decrypt_request_with_iv(encrypted_data: bytes) -> str:
iv = encrypted_data[:16]
ciphertext = encrypted_data[16:]
cipher = Cipher(algorithms.AES(iv), modes.CFB(iv))
decryptor = cipher.decryptor()
plaintext_bytes = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext_bytes.decode('utf-8')

View File

@ -0,0 +1,147 @@
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import subprocess
import os
def generate_parameters(generator: int, key_size: int, backend=default_backend()):
"""
Generate the parameters for the Diffie-Hellman key exchange
:param generator:
:param key_size:
:param backend:
:return:
"""
if key_size < 512:
raise ValueError("Key size must be at least 512 bits")
if generator not in [2, 5]:
raise ValueError("Generator must be 2 or 5")
return dh.generate_parameters(generator, key_size, backend)
def generate_key_pair(parameters: dh.DHParameters):
"""
Generate a key pair for the Diffie-Hellman key exchange
:param parameters:
:return private_key, public_key:
"""
private_key = parameters.generate_private_key()
return private_key, private_key.public_key()
def derive_keys(private_key: dh.DHPrivateKey, peer_public_key: dh.DHPublicKey):
"""
Derive the shared key from the private key and the peer's public key
:param private_key:
:param peer_public_key:
:return private_key, derived_key:
"""
shared_key = private_key.exchange(peer_public_key)
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'handshake data',
backend=default_backend()
).derive(shared_key)
return derived_key
# Encrypt content using the derived key
def encrypt(content, key):
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = iv + encryptor.update(content) + encryptor.finalize()
return ciphertext
# Decrypt content using the derived key
def decrypt(ciphertext, key):
iv = ciphertext[:16]
cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext[16:]) + decryptor.finalize()
return plaintext
def load_dh_public_key(key):
with open(key, 'rb') as key_file:
public_key = serialization.load_pem_public_key(
key_file.read(),
)
return public_key
def load_dh_private_key(file, passwd=None):
if passwd is not None:
passwd = passwd.encode('utf-8')
try:
with open(file, 'rb') as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=passwd,
)
except ValueError as e:
raise ValueError("Error: The password is not valid.") from e
return private_key
def diffie_hellman():
# imagine that the agreement was done beforehand
generator = 2; key_size = 1024
parameters = generate_parameters(generator, key_size)
# generate keys for client and server
client_private_key, client_public_key = generate_key_pair(parameters)
server_private_key, server_public_key = generate_key_pair(parameters)
# derive keys
client_derived_key = derive_keys(client_private_key, server_public_key)
server_derived_key = derive_keys(server_private_key, client_public_key)
print("Client derived key: ", client_derived_key)
print("Server derived key: ", server_derived_key)
# write the keys to files
with open("client_private_key.pem", 'wb') as f:
f.write(client_private_key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()))
with open("server_private_key.pem", 'wb') as f:
f.write(server_private_key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()))
with open("client_derived_key.pem", 'wb') as f:
f.write(client_derived_key)
with open("server_derived_key.pem", 'wb') as f:
f.write(server_derived_key)
print(f"Client private key: \n\n{client_private_key}\nServer derived key: \n\n{server_derived_key}\n")
print(f"Server private key: \n\n{server_private_key}\nClient derived key: \n\n{client_derived_key}\n")
# test encryption
process = subprocess.Popen(f"dd if=/dev/zero of=file.txt bs=1024 count=1000", shell=True)
with open("file.txt", 'rb') as f:
data = f.read()
encrypted_data = encrypt(data, client_derived_key)
decrypted_data = decrypt(encrypted_data, server_derived_key)
if data == decrypted_data:
print("Encryption and decryption successful!")

View File

@ -0,0 +1,5 @@
class SessionException(Exception):
def __init__(self, message: str, code: int):
super().__init__(message)
self.message = message
self.code = code