mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 09:29:13 +00:00 
			
		
		
		
	Add more API tests for lease, attribs etc
This commit is contained in:
		| @@ -222,4 +222,7 @@ def certidude_app(): | |||||||
|     # Bootstrap resource |     # Bootstrap resource | ||||||
|     app.add_route("/api/bootstrap/", BootstrapResource()) |     app.add_route("/api/bootstrap/", BootstrapResource()) | ||||||
|  |  | ||||||
|  |     # Add sink for serving static files | ||||||
|  |     app.add_sink(StaticResource(os.path.join(__file__, "..", "..", "static"))) | ||||||
|  |  | ||||||
|     return app |     return app | ||||||
|   | |||||||
| @@ -18,11 +18,14 @@ class LeaseDetailResource(object): | |||||||
|     @login_required |     @login_required | ||||||
|     @authorize_admin |     @authorize_admin | ||||||
|     def on_get(self, req, resp, cn): |     def on_get(self, req, resp, cn): | ||||||
|  |         try: | ||||||
|             path, buf, cert = authority.get_signed(cn) |             path, buf, cert = authority.get_signed(cn) | ||||||
|             return dict( |             return dict( | ||||||
|                 last_seen = xattr.getxattr(path, "user.lease.last_seen"), |                 last_seen = xattr.getxattr(path, "user.lease.last_seen"), | ||||||
|                 address = xattr.getxattr(path, "user.lease.address").decode("ascii") |                 address = xattr.getxattr(path, "user.lease.address").decode("ascii") | ||||||
|             ) |             ) | ||||||
|  |         except EnvironmentError: # Certificate or attribute not found | ||||||
|  |             raise falcon.HTTPNotFound() | ||||||
|  |  | ||||||
|  |  | ||||||
| class LeaseResource(object): | class LeaseResource(object): | ||||||
|   | |||||||
| @@ -1197,7 +1197,7 @@ def certidude_serve(port, listen, fork): | |||||||
|     click.echo("Serving API at %s:%d" % (listen, port)) |     click.echo("Serving API at %s:%d" % (listen, port)) | ||||||
|     from wsgiref.simple_server import make_server, WSGIServer |     from wsgiref.simple_server import make_server, WSGIServer | ||||||
|     from SocketServer import ThreadingMixIn, ForkingMixIn |     from SocketServer import ThreadingMixIn, ForkingMixIn | ||||||
|     from certidude.api import certidude_app, StaticResource |     from certidude.api import certidude_app | ||||||
|  |  | ||||||
|     class ThreadingWSGIServer(ForkingMixIn, WSGIServer): |     class ThreadingWSGIServer(ForkingMixIn, WSGIServer): | ||||||
|         pass |         pass | ||||||
| @@ -1205,7 +1205,6 @@ def certidude_serve(port, listen, fork): | |||||||
|     click.echo("Listening on %s:%d" % (listen, port)) |     click.echo("Listening on %s:%d" % (listen, port)) | ||||||
|  |  | ||||||
|     app = certidude_app() |     app = certidude_app() | ||||||
|     app.add_sink(StaticResource(os.path.join(os.path.dirname(__file__), "static"))) |  | ||||||
|  |  | ||||||
|     httpd = make_server(listen, port, app, ThreadingWSGIServer) |     httpd = make_server(listen, port, app, ThreadingWSGIServer) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -53,17 +53,6 @@ def test_cli_setup_authority(): | |||||||
|     assert not result.exception |     assert not result.exception | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Test session API call |  | ||||||
|     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) |  | ||||||
|     assert r.status_code == 200 |  | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 200 |  | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/") |  | ||||||
|     assert r.status_code == 401 |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Try starting up forked server |     # Try starting up forked server | ||||||
|     result = runner.invoke(cli, ['serve', '-f', '-p', '8080']) |     result = runner.invoke(cli, ['serve', '-f', '-p', '8080']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
| @@ -145,6 +134,18 @@ def test_cli_setup_authority(): | |||||||
|     result = runner.invoke(cli, ['cron']) |     result = runner.invoke(cli, ['cron']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     # Test session API call | ||||||
|  |     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/") | ||||||
|  |     assert r.status_code == 401 | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Test signed certificate API call |     # Test signed certificate API call | ||||||
|     r = client().simulate_get("/api/signed/nonexistant/") |     r = client().simulate_get("/api/signed/nonexistant/") | ||||||
|     assert r.status_code == 404 |     assert r.status_code == 404 | ||||||
| @@ -181,35 +182,86 @@ def test_cli_setup_authority(): | |||||||
|     # Test attribute fetching API call |     # Test attribute fetching API call | ||||||
|     r = client().simulate_get("/api/signed/test2/attr/") |     r = client().simulate_get("/api/signed/test2/attr/") | ||||||
|     assert r.status_code == 403 |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 404 | ||||||
|  |  | ||||||
|  |     # Insert lease as if VPN gateway had submitted it | ||||||
|     path, _, _ = authority.get_signed("test2") |     path, _, _ = authority.get_signed("test2") | ||||||
|     setxattr(path, "user.lease.address", b"127.0.0.1") |     setxattr(path, "user.lease.address", b"127.0.0.1") | ||||||
|  |     setxattr(path, "user.lease.last_seen", b"random") | ||||||
|     r = client().simulate_get("/api/signed/test2/attr/") |     r = client().simulate_get("/api/signed/test2/attr/") | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     # Test lease retrieval | ||||||
|  |     r = client().simulate_get("/api/signed/test2/lease/") | ||||||
|  |     assert r.status_code == 401 | ||||||
|  |     r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert r.headers.get('content-type') == "application/json; charset=UTF-8" | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Tags should not be visible anonymously |     # Tags should not be visible anonymously | ||||||
|     r = client().simulate_get("/api/signed/test2/tag/") |     r = client().simulate_get("/api/signed/test2/tag/") | ||||||
|     assert r.status_code == 401 |     assert r.status_code == 401 | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":usertoken}) |     r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":usertoken}) | ||||||
|     assert r.status_code == 403 |     assert r.status_code == 403 | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":admintoken}) |     r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":admintoken}) | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     # Tags can be added only by admin | ||||||
|  |     r = client().simulate_post("/api/signed/test2/tag/") | ||||||
|  |     assert r.status_code == 401 | ||||||
|  |     r = client().simulate_post("/api/signed/test2/tag/", | ||||||
|  |         headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_post("/api/signed/test2/tag/", | ||||||
|  |         body="key=other&value=something", | ||||||
|  |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     # Tags can be overwritten only by admin | ||||||
|  |     r = client().simulate_put("/api/signed/test2/tag/other/") | ||||||
|  |     assert r.status_code == 401 | ||||||
|  |     r = client().simulate_put("/api/signed/test2/tag/other/", | ||||||
|  |         headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_put("/api/signed/test2/tag/other/", | ||||||
|  |         body="value=else", | ||||||
|  |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     # Tags can be deleted only by admin | ||||||
|  |     r = client().simulate_delete("/api/signed/test2/tag/else/") | ||||||
|  |     assert r.status_code == 401 | ||||||
|  |     r = client().simulate_delete("/api/signed/test2/tag/else/", | ||||||
|  |         headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_delete("/api/signed/test2/tag/else/", | ||||||
|  |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Test revocation |     # Test revocation | ||||||
|     r = client().simulate_delete("/api/signed/test2/") |     r = client().simulate_delete("/api/signed/test2/") | ||||||
|     assert r.status_code == 401 |     assert r.status_code == 401 | ||||||
|  |     r = client().simulate_delete("/api/signed/test2/", | ||||||
|     r = client().simulate_delete("/api/signed/test2/", headers={"Authorization":usertoken}) |         headers={"Authorization":usertoken}) | ||||||
|     assert r.status_code == 403 |     assert r.status_code == 403 | ||||||
|  |     r = client().simulate_delete("/api/signed/test2/", | ||||||
|     r = client().simulate_delete("/api/signed/test2/", headers={"Authorization":admintoken}) |         headers={"Authorization":admintoken}) | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|     result = runner.invoke(cli, ['revoke', 'test3']) |     result = runner.invoke(cli, ['revoke', 'test3']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     # Test static | ||||||
|  |     r = client().simulate_delete("/nonexistant.html") | ||||||
|  |     assert r.status_code == 404 | ||||||
|  |  | ||||||
|  |     r = client().simulate_delete("/index.html") | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user