"""End-to-end tests for the repository client command-line tools.

Each test shells out to one command under ``DELIVERY_PATH/client/bin`` and
checks either its exit status or the files it is expected to leave in
``FILES_PATH``.

The tests are order-dependent: later tests reuse artefacts produced by
earlier ones (``state.json``, the key pairs, ``session.json``, the uploaded
document and the ``metadata`` global), so they must run top to bottom against
a server listening on ``localhost:5000``.
"""
import json
import os
import shlex
import subprocess
import sys

import requests

DELIVERY_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
FILES_PATH = os.path.join(os.path.expanduser('~'), '.sio/')

# !!! database.db must be deleted/reset before running the tests !!!
# NOTE: both statements run at import time, i.e. as soon as the test runner
# collects this module.
requests.post("http://localhost:5000/reset", json={"password": "123"})
os.system(f"rm {FILES_PATH}*")


def _run(command):
    """Run *command* through the shell, wait for it, and return its exit code."""
    return subprocess.run(command, shell=True).returncode


def test_address_set():
    """Store the server address; ``state.json`` must exist afterwards."""
    _run(f"python3 {DELIVERY_PATH}/client/bin/subject.py -r localhost:5000 ")
    assert os.path.exists(os.path.join(FILES_PATH, 'state.json'))


def test_rep_subject_credentials():
    """Test the rep_subject_credentials command (key pair generation)."""
    _run(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub.pem priv.pem ")
    assert os.path.exists(os.path.join(FILES_PATH, 'pub.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv.pem'))


def test_rep_create_org():
    """Test the rep_create_org command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_create_org org1 username name email@org.com pub.pem") == 0


def test_rep_list_orgs():
    """Test the rep_list_orgs command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_list_orgs") == 0


def test_rep_create_session():
    """Test the rep_create_session command; later tests pass ``session.json``."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_create_session org1 username password priv.pem session.json") == 0


def test_rep_assume_role():
    """Test the rep_assume_role command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_assume_role session.json manager") == 0


def test_rep_list_subjects():
    """Test the rep_list_subjects command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_list_subjects session.json") == 0


def test_rep_add_subject():
    """Generate a second key pair, then test the rep_add_subject command."""
    _run(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub_extra.pem priv_extra.pem ")
    assert os.path.exists(os.path.join(FILES_PATH, 'pub_extra.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv_extra.pem'))

    assert _run(f"{DELIVERY_PATH}/client/bin/rep_add_subject session.json username2 name2 name2@any.com pub_extra.pem") == 0


def test_rep_suspend_subject():
    """Test the rep_suspend_subject command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_suspend_subject session.json username2") == 0


def test_rep_activate_subject():
    """Test the rep_activate_subject command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_activate_subject session.json username2") == 0


def test_rep_add_doc():
    """Create a zero-filled file of 1000 KiB, then test the rep_add_doc command."""
    assert _run(f"dd if=/dev/zero of={FILES_PATH}test.txt bs=1024 count=1000 ") == 0

    assert _run(f"{DELIVERY_PATH}/client/bin/rep_add_doc session.json doc test.txt ") == 0


# Parsed output of rep_get_doc_metadata, shared with the tests that follow.
metadata = {}


def test_rep_get_doc_metadata():
    """Test the rep_get_doc_metadata command and keep its JSON output.

    The parsed output is stored in the module-level ``metadata`` dict, which
    test_rep_get_file and test_decrypt_file read.
    """
    global metadata
    # subprocess.run drains both pipes while waiting; calling wait() before
    # communicate() on piped output can deadlock once a pipe buffer fills.
    result = subprocess.run(
        f"{DELIVERY_PATH}/client/bin/rep_get_doc_metadata session.json doc",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # Check the exit status first so a failing command is reported as such
    # (with its stderr) rather than as a JSON decoding error.
    assert result.returncode == 0, result.stderr
    metadata = json.loads(result.stdout)


def test_rep_get_file():
    """Test the rep_get_file command using the stored file handle."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_get_file {metadata['file_handle']} file.txt ") == 0


def test_decrypt_file():
    """Test the rep_decrypt_file command with the stored encryption metadata."""
    encryption_metadata = {
        'alg': metadata['alg'],
        'key': metadata['key']
    }
    # shlex.quote keeps the JSON a single shell argument even if a value
    # happens to contain a single quote.
    json_argument = shlex.quote(json.dumps(encryption_metadata))
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_decrypt_file file.txt {json_argument}") == 0


def test_rep_get_doc_file():
    """Test the rep_get_doc_file command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_get_doc_file session.json doc file.txt") == 0


def test_rep_delete_doc():
    """Test the rep_delete_doc command."""
    assert _run(f"{DELIVERY_PATH}/client/bin/rep_delete_doc session.json doc") == 0