"""End-to-end tests that drive the client command-line tools.

Importing this module POSTs to the ``/reset`` endpoint of the server at
``localhost:5000`` and removes the files directly inside ``FILES_PATH``.

The tests share state through files such as ``session.json`` and through the
roles, subjects and documents created along the way, so they are expected to
run in definition order (pytest's default).
"""
import json
import os
import shlex
import subprocess
import sys

import requests

DELIVERY_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
FILES_PATH = os.path.join(os.path.expanduser('~'), '.sio/')

# !!! database.db must be deleted/reset before running the tests !!!
requests.post("http://localhost:5000/reset", json={"password": "123"})
os.system(f"rm {FILES_PATH}*")


def _run(command, capture_output=False):
    """Run *command* through the shell and wait for it to finish.

    Args:
        command: Full shell command line.
        capture_output: When true, stdout and stderr are captured as text and
            exposed on the result; otherwise they are inherited from the
            test process.

    Returns:
        The ``subprocess.CompletedProcess`` for the finished command.
    """
    pipe = subprocess.PIPE if capture_output else None
    # subprocess.run() drains the pipes while waiting, so a chatty child
    # cannot block on a full pipe buffer (unlike Popen.wait() with PIPE).
    return subprocess.run(
        command,
        shell=True,
        stdout=pipe,
        stderr=pipe,
        text=capture_output,
    )


def _run_client(arguments, capture_output=False):
    """Run ``<DELIVERY_PATH>/client/bin/<arguments>`` and return its result.

    *arguments* is the executable name followed by its command-line
    arguments, exactly as they should appear on the shell command line.
    """
    return _run(
        f"{DELIVERY_PATH}/client/bin/{arguments}",
        capture_output=capture_output,
    )


def test_address_set():
    """Running subject.py with ``-r`` must leave state.json in FILES_PATH."""
    _run(f"python3 {DELIVERY_PATH}/client/bin/subject.py -r localhost:5000")
    assert os.path.exists(os.path.join(FILES_PATH, 'state.json'))


def test_rep_subject_credentials():
    """rep_subject_credentials must leave both key files in FILES_PATH."""
    _run_client("rep_subject_credentials password pub.pem priv.pem")
    assert os.path.exists(os.path.join(FILES_PATH, 'pub.pem'))
    assert os.path.exists(os.path.join(FILES_PATH, 'priv.pem'))


def test_rep_create_org():
    """Test the rep_create_org command."""
    result = _run_client("rep_create_org org1 username name email@org.com pub.pem")
    assert result.returncode == 0


def test_rep_list_orgs():
    """Test the rep_list_orgs command."""
    result = _run_client("rep_list_orgs")
    assert result.returncode == 0


def test_rep_create_session():
    """Test the rep_create_session command (session.json is reused below)."""
    result = _run_client("rep_create_session org1 username password priv.pem session.json")
    assert result.returncode == 0


def test_rep_list_subjects():
    """Test the rep_list_subjects command."""
    result = _run_client("rep_list_subjects session.json")
    assert result.returncode == 0


def test_rep_assume_role():
    """Test the rep_assume_role command with the ``manager`` role."""
    result = _run_client("rep_assume_role session.json manager")
    assert result.returncode == 0


def test_rep_add_role():
    """Test the rep_add_role command by adding ``newrole``."""
    result = _run_client("rep_add_role session.json newrole")
    assert result.returncode == 0


def test_rep_add_permission():
    """Test the rep_add_permission command for each permission on newrole."""
    # ROLE_ACL is added last; test_rep_remove_permission removes it again.
    permissions = (
        "SUBJECT_NEW",
        "SUBJECT_DOWN",
        "SUBJECT_UP",
        "DOC_NEW",
        "ROLE_NEW",
        "ROLE_DOWN",
        "ROLE_UP",
        "ROLE_MOD",
        "ROLE_ACL",
    )
    for permission in permissions:
        result = _run_client(f"rep_add_permission session.json newrole {permission}")
        assert result.returncode == 0, f"adding {permission} failed"


def test_rep_remove_permission():
    """Test the rep_remove_permission command by removing ROLE_ACL."""
    result = _run_client("rep_remove_permission session.json newrole ROLE_ACL")
    assert result.returncode == 0


def test_rep_suspend_role():
    """Test the rep_suspend_role command."""
    result = _run_client("rep_suspend_role session.json newrole")
    assert result.returncode == 0


def test_rep_reactivate_role():
    """Test the rep_reactivate_role command."""
    result = _run_client("rep_reactivate_role session.json newrole")
    assert result.returncode == 0


def test_rep_add_permission_user():
    """Test rep_add_permission with a username in place of a permission.

    NOTE(review): presumably this assigns ``newrole`` to the subject
    ``username`` -- confirm against the client implementation.
    """
    result = _run_client("rep_add_permission session.json newrole username")
    assert result.returncode == 0


