129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
import json
|
|
import os, subprocess, sys
|
|
|
|
import requests
|
|
|
|
# Root of the delivery tree: two directory levels above the folder that
# contains this file. The client executables are invoked relative to it.
DELIVERY_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))

# Directory inside the user's home where the tests below look for the files
# produced by the client (state.json, key files, ...).
FILES_PATH = os.path.join(os.path.expanduser('~'), '.sio/')

# !!! database.db must be deleted/reset before running the tests !!!
# Ask the server on localhost:5000 to reset itself (presumably this clears
# database.db -- confirm against the server code). The explicit timeout keeps
# an unresponsive server from hanging test collection forever; requests waits
# indefinitely when no timeout is given.
requests.post("http://localhost:5000/reset", json={"password": "123"}, timeout=30)

# Remove whatever a previous run left behind in FILES_PATH so that the
# file-existence assertions below start from a clean directory.
os.system(f"rm {FILES_PATH}*")
|
|
|
|
def test_address_set():
    """Run subject.py with ``-r localhost:5000`` and expect state.json to exist.

    Per the original author's note, this step initializes the server address
    stored in state.json. Only the presence of the file under FILES_PATH is
    checked; the exit status of the command is not inspected.
    """
    command = f"python3 {DELIVERY_PATH}/client/bin/subject.py -r localhost:5000 "
    subprocess.run(command, shell=True)

    state_file = os.path.join(FILES_PATH, 'state.json')
    assert os.path.exists(state_file)
|
|
|
|
|
|
def test_rep_subject_credentials():
    """Run rep_subject_credentials and expect both key files to be created.

    The command is given the arguments ``password pub.pem priv.pem``; the test
    passes when pub.pem and priv.pem both exist under FILES_PATH afterwards.
    The exit status of the command is not inspected.
    """
    command = f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub.pem priv.pem "
    subprocess.run(command, shell=True)

    expected_files = ('pub.pem', 'priv.pem')
    assert all(
        os.path.exists(os.path.join(FILES_PATH, file_name))
        for file_name in expected_files
    )
|
|
|
|
|
|
def test_rep_create_org():
    """Run rep_create_org for ``org1`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_create_org org1 username name email@org.com pub.pem"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_list_orgs():
    """Run rep_list_orgs without arguments and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_list_orgs"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_create_session():
    """Run rep_create_session for ``org1``/``username`` and expect exit status 0.

    The last argument, ``session.json``, is the session file name that the
    later tests pass to the other client commands.
    """
    command = f"{DELIVERY_PATH}/client/bin/rep_create_session org1 username password pub.pem session.json"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_list_subjects():
    """Run rep_list_subjects with ``session.json`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_list_subjects session.json"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
def test_rep_add_subject():
    """Generate a second key pair, then add ``username2`` with rep_add_subject.

    Step 1 runs rep_subject_credentials and requires pub_extra.pem and
    priv_extra.pem to exist under FILES_PATH afterwards.
    Step 2 runs rep_add_subject with the new public key and requires
    exit status 0.
    """
    # Step 1: credentials for the extra subject.
    credentials_cmd = f"{DELIVERY_PATH}/client/bin/rep_subject_credentials password pub_extra.pem priv_extra.pem "
    subprocess.run(credentials_cmd, shell=True)

    expected_files = ('pub_extra.pem', 'priv_extra.pem')
    assert all(
        os.path.exists(os.path.join(FILES_PATH, file_name))
        for file_name in expected_files
    )

    # Step 2: register the extra subject through the existing session.
    add_subject_cmd = f"{DELIVERY_PATH}/client/bin/rep_add_subject session.json username2 name2 name2@any.com pub_extra.pem"
    completed = subprocess.run(add_subject_cmd, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_suspend_subject():
    """Run rep_suspend_subject on ``username2`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_suspend_subject session.json username2"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_activate_subject():
    """Run rep_activate_subject on ``username2`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_activate_subject session.json username2"
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
def test_rep_add_doc():
    """Create a random test file with dd, then upload it with rep_add_doc.

    Both the ``dd`` invocation (1000 blocks of 1024 bytes read from
    /dev/urandom into FILES_PATH/test.txt) and the rep_add_doc command must
    exit with status 0.
    """
    # Produce the payload that will be uploaded as document "doc".
    make_file_cmd = f"dd if=/dev/urandom of={FILES_PATH}test.txt bs=1024 count=1000 "
    dd_result = subprocess.run(make_file_cmd, shell=True)
    assert dd_result.returncode == 0

    # Upload the payload through the existing session.
    add_doc_cmd = f"{DELIVERY_PATH}/client/bin/rep_add_doc session.json doc test.txt "
    upload_result = subprocess.run(add_doc_cmd, shell=True)
    assert upload_result.returncode == 0
|
|
|
|
# Parsed JSON output of rep_get_doc_metadata. It is filled in by
# test_rep_get_doc_metadata and read by test_rep_get_file and
# test_decrypt_file, so those tests rely on this one having run first.
metadata = {}


def test_rep_get_doc_metadata():
    """Run rep_get_doc_metadata for ``doc`` and store its JSON output.

    The command must exit with status 0 and print a JSON document on stdout;
    the parsed result is saved in the module-level ``metadata`` variable.

    Raises:
        AssertionError: if the command exits with a non-zero status (the
            captured stderr is used as the assertion message).
        json.JSONDecodeError: if stdout is not valid JSON.
    """
    global metadata

    process = subprocess.Popen(
        f"{DELIVERY_PATH}/client/bin/rep_get_doc_metadata session.json doc",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # communicate() drains both pipes while waiting for the child to exit.
    # Calling wait() first, as the previous version did, can deadlock once the
    # child fills an OS pipe buffer that nobody is reading.
    stdout, stderr = process.communicate()

    # Check the exit status before parsing so that a failing command is
    # reported with its stderr instead of an unrelated JSON decoding error.
    assert process.returncode == 0, stderr

    metadata = json.loads(stdout)
|
|
|
|
def test_rep_get_file():
    """Run rep_get_file with the stored file handle and expect exit status 0.

    The handle is taken from the ``'file_handle'`` entry of the module-level
    ``metadata`` dict; ``file.txt`` is passed as the second argument.
    """
    file_handle = metadata['file_handle']
    command = f"{DELIVERY_PATH}/client/bin/rep_get_file {file_handle} file.txt "
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
def test_decrypt_file():
    """Run rep_decrypt_file on ``file.txt`` and expect exit status 0.

    The module-level ``metadata`` dict is serialized to JSON and passed as a
    single command-line argument after the file name.
    """
    # Local import keeps this fix self-contained within the function.
    import shlex

    # shlex.quote yields one safely quoted shell word. The previous
    # hand-written '...' wrapper broke the command line as soon as the JSON
    # contained a single quote.
    metadata_arg = shlex.quote(json.dumps(metadata))

    process = subprocess.Popen(
        f"{DELIVERY_PATH}/client/bin/rep_decrypt_file file.txt {metadata_arg}",
        shell=True,
    )
    process.wait()

    assert process.returncode == 0
|
|
|
|
def test_rep_get_doc_file():
    """Run rep_get_doc_file for ``doc`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_get_doc_file session.json doc "
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
def test_rep_delete_doc():
    """Run rep_delete_doc for ``doc`` and expect exit status 0."""
    command = f"{DELIVERY_PATH}/client/bin/rep_delete_doc session.json doc "
    completed = subprocess.run(command, shell=True)

    assert completed.returncode == 0
|
|
|
|
|
|
|
|
|
|
|
|
|