mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 09:29:13 +00:00 
			
		
		
		
	tests: Fixes and better code coverage
This commit is contained in:
		| @@ -11,30 +11,28 @@ env = Environment(loader=FileSystemLoader(config.SCRIPT_DIR), trim_blocks=True) | |||||||
| class ScriptResource(): | class ScriptResource(): | ||||||
|     @whitelist_subject |     @whitelist_subject | ||||||
|     def on_get(self, req, resp, cn): |     def on_get(self, req, resp, cn): | ||||||
|         try: |  | ||||||
|         path, buf, cert, attribs = authority.get_attributes(cn) |         path, buf, cert, attribs = authority.get_attributes(cn) | ||||||
|         except IOError: |         # TODO: are keys unique? | ||||||
|             raise falcon.HTTPNotFound() |         named_tags = {} | ||||||
|         else: |         other_tags = [] | ||||||
|             script = config.SCRIPT_DEFAULT |  | ||||||
|             tags = [] |  | ||||||
|         try: |         try: | ||||||
|             for tag in attribs.get("user").get("xdg").get("tags").split(","): |             for tag in attribs.get("user").get("xdg").get("tags").split(","): | ||||||
|                 if "=" in tag: |                 if "=" in tag: | ||||||
|                     k, v = tag.split("=", 1) |                     k, v = tag.split("=", 1) | ||||||
|  |                     named_tags[k] = v | ||||||
|                 else: |                 else: | ||||||
|                         k, v = "other", tag |                     other_tags.append(v) | ||||||
|                     if k == "script": |  | ||||||
|                         script = v |  | ||||||
|                     tags.append(dict(id=tag, key=k, value=v)) |  | ||||||
|         except AttributeError: # No tags |         except AttributeError: # No tags | ||||||
|             pass |             pass | ||||||
|  |  | ||||||
|  |         script = named_tags.get("script", config.SCRIPT_DEFAULT) | ||||||
|         resp.set_header("Content-Type", "text/x-shellscript") |         resp.set_header("Content-Type", "text/x-shellscript") | ||||||
|         resp.body = env.get_template(script).render( |         resp.body = env.get_template(script).render( | ||||||
|             authority_name=const.FQDN, |             authority_name=const.FQDN, | ||||||
|             common_name=cn, |             common_name=cn, | ||||||
|                 tags=tags, |             other_tags=other_tags, | ||||||
|  |             named_tags=named_tags, | ||||||
|             attributes=attribs.get("user").get("machine")) |             attributes=attribs.get("user").get("machine")) | ||||||
|         logger.info("Served script %s for %s at %s" % (script, cn, req.context["remote_addr"])) |         logger.info("Served script %s for %s at %s" % (script, cn, req.context["remote_addr"])) | ||||||
|         # TODO: Assert time is within reasonable range |         # TODO: Assert time is within reasonable range | ||||||
|   | |||||||
| @@ -1,3 +1,3 @@ | |||||||
| {% for key, value in certificate.attributes %} | {% for key, value in certificate.attributes %} | ||||||
| <span class="attribute icon {{ key | replace('.', ' ') }}">{{ value }}</span> | <span class="attribute icon {{ key | replace('.', ' ') }}" title="{{ key }}={{ value }}">{{ value }}</span> | ||||||
| {% endfor %} | {% endfor %} | ||||||
|   | |||||||
| @@ -1,10 +1,14 @@ | |||||||
| #!/bin/sh | #!/bin/sh | ||||||
|  |  | ||||||
| # Tags: | # Tags: | ||||||
| {% for tag in tags %} | {% for key, value in named_tags.items() %} | ||||||
|  | # {{ key }} -> {{ value }} | ||||||
|  | {% endfor %} | ||||||
|  | {% for tag in other_tags %} | ||||||
| # {{ tag }} | # {{ tag }} | ||||||
| {% endfor %} | {% endfor %} | ||||||
|  |  | ||||||
|  | # Submit some stats to CA | ||||||
| curl http://{{ authority_name }}/api/signed/{{ common_name }}/attr -X POST -d "\ | curl http://{{ authority_name }}/api/signed/{{ common_name }}/attr -X POST -d "\ | ||||||
| dmi.product_name=$(cat /sys/class/dmi/id/product_name)&\ | dmi.product_name=$(cat /sys/class/dmi/id/product_name)&\ | ||||||
| dmi.product_serial=$(cat /sys/class/dmi/id/product_serial)&\ | dmi.product_serial=$(cat /sys/class/dmi/id/product_serial)&\ | ||||||
|   | |||||||
| @@ -10,13 +10,13 @@ for band in 2ghz 5ghz; do | |||||||
|     uci set wireless.lan$band.mode=ap |     uci set wireless.lan$band.mode=ap | ||||||
|     uci set wireless.lan$band.device=radio$band |     uci set wireless.lan$band.device=radio$band | ||||||
|     uci set wireless.lan$band.encryption=psk2 |     uci set wireless.lan$band.encryption=psk2 | ||||||
|     {% if attributes.wireless.protected and attributes.wireless.protected.ssid %} |     {% if named_tags and named_tags.wireless and named_tags.wireless.protected and named_tags.wireless.protected.ssid %} | ||||||
|     uci set wireless.lan$band.ssid={{ attrbutes.wireless.protected.ssid }} |     uci set wireless.lan$band.ssid={{ named_tags.wireless.protected.ssid }} | ||||||
|     {% else %} |     {% else %} | ||||||
|     uci set wireless.lan$band.ssid=$(uci get system.@system[0].hostname)-protected |     uci set wireless.lan$band.ssid=$(uci get system.@system[0].hostname)-protected | ||||||
|     {% endif %} |     {% endif %} | ||||||
|     {% if attributes.wireless.protected and attributes.wireless.protected.psk %} |     {% if named_tags and named_tags.wireless and named_tags.wireless.protected and named_tags.wireless.protected.psk %} | ||||||
|     uci set wireless.lan$band.key={{ attributes.wireless.protected.psk }} |     uci set wireless.lan$band.key={{ named_tags.wireless.protected.psk }} | ||||||
|     {% else %} |     {% else %} | ||||||
|     uci set wireless.lan$band.key=salakala |     uci set wireless.lan$band.key=salakala | ||||||
|     {% endif %} |     {% endif %} | ||||||
| @@ -29,8 +29,8 @@ for band in 2ghz 5ghz; do | |||||||
|     uci set wireless.guest$band.mode=ap |     uci set wireless.guest$band.mode=ap | ||||||
|     uci set wireless.guest$band.device=radio$band |     uci set wireless.guest$band.device=radio$band | ||||||
|     uci set wireless.guest$band.encryption=none |     uci set wireless.guest$band.encryption=none | ||||||
|     {% if attributes.wireless.public and attributes.wireless.public.ssid %} |     {% if named_tags and named_tags.wireless and named_tags.wireless.public and named_tags.wireless.public.ssid %} | ||||||
|     uci set wireless.guest$band.ssid={{ attrbutes.wireless.public.ssid }} |     uci set wireless.guest$band.ssid={{ named_tags.wireless.public.ssid }} | ||||||
|     {% else %} |     {% else %} | ||||||
|     uci set wireless.guest$band.ssid=$(uci get system.@system[0].hostname)-public |     uci set wireless.guest$band.ssid=$(uci get system.@system[0].hostname)-public | ||||||
|     {% endif %} |     {% endif %} | ||||||
|   | |||||||
| @@ -493,40 +493,6 @@ def test_cli_setup_authority(): | |||||||
|     r = client().simulate_get("/api/signed/nonexistant/attr/", headers={"Authorization":admintoken}) |     r = client().simulate_get("/api/signed/nonexistant/attr/", headers={"Authorization":admintoken}) | ||||||
|     assert r.status_code == 404, r.text |     assert r.status_code == 404, r.text | ||||||
|  |  | ||||||
|     # Insert lease |  | ||||||
|     r = client().simulate_get("/api/signed/test/script/") |  | ||||||
|     assert r.status_code == 403, r.text # script not authorized |  | ||||||
|     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 404, r.text |  | ||||||
|     r = client().simulate_post("/api/lease/", |  | ||||||
|         query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8", |  | ||||||
|         headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 200, r.text # lease update ok |  | ||||||
|     r = client().simulate_get("/api/signed/nonexistant/script/") |  | ||||||
|     assert r.status_code == 404, r.text # cert not found |  | ||||||
|     r = client().simulate_get("/api/signed/test/script/") |  | ||||||
|     assert r.status_code == 200, r.text # script render ok |  | ||||||
|     assert "curl http://ca.example.lan/api/signed/test/attr " in r.text, r.text |  | ||||||
|  |  | ||||||
|     r = client().simulate_post("/api/lease/", |  | ||||||
|         query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8&serial=0", |  | ||||||
|         headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 403, r.text # invalid serial number supplied |  | ||||||
|     r = client().simulate_post("/api/lease/", |  | ||||||
|         query_string = "client=test&inner_address=1.2.3.4&outer_address=8.8.8.8", |  | ||||||
|         headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 200, r.text # lease update ok |  | ||||||
|  |  | ||||||
|     # Test lease retrieval |  | ||||||
|     r = client().simulate_get("/api/signed/test/lease/") |  | ||||||
|     assert r.status_code == 401, r.text |  | ||||||
|     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":usertoken}) |  | ||||||
|     assert r.status_code == 403, r.text |  | ||||||
|     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 200, r.text |  | ||||||
|     assert r.headers.get('content-type') == "application/json; charset=UTF-8" |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Tags should not be visible anonymously |     # Tags should not be visible anonymously | ||||||
|     r = client().simulate_get("/api/signed/test/tag/") |     r = client().simulate_get("/api/signed/test/tag/") | ||||||
|     assert r.status_code == 401, r.text |     assert r.status_code == 401, r.text | ||||||
| @@ -568,6 +534,56 @@ def test_cli_setup_authority(): | |||||||
|     assert r.status_code == 200, r.text |     assert r.status_code == 200, r.text | ||||||
|     assert r.text == '[{"value": "Tartu", "key": "location", "id": "location=Tartu"}, {"value": "else", "key": "other", "id": "else"}]', r.text |     assert r.text == '[{"value": "Tartu", "key": "location", "id": "location=Tartu"}, {"value": "else", "key": "other", "id": "else"}]', r.text | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     # Test scripting | ||||||
|  |     r = client().simulate_get("/api/signed/test/script/") | ||||||
|  |     assert r.status_code == 403, r.text # script not authorized | ||||||
|  |     r = client().simulate_get("/api/signed/nonexistant/script/") | ||||||
|  |     assert r.status_code == 404, r.text # cert not found | ||||||
|  |  | ||||||
|  |     # Insert lease | ||||||
|  |     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 404, r.text | ||||||
|  |     r = client().simulate_post("/api/lease/", | ||||||
|  |         query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8", | ||||||
|  |         headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text # lease update ok | ||||||
|  |  | ||||||
|  |     # Test tagging integration in scripting framework | ||||||
|  |     r = client().simulate_get("/api/signed/test/script/") | ||||||
|  |     assert r.status_code == 200, r.text # script render ok | ||||||
|  |     assert "curl http://ca.example.lan/api/signed/test/attr " in r.text, r.text | ||||||
|  |     assert "Tartu" in r.text, r.text | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/signed/test/tag/", | ||||||
|  |         body="key=script&value=openwrt.sh", | ||||||
|  |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/signed/test/script/") | ||||||
|  |     assert r.status_code == 200, r.text # script render ok | ||||||
|  |     assert "uci set " in r.text, r.text | ||||||
|  |  | ||||||
|  |     # Test lease update | ||||||
|  |     r = client().simulate_post("/api/lease/", | ||||||
|  |         query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8&serial=0", | ||||||
|  |         headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 403, r.text # invalid serial number supplied | ||||||
|  |     r = client().simulate_post("/api/lease/", | ||||||
|  |         query_string = "client=test&inner_address=1.2.3.4&outer_address=8.8.8.8", | ||||||
|  |         headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text # lease update ok | ||||||
|  |  | ||||||
|  |     # Test lease retrieval | ||||||
|  |     r = client().simulate_get("/api/signed/test/lease/") | ||||||
|  |     assert r.status_code == 401, r.text | ||||||
|  |     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403, r.text | ||||||
|  |     r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text | ||||||
|  |     assert r.headers.get('content-type') == "application/json; charset=UTF-8" | ||||||
|  |  | ||||||
|     # Tags can be deleted only by admin |     # Tags can be deleted only by admin | ||||||
|     r = client().simulate_delete("/api/signed/test/tag/else/") |     r = client().simulate_delete("/api/signed/test/tag/else/") | ||||||
|     assert r.status_code == 401, r.text |     assert r.status_code == 401, r.text | ||||||
| @@ -580,6 +596,9 @@ def test_cli_setup_authority(): | |||||||
|     r = client().simulate_delete("/api/signed/test/tag/location=Tartu/", |     r = client().simulate_delete("/api/signed/test/tag/location=Tartu/", | ||||||
|         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|     assert r.status_code == 200, r.text |     assert r.status_code == 200, r.text | ||||||
|  |     r = client().simulate_delete("/api/signed/test/tag/script=openwrt.sh/", | ||||||
|  |         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text | ||||||
|     r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken}) |     r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken}) | ||||||
|     assert r.status_code == 200, r.text |     assert r.status_code == 200, r.text | ||||||
|     assert r.text == "[]", r.text |     assert r.text == "[]", r.text | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user