Commit 7c95ecc3 authored by David Trudgian's avatar David Trudgian
Browse files

API dependent tests

parent 7cc74acb
......@@ -4,13 +4,15 @@ import json
import sys
def check_clair(API_URI):
def check_clair(API_URI, quiet):
"""Check Clair is accessible by call to namespaces end point"""
sys.stderr.write("Checking for Clair v1 API")
if not quiet:
sys.stderr.write("Checking for Clair v1 API\n")
try:
r = requests.get(API_URI + 'namespaces')
namespace_count = len(r.json()['Namespaces'])
if not quiet:
sys.stderr.write("Found Clair server with %d namespaces\n" % namespace_count)
except Exception as e:
sys.stderr.write("Error - couldn't access Clair v1 API at %s\n%s\n" % (API_URI, e.message))
......@@ -18,7 +20,7 @@ def check_clair(API_URI):
return True
def post_layer(API_URI, image_name, image_uri):
def post_layer(API_URI, image_name, image_uri, quiet):
"""Register an image .tar.gz with Clair as a parent-less layer"""
try:
......@@ -30,9 +32,9 @@ def post_layer(API_URI, image_name, image_uri):
})
if r.status_code == requests.codes.created:
if not quiet:
sys.stderr.write("Image registered as layer with Clair\n")
else:
sys.stderr.write(r.status_code)
pretty_response = json.dumps(r.json(), separators=(',', ':'), sort_keys=True, indent=2)
sys.stderr.write("Failed registering image with Clair\n %s\n" % pretty_response)
sys.exit(1)
......@@ -51,7 +53,6 @@ def get_report(API_URI, image_name):
if r.status_code == requests.codes.ok:
return r.json()
else:
sys.stderr.write(r.status_code)
pretty_response = json.dumps(r.json(), separators=(',', ':'), sort_keys=True, indent=2)
sys.stderr.write("Failed retrieving report from Clair\n %s\n" % pretty_response)
sys.exit(1)
......
......@@ -8,7 +8,6 @@ from .clair import check_clair, post_layer, get_report, format_report_text
from .util import sha256
from .image import check_image, image_to_tgz, http_server
@click.command()
@click.option('--clair-uri', default="http://localhost:6060",
help='Base URI for your Clair server')
......@@ -18,29 +17,31 @@ from .image import check_image, image_to_tgz, http_server
help='IP address that the HTTP server providing image to Clair should listen on')
@click.option('--bind-port', default=8088,
help='Port that the HTTP server providing image to Clair should listen on')
@click.option('--quiet', is_flag=True, help='Suppress progress messages to STDERR')
@click.argument('image', required=True)
def main(image, clair_uri, text_output, json_output, bind_ip, bind_port):
def cli(image, clair_uri, text_output, json_output, bind_ip, bind_port, quiet):
API_URI = clair_uri + '/v1/'
# Check image exists, and export it to a gzipped tar in a temporary directory
check_image(image)
(tar_dir, tar_file) = image_to_tgz(image)
(tar_dir, tar_file) = image_to_tgz(image, quiet)
# Image name for Clair will be the SHA256 of the .tar.gz
image_name = sha256(tar_file)
if not quiet:
click.echo("Image has SHA256: %s" % image_name, err=True)
# Make sure we can talk to Clair OK
check_clair(API_URI)
check_clair(API_URI, quiet)
# Start an HTTP server to serve the .tar.gz from our temporary directory
# so that Clair can retrieve it
httpd = Process(target=http_server, args=(tar_dir, bind_ip, bind_port))
httpd = Process(target=http_server, args=(tar_dir, bind_ip, bind_port, quiet))
httpd.start()
image_uri = 'http://%s:%d/%s' % (bind_ip, bind_port, path.basename(tar_file))
# Register the iamge with Clair as a docker layer that has no parent
post_layer(API_URI, image_name, image_uri)
post_layer(API_URI, image_name, image_uri, quiet)
# Done with the .tar.gz so stop serving it and remove the temp dir
httpd.terminate()
......
......@@ -16,7 +16,7 @@ def check_image(image):
return True
def image_to_tgz(image):
def image_to_tgz(image, quiet):
"""Export the singularity image to a tar.gz file"""
temp_dir = tempfile.mkdtemp()
......@@ -25,6 +25,7 @@ def image_to_tgz(image):
cmd = ['singularity', 'export', '-f', tar_file, image]
if not quiet:
sys.stderr.write("Exporting image to .tar\n")
try:
......@@ -35,6 +36,7 @@ def image_to_tgz(image):
cmd = ['gzip', tar_file]
if not quiet:
sys.stderr.write("Compressing to .tar.gz\n")
try:
......@@ -46,12 +48,20 @@ def image_to_tgz(image):
return (temp_dir, tar_gz_file)
def http_server(dir, ip, port):
class QuietSimpleHTTPHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
pass
def http_server(dir, ip, port, quiet):
"""Use Python's Simple HTTP server to expose the image over HTTP for
clair to grab it.
"""
sys.stderr.write("Serving Image to Clair from http://%s:%d\n" % (ip, port))
chdir(dir)
if quiet:
Handler = QuietSimpleHTTPHandler
else:
Handler = SimpleHTTPServer.SimpleHTTPRequestHandler
httpd = socketserver.TCPServer((ip, port), Handler)
httpd.serve_forever()
......@@ -22,7 +22,7 @@ setup(
tests_require=['pytest', 'pytest-cov', 'pytest-flake8'],
entry_points={
'console_scripts': [
'clair-singularity = clair_singularity.cli:main',
'clair-singularity = clair_singularity.cli:cli',
],
},
classifiers=[
......
......@@ -2,9 +2,8 @@ import pytest
from clair_singularity.clair import check_clair, post_layer, get_report
API_URL = 'http://127.0.0.1:6060'
API_URL = 'http://127.0.0.1:6060/'
def test_check_clair:
def test_check_clair():
# We can talk to the API
assert check_clair(API_URL)
assert check_clair(API_URL,False)
import pytest
import json
import sys
from click.testing import CliRunner
from clair_singularity import cli
from clair_singularity.cli import cli
from .test_image import testimage
@pytest.fixture
def runner():
......@@ -9,4 +12,27 @@ def runner():
def test_help(runner):
runner.invoke(cli, ['--help'])
result = runner.invoke(cli, ['--help'])
assert 'Usage:' in result.output
def test_full_json(runner, testimage):
result = runner.invoke(cli, ['--quiet', '--json-output', '--bind-ip', '127.0.0.1', '--bind-port', '8081', '--clair-uri', 'http://127.0.0.1:6060', testimage])
output = json.loads(result.output)
# Using the shub://396 image and the 2017-08-21 clair db...
# There are 62 features in the container scan, and 14 have vulnerabilities
assert 'Layer' in output
assert 'Features' in output['Layer']
assert len(output['Layer']['Features']) == 62
features_with_vuln = 0
for feature in output['Layer']['Features']:
if 'Vulnerabilities' in feature:
features_with_vuln = features_with_vuln + 1
assert features_with_vuln == 14
def test_full_text(runner, testimage):
result = runner.invoke(cli, ['--quiet', '--bind-ip', '127.0.0.1', '--bind-port', '8082', '--clair-uri', 'http://127.0.0.1:6060', testimage])
# Check we do have some CVEs we expect reported here
assert 'bash - 4.3-14ubuntu1.1' in result.output
assert 'CVE-2016-9401' in result.output
\ No newline at end of file
......@@ -16,7 +16,7 @@ def testimage(tmpdir):
cwd = os.getcwd()
os.chdir(tmpdir.strpath)
# This pulls a singularity hello world image
subprocess.call(['singularity', 'pull', 'shub://396'])
subprocess.check_output(['singularity', 'pull', 'shub://396'])
os.chdir(cwd)
return os.path.join(tmpdir.strpath, 'vsoch-singularity-hello-world-master.img')
......@@ -32,7 +32,7 @@ def test_check_image(testimage):
def test_image_to_tgz(testimage):
(temp_dir, tar_file) = image_to_tgz(testimage)
(temp_dir, tar_file) = image_to_tgz(testimage, False)
# Should have created a temporary dir
assert os.path.isdir(temp_dir)
# The tar.gz should exist
......@@ -45,7 +45,7 @@ def test_image_to_tgz(testimage):
def test_http_server(testimage, tmpdir):
"""Test we can retrieve the test image from in-built http server"""
httpd = multiprocessing.Process(target=http_server,
args=(os.path.dirname(testimage), '127.0.0.1', 8088))
args=(os.path.dirname(testimage), '127.0.0.1', 8088, False))
httpd.start()
time.sleep(2)
r = requests.get('http://127.0.0.1:8088/vsoch-singularity-hello-world-master.img',
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment