298 lines
11 KiB
Python
298 lines
11 KiB
Python
import json
|
|
import os, subprocess, sys
|
|
|
|
import requests
|
|
|
|
DELIVERY_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
FILES_PATH = os.path.join(os.path.expanduser('~'), '.sio/')
|
|
|
|
# !!! database.db must be deleted/reset before running the tests !!!
|
|
requests.post("http://localhost:5000/reset", json={"password": "123"})
|
|
os.system(f"rm {FILES_PATH}*")
|
|
|
|
def test_address_set():
|
|
# Initialize the server path on state.json
|
|
process = subprocess.Popen(f"python3 {DELIVERY_PATH}/client/bin/subject.py -r localhost:5000 ", shell=True)
|
|
process.wait()
|
|
assert os.path.exists(os.path.join(FILES_PATH, 'state.json'))
|
|
|
|
|
|
def test_rep_subject_credentials():
|
|
# Test the rep_subject_create command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub.pem priv.pem ", shell=True)
|
|
process.wait()
|
|
assert os.path.exists(os.path.join(FILES_PATH, 'pub.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv.pem'))
|
|
|
|
|
|
def test_rep_create_org():
|
|
# Test the rep_create_org command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_create_org org1 username name email@org.com pub.pem", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_orgs():
|
|
# Test the list_orgs command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_orgs", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_create_session():
|
|
# Test the rep_create_session command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_create_session org1 username password priv.pem session.json", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_subjects():
|
|
#Test the rep_list_subjects command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_subjects session.json", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_assume_role():
|
|
# Test the rep_assume_role command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_assume_role session.json manager", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_add_role():
|
|
# Test the rep_add_role command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_role session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
def test_rep_add_permission():
|
|
# Test the rep_add_permission command
|
|
# Add permissions: SUBJECT_NEW, SUBJECT_DOWN, SUBJECT_UP, DOC_NEW, ROLE_NEW, ROLE_DOWN, ROLE_UP, ROLE_MOD
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_NEW", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_DOWN", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_UP", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole DOC_NEW", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_NEW", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_DOWN", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_UP", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_MOD", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_ACL", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_remove_permission():
|
|
# Test the rep_remove_permission command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_remove_permission session.json newrole ROLE_ACL", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_suspend_role():
|
|
# Test the rep_suspend_role command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_suspend_role session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_reactivate_role():
|
|
# Test the rep_activate_role command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_reactivate_role session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_add_permission_user():
|
|
# Test the rep_add_permission command with username
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole username", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_drop_role():
|
|
# Test the rep_drop_role command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_drop_role session.json manager", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_assume_role session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_roles():
|
|
# Test the rep_list_roles command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_roles session.json", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_role_subjects():
|
|
# Test the rep_list_role_subjects command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_role_subjects session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_subject_roles():
|
|
# Test the rep_list_subject_roles command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_subject_roles session.json username", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_role_permissions():
|
|
# Test the rep_list_role_permissions command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_role_permissions session.json newrole", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_permission_roles():
|
|
# Test the rep_list_permission_roles command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_permission_roles session.json SUBJECT_NEW", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_add_subject():
|
|
# Test the rep_subject_create command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub_extra.pem priv_extra.pem ", shell=True)
|
|
process.wait()
|
|
assert os.path.exists(os.path.join(FILES_PATH, 'pub_extra.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv_extra.pem'))
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_subject session.json username2 name2 name2@any.com pub_extra.pem", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_suspend_subject():
|
|
# Test the rep_suspend_subject command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_suspend_subject session.json username2", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_activate_subject():
|
|
# Test the rep_activate_subject command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_activate_subject session.json username2", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_add_doc():
|
|
# Test the rep_add_doc command
|
|
process = subprocess.Popen(f"dd if=/dev/zero of={FILES_PATH}test.txt bs=1024 count=1000 ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_doc session.json doc test.txt ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_acl_doc():
|
|
# Test the rep_acl_doc command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc - newrole DOC_READ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc - newrole DOC_DELETE", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc + newrole DOC_READ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc + newrole DOC_DELETE", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_docs():
|
|
# Test the rep_list_docs command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_docs session.json", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_permission_roles_with_file():
|
|
# Test the rep_list_permission_roles command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_permission_roles session.json DOC_READ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
metadata = {}
|
|
|
|
|
|
def test_rep_get_doc_metadata():
|
|
# Test the rep_get_doc_metadata command
|
|
global metadata
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_doc_metadata session.json doc", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
process.wait()
|
|
stdout, stderr = process.communicate()
|
|
metadata = json.loads(stdout)
|
|
assert process.returncode == 0
|
|
|
|
def test_rep_get_file():
|
|
# Test the rep_get_file command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_file {metadata['file_handle']} file.txt ", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
def test_decrypt_file():
|
|
# Test the rep_decrypt_file command
|
|
encryption_metadata = {
|
|
'alg': metadata['alg'],
|
|
'key': metadata['key']
|
|
}
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_decrypt_file file.txt '{json.dumps(encryption_metadata)}'", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
def test_rep_get_doc_file():
|
|
# Test the rep_get_doc_file
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_doc_file session.json doc file.txt", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
def test_rep_delete_doc():
|
|
# Test the rep_get_doc_file
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_delete_doc session.json doc", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|
|
|
|
def test_rep_list_docs_after_deletion():
|
|
# Test the rep_list_docs command
|
|
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_docs session.json", shell=True)
|
|
process.wait()
|
|
assert process.returncode == 0
|
|
|