Changeset View
Changeset View
Standalone View
Standalone View
tests/log_test.py
#!/usr/bin/env python2 | #!/usr/bin/env python2 | ||||
import json | import json | ||||
import logging | import logging | ||||
import re | import re | ||||
from StringIO import StringIO | from StringIO import StringIO | ||||
import sys | |||||
from textwrap import dedent | from textwrap import dedent | ||||
import unittest | import unittest | ||||
from scap import log | from scap import log | ||||
class FilterTest(unittest.TestCase): | class FilterTest(unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
self.root_logger = logging.getLogger('A') | self.root_logger = logging.getLogger('A') | ||||
self.root_logger.setLevel(logging.INFO) | self.root_logger.setLevel(logging.INFO) | ||||
for handler in self.root_logger.handlers: | for handler in self.root_logger.handlers: | ||||
self.root_logger.removeHandler(handler) | self.root_logger.removeHandler(handler) | ||||
self.stream = StringIO() | self.stream = StringIO() | ||||
self.root_handler = logging.StreamHandler(self.stream) | self.root_handler = logging.StreamHandler(self.stream) | ||||
self.root_logger.addHandler(self.root_handler) | self.root_logger.addHandler(self.root_handler) | ||||
self.b_logger = self.root_logger.getChild('B') | self.b_logger = self.root_logger.getChild('B') | ||||
self.c_logger = self.b_logger.getChild('C') | self.c_logger = self.b_logger.getChild('C') | ||||
def test_filter_loads(self): | |||||
expression = r""" | |||||
foo == 'bar.*' baz ~ "qux" | |||||
blah = splat.* | |||||
""" | |||||
filter = log.Filter.loads(expression) | |||||
self.assertIsInstance(filter, log.Filter) | |||||
def test_filter_loads_filters_correctly(self): | |||||
expression = r""" | |||||
name == foo.*.baz | |||||
msg ~ 'log' | |||||
""" | |||||
filter = log.Filter.loads(expression, filter=False) | |||||
record = ['foo.bar.baz', logging.DEBUG, '', 0, 'bologna', [], None] | |||||
record = logging.LogRecord(*record) | |||||
self.assertTrue(filter.filter(record)) | |||||
record = ['bar.baz', logging.DEBUG, '', 0, 'bologna', [], None] | |||||
record = logging.LogRecord(*record) | |||||
self.assertFalse(filter.filter(record)) | |||||
def test_filter_loads_supports_numeric_comparisons(self): | |||||
expression = r""" | |||||
levelno > 10 | |||||
""" | |||||
filter = log.Filter.loads(expression, filter=False) | |||||
record = ['', 10, '', 0, 'bologna', [], None] | |||||
record = logging.LogRecord(*record) | |||||
self.assertFalse(filter.filter(record)) | |||||
record = ['', 20, '', 0, '', [], None] | |||||
record = logging.LogRecord(*record) | |||||
self.assertTrue(filter.filter(record)) | |||||
def test_filter_parse(self): | |||||
expression = r""" | |||||
foo == 'bar.*' baz ~ "qux" | |||||
blah = splat.* | |||||
""" | |||||
elements = list(log.Filter.parse(expression)) | |||||
self.assertEqual(elements, [ | |||||
('foo', '==', 'bar.*'), | |||||
('baz', '~', 'qux'), | |||||
('blah', '=', 'splat.*')]) | |||||
def test_filter_filters_matching(self): | def test_filter_filters_matching(self): | ||||
filter = log.Filter(name='A.B.*') | filter = log.Filter({'name': 'A.B.*'}) | ||||
self.root_handler.addFilter(filter) | self.root_handler.addFilter(filter) | ||||
self.b_logger.info('please log this') | self.b_logger.info('please log this') | ||||
self.c_logger.info('do not log this') | self.c_logger.info('do not log this') | ||||
self.assertEqual("please log this\n", self.stream.getvalue()) | self.assertEqual("please log this\n", self.stream.getvalue()) | ||||
def test_filter_filters_matching_regex(self): | def test_filter_filters_matching_regex(self): | ||||
filter = log.Filter(name=re.compile(r'^A\.B\.')) | filter = log.Filter({'name': re.compile(r'^A\.B\.')}) | ||||
self.root_handler.addFilter(filter) | self.root_handler.addFilter(filter) | ||||
self.b_logger.info('please log this') | self.b_logger.info('please log this') | ||||
self.c_logger.info('do not log this') | self.c_logger.info('do not log this') | ||||
self.assertEqual("please log this\n", self.stream.getvalue()) | self.assertEqual("please log this\n", self.stream.getvalue()) | ||||
def test_filter_filters_matching_lambda(self): | def test_filter_filters_matching_lambda(self): | ||||
filter = log.Filter(levelno=lambda lvl: lvl < logging.WARNING) | filter = log.Filter({'levelno': lambda lvl: lvl < logging.WARNING}) | ||||
self.root_handler.addFilter(filter) | self.root_handler.addFilter(filter) | ||||
self.b_logger.warning('please log this') | self.b_logger.warning('please log this') | ||||
self.c_logger.info('do not log this') | self.c_logger.info('do not log this') | ||||
self.assertEqual("please log this\n", self.stream.getvalue()) | self.assertEqual("please log this\n", self.stream.getvalue()) | ||||
def test_filter_can_invert_behavior(self): | def test_filter_can_invert_behavior(self): | ||||
filter = log.Filter(filter=False, name='A.B.*') | filter = log.Filter({'name': 'A.B.*'}, filter=False) | ||||
self.root_handler.addFilter(filter) | self.root_handler.addFilter(filter) | ||||
self.b_logger.info('please log this') | self.b_logger.info('please log this') | ||||
self.c_logger.info('do not log this') | self.c_logger.info('do not log this') | ||||
self.assertEqual("do not log this\n", self.stream.getvalue()) | self.assertEqual("do not log this\n", self.stream.getvalue()) | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | def test_format_includes_extra_fields(self): | ||||
# we can't reliably test an unordered JSON string so load it back in | # we can't reliably test an unordered JSON string so load it back in | ||||
parsed = json.loads(line) | parsed = json.loads(line) | ||||
self.assertIs(type(parsed), dict) | self.assertIs(type(parsed), dict) | ||||
self.assertIn('foo_extra', parsed) | self.assertIn('foo_extra', parsed) | ||||
self.assertEqual(parsed['foo_extra'], 'bar') | self.assertEqual(parsed['foo_extra'], 'bar') | ||||
def test_format_correctly_handles_exceptions(self): | |||||
formatter = log.JSONFormatter() | |||||
args = ['foo', logging.DEBUG, 'foo_file', 123, 'foo message', [], None] | |||||
record = logging.LogRecord(*args) | |||||
try: | |||||
raise RuntimeError('fail fail fail') | |||||
except RuntimeError: | |||||
record.exc_info = sys.exc_info() | |||||
line = formatter.format(record) | |||||
# we can't reliably test an unordered JSON string so load it back in | |||||
parsed = json.loads(line) | |||||
self.assertIs(type(parsed), dict) | |||||
self.assertNotIn('exc_info', parsed) | |||||
self.assertIn('exc_text', parsed) | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
unittest.main() | unittest.main() |
Content licensed under Creative Commons Attribution-ShareAlike 3.0 (CC-BY-SA) unless otherwise noted; code licensed under GNU General Public License (GPL) or other open source licenses. By using this site, you agree to the Terms of Use, Privacy Policy, and Code of Conduct. · Wikimedia Foundation · Privacy Policy · Code of Conduct · Terms of Use · Disclaimer · CC-BY-SA · GPL