def test_rep_drop_role():
    """Test rep_drop_role on ``manager``, then assume ``newrole`` instead."""
    result = _run_client("rep_drop_role session.json manager")
    assert result.returncode == 0

    result = _run_client("rep_assume_role session.json newrole")
    assert result.returncode == 0


def test_rep_list_roles():
    """Test the rep_list_roles command."""
    result = _run_client("rep_list_roles session.json")
    assert result.returncode == 0


def test_rep_list_role_subjects():
    """Test the rep_list_role_subjects command."""
    result = _run_client("rep_list_role_subjects session.json newrole")
    assert result.returncode == 0


def test_rep_list_subject_roles():
    """Test the rep_list_subject_roles command."""
    result = _run_client("rep_list_subject_roles session.json username")
    assert result.returncode == 0


def test_rep_list_role_permissions():
    """Test the rep_list_role_permissions command."""
    result = _run_client("rep_list_role_permissions session.json newrole")
    assert result.returncode == 0


def test_rep_list_permission_roles():
    """Test the rep_list_permission_roles command with SUBJECT_NEW."""
    result = _run_client("rep_list_permission_roles session.json SUBJECT_NEW")
    assert result.returncode == 0


def test_rep_add_subject():
    """Create a second set of key files, then test rep_add_subject."""
    _run_client("rep_subject_credentials password pub_extra.pem priv_extra.pem")
    assert os.path.exists(os.path.join(FILES_PATH, 'pub_extra.pem'))
    assert os.path.exists(os.path.join(FILES_PATH, 'priv_extra.pem'))

    result = _run_client("rep_add_subject session.json username2 name2 name2@any.com pub_extra.pem")
    assert result.returncode == 0


def test_rep_suspend_subject():
    """Test the rep_suspend_subject command."""
    result = _run_client("rep_suspend_subject session.json username2")
    assert result.returncode == 0


def test_rep_activate_subject():
    """Test the rep_activate_subject command."""
    result = _run_client("rep_activate_subject session.json username2")
    assert result.returncode == 0


def test_rep_add_doc():
    """Create a zero-filled test file with dd, then test rep_add_doc."""
    # 1000 blocks of 1024 bytes read from /dev/zero.
    result = _run(f"dd if=/dev/zero of={FILES_PATH}test.txt bs=1024 count=1000")
    assert result.returncode == 0

    result = _run_client("rep_add_doc session.json doc test.txt")
    assert result.returncode == 0


def test_rep_acl_doc():
    """Test rep_acl_doc with ``-`` and then ``+`` for two permissions."""
    changes = (
        ("-", "DOC_READ"),
        ("-", "DOC_DELETE"),
        ("+", "DOC_READ"),
        ("+", "DOC_DELETE"),
    )
    for sign, permission in changes:
        result = _run_client(f"rep_acl_doc session.json doc {sign} newrole {permission}")
        assert result.returncode == 0, f"{sign} {permission} failed"


def test_rep_list_docs():
    """Test the rep_list_docs command."""
    result = _run_client("rep_list_docs session.json")
    assert result.returncode == 0


def test_rep_list_permission_roles_with_file():
    """Test the rep_list_permission_roles command with DOC_READ."""
    result = _run_client("rep_list_permission_roles session.json DOC_READ")
    assert result.returncode == 0


# Parsed output of rep_get_doc_metadata, shared with the tests that follow.
metadata = {}


def test_rep_get_doc_metadata():
    """Test rep_get_doc_metadata and keep its JSON output in ``metadata``."""
    global metadata
    result = _run_client("rep_get_doc_metadata session.json doc", capture_output=True)
    # Check the exit status first so a failing command is reported as such
    # instead of as a JSON decoding error on its (possibly empty) output.
    assert result.returncode == 0, result.stderr
    metadata = json.loads(result.stdout)


def test_rep_get_file():
    """Test rep_get_file using the file handle from the stored metadata."""
    file_handle = shlex.quote(str(metadata['file_handle']))
    result = _run_client(f"rep_get_file {file_handle} file.txt")
    assert result.returncode == 0


def test_decrypt_file():
    """Test rep_decrypt_file using ``alg`` and ``key`` from the metadata."""
    encryption_metadata = {
        'alg': metadata['alg'],
        'key': metadata['key'],
    }
    # shlex.quote keeps the JSON a single shell word even if a value
    # contains a single quote.
    json_argument = shlex.quote(json.dumps(encryption_metadata))
    result = _run_client(f"rep_decrypt_file file.txt {json_argument}")
    assert result.returncode == 0


def test_rep_get_doc_file():
    """Test the rep_get_doc_file command."""
    result = _run_client("rep_get_doc_file session.json doc file.txt")
    assert result.returncode == 0


def test_rep_delete_doc():
    """Test the rep_delete_doc command."""
    result = _run_client("rep_delete_doc session.json doc")
    assert result.returncode == 0


def test_rep_list_docs_after_deletion():
    """Test that rep_list_docs still succeeds after the document is deleted."""
    result = _run_client("rep_list_docs session.json")
    assert result.returncode == 0