sio-2425/delivery2/client/tests/test_client.py

298 lines
11 KiB
Python

import json
import os, subprocess, sys
import requests
DELIVERY_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
FILES_PATH = os.path.join(os.path.expanduser('~'), '.sio/')
# !!! database.db must be deleted/reset before running the tests !!!
requests.post("http://localhost:5000/reset", json={"password": "123"})
os.system(f"rm {FILES_PATH}*")
def test_address_set():
# Initialize the server path on state.json
process = subprocess.Popen(f"python3 {DELIVERY_PATH}/client/bin/subject.py -r localhost:5000 ", shell=True)
process.wait()
assert os.path.exists(os.path.join(FILES_PATH, 'state.json'))
def test_rep_subject_credentials():
# Test the rep_subject_create command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub.pem priv.pem ", shell=True)
process.wait()
assert os.path.exists(os.path.join(FILES_PATH, 'pub.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv.pem'))
def test_rep_create_org():
# Test the rep_create_org command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_create_org org1 username name email@org.com pub.pem", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_orgs():
# Test the list_orgs command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_orgs", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_create_session():
# Test the rep_create_session command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_create_session org1 username password priv.pem session.json", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_subjects():
#Test the rep_list_subjects command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_subjects session.json", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_assume_role():
# Test the rep_assume_role command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_assume_role session.json manager", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_add_role():
# Test the rep_add_role command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_role session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_add_permission():
# Test the rep_add_permission command
# Add permissions: SUBJECT_NEW, SUBJECT_DOWN, SUBJECT_UP, DOC_NEW, ROLE_NEW, ROLE_DOWN, ROLE_UP, ROLE_MOD
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_NEW", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_DOWN", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole SUBJECT_UP", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole DOC_NEW", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_NEW", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_DOWN", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_UP", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_MOD", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole ROLE_ACL", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_remove_permission():
# Test the rep_remove_permission command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_remove_permission session.json newrole ROLE_ACL", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_suspend_role():
# Test the rep_suspend_role command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_suspend_role session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_reactivate_role():
# Test the rep_activate_role command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_reactivate_role session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_add_permission_user():
# Test the rep_add_permission command with username
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_permission session.json newrole username", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_drop_role():
# Test the rep_drop_role command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_drop_role session.json manager", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_assume_role session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_roles():
# Test the rep_list_roles command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_roles session.json", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_role_subjects():
# Test the rep_list_role_subjects command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_role_subjects session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_subject_roles():
# Test the rep_list_subject_roles command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_subject_roles session.json username", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_role_permissions():
# Test the rep_list_role_permissions command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_role_permissions session.json newrole", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_permission_roles():
# Test the rep_list_permission_roles command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_permission_roles session.json SUBJECT_NEW", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_add_subject():
# Test the rep_subject_create command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub_extra.pem priv_extra.pem ", shell=True)
process.wait()
assert os.path.exists(os.path.join(FILES_PATH, 'pub_extra.pem')) and os.path.exists(os.path.join(FILES_PATH, 'priv_extra.pem'))
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_subject session.json username2 name2 name2@any.com pub_extra.pem", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_suspend_subject():
# Test the rep_suspend_subject command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_suspend_subject session.json username2", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_activate_subject():
# Test the rep_activate_subject command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_activate_subject session.json username2", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_add_doc():
# Test the rep_add_doc command
process = subprocess.Popen(f"dd if=/dev/zero of={FILES_PATH}test.txt bs=1024 count=1000 ", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_add_doc session.json doc test.txt ", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_acl_doc():
# Test the rep_acl_doc command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc - newrole DOC_READ", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc - newrole DOC_DELETE", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc + newrole DOC_READ", shell=True)
process.wait()
assert process.returncode == 0
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_acl_doc session.json doc + newrole DOC_DELETE", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_docs():
# Test the rep_list_docs command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_docs session.json", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_permission_roles_with_file():
# Test the rep_list_permission_roles command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_permission_roles session.json DOC_READ", shell=True)
process.wait()
assert process.returncode == 0
metadata = {}
def test_rep_get_doc_metadata():
# Test the rep_get_doc_metadata command
global metadata
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_doc_metadata session.json doc", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
process.wait()
stdout, stderr = process.communicate()
metadata = json.loads(stdout)
assert process.returncode == 0
def test_rep_get_file():
# Test the rep_get_file command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_file {metadata['file_handle']} file.txt ", shell=True)
process.wait()
assert process.returncode == 0
def test_decrypt_file():
# Test the rep_decrypt_file command
encryption_metadata = {
'alg': metadata['alg'],
'key': metadata['key']
}
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_decrypt_file file.txt '{json.dumps(encryption_metadata)}'", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_get_doc_file():
# Test the rep_get_doc_file
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_get_doc_file session.json doc file.txt", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_delete_doc():
# Test the rep_get_doc_file
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_delete_doc session.json doc", shell=True)
process.wait()
assert process.returncode == 0
def test_rep_list_docs_after_deletion():
# Test the rep_list_docs command
process = subprocess.Popen(f"{DELIVERY_PATH}/client/bin/rep_list_docs session.json", shell=True)
process.wait()
assert process.returncode == 0