Put a copy of mozlog in the tree.

This is required to import the command line options for the wpt tests from the harness
This commit is contained in:
James Graham 2015-03-27 21:13:33 +00:00
parent 93fd41df39
commit f7ff2aa558
33 changed files with 4501 additions and 0 deletions

View file

@ -0,0 +1,26 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Mozlog aims to standardize log formatting within Mozilla.
It simply wraps Python's logging_ module and adds a few convenience methods
for logging test results and events.
The structured submodule takes a different approach and implements a
JSON-based logging protocol designed for recording test results."""
from logger import *
from loglistener import LogMessageServer
from loggingmixin import LoggingMixin
try:
import structured
except ImportError:
# Structured logging doesn't work on python 2.6 which is still used on some
# legacy test machines; https://bugzilla.mozilla.org/show_bug.cgi?id=864866
# Once we move away from Python 2.6, please cleanup devicemanager.py's
# exception block
pass

View file

@ -0,0 +1,180 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from logging import getLogger as getSysLogger
from logging import *
# Some of the build slave environments don't see the following when doing
# 'from logging import *'
# see https://bugzilla.mozilla.org/show_bug.cgi?id=700415#c35
from logging import getLoggerClass, addLevelName, setLoggerClass, shutdown, debug, info, basicConfig
import json
_default_level = INFO
_LoggerClass = getLoggerClass()
# Define mozlog specific log levels
START = _default_level + 1
END = _default_level + 2
PASS = _default_level + 3
KNOWN_FAIL = _default_level + 4
FAIL = _default_level + 5
CRASH = _default_level + 6
# Define associated text of log levels
addLevelName(START, 'TEST-START')
addLevelName(END, 'TEST-END')
addLevelName(PASS, 'TEST-PASS')
addLevelName(KNOWN_FAIL, 'TEST-KNOWN-FAIL')
addLevelName(FAIL, 'TEST-UNEXPECTED-FAIL')
addLevelName(CRASH, 'PROCESS-CRASH')
class MozLogger(_LoggerClass):
"""
MozLogger class which adds some convenience log levels
related to automated testing in Mozilla and ability to
output structured log messages.
"""
def testStart(self, message, *args, **kwargs):
"""Logs a test start message"""
self.log(START, message, *args, **kwargs)
def testEnd(self, message, *args, **kwargs):
"""Logs a test end message"""
self.log(END, message, *args, **kwargs)
def testPass(self, message, *args, **kwargs):
"""Logs a test pass message"""
self.log(PASS, message, *args, **kwargs)
def testFail(self, message, *args, **kwargs):
"""Logs a test fail message"""
self.log(FAIL, message, *args, **kwargs)
def testKnownFail(self, message, *args, **kwargs):
"""Logs a test known fail message"""
self.log(KNOWN_FAIL, message, *args, **kwargs)
def processCrash(self, message, *args, **kwargs):
"""Logs a process crash message"""
self.log(CRASH, message, *args, **kwargs)
def log_structured(self, action, params=None):
"""Logs a structured message object."""
if params is None:
params = {}
level = params.get('_level', _default_level)
if isinstance(level, int):
params['_level'] = getLevelName(level)
else:
params['_level'] = level
level = getLevelName(level.upper())
# If the logger is fed a level number unknown to the logging
# module, getLevelName will return a string. Unfortunately,
# the logging module will raise a type error elsewhere if
# the level is not an integer.
if not isinstance(level, int):
level = _default_level
params['action'] = action
# The can message be None. This is expected, and shouldn't cause
# unstructured formatters to fail.
message = params.get('_message')
self.log(level, message, extra={'params': params})
class JSONFormatter(Formatter):
"""Log formatter for emitting structured JSON entries."""
def format(self, record):
# Default values determined by logger metadata
output = {
'_time': int(round(record.created * 1000, 0)),
'_namespace': record.name,
'_level': getLevelName(record.levelno),
}
# If this message was created by a call to log_structured,
# anything specified by the caller's params should act as
# an override.
output.update(getattr(record, 'params', {}))
if record.msg and output.get('_message') is None:
# For compatibility with callers using the printf like
# API exposed by python logging, call the default formatter.
output['_message'] = Formatter.format(self, record)
return json.dumps(output, indent=output.get('indent'))
class MozFormatter(Formatter):
"""
MozFormatter class used to standardize formatting
If a different format is desired, this can be explicitly
overriden with the log handler's setFormatter() method
"""
level_length = 0
max_level_length = len('TEST-START')
def __init__(self, include_timestamp=False):
"""
Formatter.__init__ has fmt and datefmt parameters that won't have
any affect on a MozFormatter instance.
:param include_timestamp: if True, include formatted time at the
beginning of the message
"""
self.include_timestamp = include_timestamp
Formatter.__init__(self)
def format(self, record):
# Handles padding so record levels align nicely
if len(record.levelname) > self.level_length:
pad = 0
if len(record.levelname) <= self.max_level_length:
self.level_length = len(record.levelname)
else:
pad = self.level_length - len(record.levelname) + 1
sep = '|'.rjust(pad)
fmt = '%(name)s %(levelname)s ' + sep + ' %(message)s'
if self.include_timestamp:
fmt = '%(asctime)s ' + fmt
# this protected member is used to define the format
# used by the base Formatter's method
self._fmt = fmt
return Formatter.format(self, record)
def getLogger(name, handler=None):
"""
Returns the logger with the specified name.
If the logger doesn't exist, it is created.
If handler is specified, adds it to the logger. Otherwise a default handler
that logs to standard output will be used.
:param name: The name of the logger to retrieve
:param handler: A handler to add to the logger. If the logger already exists,
and a handler is specified, an exception will be raised. To
add a handler to an existing logger, call that logger's
addHandler method.
"""
setLoggerClass(MozLogger)
if name in Logger.manager.loggerDict:
if handler:
raise ValueError('The handler parameter requires ' + \
'that a logger by this name does ' + \
'not already exist')
return Logger.manager.loggerDict[name]
logger = getSysLogger(name)
logger.setLevel(_default_level)
if handler is None:
handler = StreamHandler()
handler.setFormatter(MozFormatter())
logger.addHandler(handler)
logger.propagate = False
return logger

View file

@ -0,0 +1,41 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import mozlog
class LoggingMixin(object):
"""Expose a subset of logging functions to an inheriting class."""
def set_logger(self, logger_instance=None, name=None):
"""Method for setting the underlying logger instance to be used."""
if logger_instance and not isinstance(logger_instance, mozlog.Logger):
raise ValueError("logger_instance must be an instance of" +
"mozlog.Logger")
if name is None:
name = ".".join([self.__module__, self.__class__.__name__])
self._logger = logger_instance or mozlog.getLogger(name)
def _log_msg(self, cmd, *args, **kwargs):
if not hasattr(self, "_logger"):
self._logger = mozlog.getLogger(".".join([self.__module__,
self.__class__.__name__]))
getattr(self._logger, cmd)(*args, **kwargs)
def log(self, *args, **kwargs):
self._log_msg("log", *args, **kwargs)
def info(self, *args, **kwargs):
self._log_msg("info", *args, **kwargs)
def error(self, *args, **kwargs):
self._log_msg("error", *args, **kwargs)
def warn(self, *args, **kwargs):
self._log_msg("warn", *args, **kwargs)
def log_structured(self, *args, **kwargs):
self._log_msg("log_structured", *args, **kwargs)

View file

@ -0,0 +1,47 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import SocketServer
import socket
import json
class LogMessageServer(SocketServer.TCPServer):
def __init__(self, server_address, logger, message_callback=None, timeout=3):
SocketServer.TCPServer.__init__(self, server_address, LogMessageHandler)
self._logger = logger
self._message_callback = message_callback
self.timeout = timeout
class LogMessageHandler(SocketServer.BaseRequestHandler):
"""Processes output from a connected log source, logging to an
existing logger upon receipt of a well-formed log messsage."""
def handle(self):
"""Continually listens for log messages."""
self._partial_message = ''
self.request.settimeout(self.server.timeout)
while True:
try:
data = self.request.recv(1024)
if not data:
return
self.process_message(data)
except socket.timeout:
return
def process_message(self, data):
"""Processes data from a connected log source. Messages are assumed
to be newline delimited, and generally well-formed JSON."""
for part in data.split('\n'):
msg_string = self._partial_message + part
try:
msg = json.loads(msg_string)
self._partial_message = ''
self.server._logger.log_structured(msg.get('action', 'UNKNOWN'), msg)
if self.server._message_callback:
self.server._message_callback()
except ValueError:
self._partial_message = msg_string

View file

@ -0,0 +1,7 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import commandline
import structuredlog
from structuredlog import get_default_logger, set_default_logger

View file

@ -0,0 +1,225 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import sys
import os
import optparse
from collections import defaultdict
from structuredlog import StructuredLogger, set_default_logger
import handlers
import formatters
log_formatters = {
'raw': (formatters.JSONFormatter, "Raw structured log messages"),
'unittest': (formatters.UnittestFormatter, "Unittest style output"),
'xunit': (formatters.XUnitFormatter, "xUnit compatible XML"),
'html': (formatters.HTMLFormatter, "HTML report"),
'mach': (formatters.MachFormatter, "Human-readable output"),
'tbpl': (formatters.TbplFormatter, "TBPL style log format"),
}
TEXT_FORMATTERS = ('raw', 'mach')
"""a subset of formatters for non test harnesses related applications"""
def level_filter_wrapper(formatter, level):
return handlers.LogLevelFilter(formatter, level)
def verbose_wrapper(formatter, verbose):
formatter.verbose = verbose
return formatter
def buffer_handler_wrapper(handler, buffer_limit):
if buffer_limit == "UNLIMITED":
buffer_limit = None
else:
buffer_limit = int(buffer_limit)
return handlers.BufferingLogFilter(handler, buffer_limit)
formatter_option_defaults = {
'verbose': False,
'level': 'info',
}
fmt_options = {
# <option name>: (<wrapper function>, description, <applicable formatters>, action)
# "action" is used by the commandline parser in use.
'verbose': (verbose_wrapper,
"Enables verbose mode for the given formatter.",
["mach"], "store_true"),
'level': (level_filter_wrapper,
"A least log level to subscribe to for the given formatter (debug, info, error, etc.)",
["mach", "tbpl"], "store"),
'buffer': (buffer_handler_wrapper,
"If specified, enables message buffering at the given buffer size limit.",
["mach", "tbpl"], "store"),
}
def log_file(name):
if name == "-":
return sys.stdout
# ensure we have a correct dirpath by using realpath
dirpath = os.path.dirname(os.path.realpath(name))
if not os.path.exists(dirpath):
os.makedirs(dirpath)
return open(name, "w")
def add_logging_group(parser, include_formatters=None):
"""
Add logging options to an argparse ArgumentParser or
optparse OptionParser.
Each formatter has a corresponding option of the form --log-{name}
where {name} is the name of the formatter. The option takes a value
which is either a filename or "-" to indicate stdout.
:param parser: The ArgumentParser or OptionParser object that should have
logging options added.
:param include_formatters: List of formatter names that should be included
in the option group. Default to None, meaning
all the formatters are included. A common use
of this option is to specify
:data:`TEXT_FORMATTERS` to include only the
most useful formatters for a command line tool
that is not related to test harnesses.
"""
group_name = "Output Logging"
group_description = ("Each option represents a possible logging format "
"and takes a filename to write that format to, "
"or '-' to write to stdout.")
if include_formatters is None:
include_formatters = log_formatters.keys()
if isinstance(parser, optparse.OptionParser):
group = optparse.OptionGroup(parser,
group_name,
group_description)
parser.add_option_group(group)
opt_log_type = 'str'
group_add = group.add_option
else:
group = parser.add_argument_group(group_name,
group_description)
opt_log_type = log_file
group_add = group.add_argument
for name, (cls, help_str) in log_formatters.iteritems():
if name in include_formatters:
group_add("--log-" + name, action="append", type=opt_log_type,
help=help_str)
for optname, (cls, help_str, formatters, action) in fmt_options.iteritems():
for fmt in formatters:
# make sure fmt is in log_formatters and is accepted
if fmt in log_formatters and fmt in include_formatters:
group_add("--log-%s-%s" % (fmt, optname), action=action,
help=help_str, default=None)
def setup_handlers(logger, formatters, formatter_options):
"""
Add handlers to the given logger according to the formatters and
options provided.
:param logger: The logger configured by this function.
:param formatters: A dict of {formatter, [streams]} to use in handlers.
:param formatter_options: a dict of {formatter: {option: value}} to
to use when configuring formatters.
"""
unused_options = set(formatter_options.keys()) - set(formatters.keys())
if unused_options:
msg = ("Options specified for unused formatter(s) (%s) have no effect" %
list(unused_options))
raise ValueError(msg)
for fmt, streams in formatters.iteritems():
formatter_cls = log_formatters[fmt][0]
formatter = formatter_cls()
handler_wrapper, handler_option = None, ""
for option, value in formatter_options[fmt].iteritems():
if option == "buffer":
handler_wrapper, handler_option = fmt_options[option][0], value
else:
formatter = fmt_options[option][0](formatter, value)
for value in streams:
handler = handlers.StreamHandler(stream=value, formatter=formatter)
if handler_wrapper:
handler = handler_wrapper(handler, handler_option)
logger.add_handler(handler)
def setup_logging(suite, args, defaults=None):
"""
Configure a structuredlogger based on command line arguments.
The created structuredlogger will also be set as the default logger, and
can be retrieved with :py:func:`~mozlog.structured.structuredlog.get_default_logger`.
:param suite: The name of the testsuite being run
:param args: A dictionary of {argument_name:value} produced from
parsing the command line arguments for the application
:param defaults: A dictionary of {formatter name: output stream} to apply
when there is no logging supplied on the command line. If
this isn't supplied, reasonable defaults are chosen
(coloured mach formatting if stdout is a terminal, or raw
logs otherwise).
:rtype: StructuredLogger
"""
logger = StructuredLogger(suite)
# Keep track of any options passed for formatters.
formatter_options = defaultdict(lambda: formatter_option_defaults.copy())
# Keep track of formatters and list of streams specified.
formatters = defaultdict(list)
found = False
found_stdout_logger = False
if not hasattr(args, 'iteritems'):
args = vars(args)
if defaults is None:
if sys.__stdout__.isatty():
defaults = {"mach": sys.stdout}
else:
defaults = {"raw": sys.stdout}
for name, values in args.iteritems():
parts = name.split('_')
if len(parts) > 3:
continue
# Our args will be ['log', <formatter>] or ['log', <formatter>, <option>].
if parts[0] == 'log' and values is not None:
if len(parts) == 1 or parts[1] not in log_formatters:
continue
if len(parts) == 2:
_, formatter = parts
for value in values:
found = True
if isinstance(value, basestring):
value = log_file(value)
if value == sys.stdout:
found_stdout_logger = True
formatters[formatter].append(value)
if len(parts) == 3:
_, formatter, opt = parts
formatter_options[formatter][opt] = values
#If there is no user-specified logging, go with the default options
if not found:
for name, value in defaults.iteritems():
formatters[name].append(value)
elif not found_stdout_logger and sys.stdout in defaults.values():
for name, value in defaults.iteritems():
if value == sys.stdout:
formatters[name].append(value)
setup_handlers(logger, formatters, formatter_options)
set_default_logger(logger)
return logger

View file

@ -0,0 +1,13 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
from unittest import UnittestFormatter
from xunit import XUnitFormatter
from html import HTMLFormatter
from machformatter import MachFormatter
from tbplformatter import TbplFormatter
def JSONFormatter():
return lambda x: json.dumps(x) + "\n"

View file

@ -0,0 +1,19 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from ..reader import LogHandler
class BaseFormatter(LogHandler):
"""Base class for implementing non-trivial formatters.
Subclasses are expected to provide a method for each action type they
wish to handle, each taking a single argument for the test data.
For example a trivial subclass that just produces the id of each test as
it starts might be::
class StartIdFormatter(BaseFormatter);
def test_start(data):
#For simplicity in the example pretend the id is always a string
return data["test"]
"""

View file

@ -0,0 +1 @@
from html import HTMLFormatter

View file

@ -0,0 +1,194 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import base64
import cgi
from datetime import datetime
import os
from .. import base
from collections import defaultdict
html = None
raw = None
base_path = os.path.split(__file__)[0]
def do_defered_imports():
global html
global raw
from .xmlgen import html, raw
class HTMLFormatter(base.BaseFormatter):
"""Formatter that produces a simple HTML-formatted report."""
def __init__(self):
do_defered_imports()
self.suite_name = None
self.result_rows = []
self.test_count = defaultdict(int)
self.start_times = {}
self.suite_times = {"start": None,
"end": None}
self.head = None
self.env = {}
def suite_start(self, data):
self.suite_times["start"] = data["time"]
self.suite_name = data["source"]
with open(os.path.join(base_path, "style.css")) as f:
self.head = html.head(
html.meta(charset="utf-8"),
html.title(data["source"]),
html.style(raw(f.read())))
date_format = "%d %b %Y %H:%M:%S"
version_info = data.get("version_info")
if version_info:
self.env["Device identifier"] = version_info.get("device_id")
self.env["Device firmware (base)"] = version_info.get("device_firmware_version_base")
self.env["Device firmware (date)"] = (
datetime.utcfromtimestamp(int(version_info.get("device_firmware_date"))).strftime(date_format) if
"device_firmware_date" in version_info else None)
self.env["Device firmware (incremental)"] = version_info.get("device_firmware_version_incremental")
self.env["Device firmware (release)"] = version_info.get("device_firmware_version_release")
self.env["Gaia date"] = (
datetime.utcfromtimestamp(int(version_info.get("gaia_date"))).strftime(date_format) if
"gaia_date" in version_info else None)
self.env["Gecko version"] = version_info.get("application_version")
self.env["Gecko build"] = version_info.get("application_buildid")
if version_info.get("application_changeset"):
self.env["Gecko revision"] = version_info.get("application_changeset")
if version_info.get("application_repository"):
self.env["Gecko revision"] = html.a(
version_info.get("application_changeset"),
href="/".join([version_info.get("application_repository"),
version_info.get("application_changeset")]),
target="_blank")
if version_info.get("gaia_changeset"):
self.env["Gaia revision"] = html.a(
version_info.get("gaia_changeset")[:12],
href="https://github.com/mozilla-b2g/gaia/commit/%s" % version_info.get("gaia_changeset"),
target="_blank")
device_info = data.get("device_info")
if device_info:
self.env["Device uptime"] = device_info.get("uptime")
self.env["Device memory"] = device_info.get("memtotal")
self.env["Device serial"] = device_info.get("id")
def suite_end(self, data):
self.suite_times["end"] = data["time"]
return self.generate_html()
def test_start(self, data):
self.start_times[data["test"]] = data["time"]
def test_end(self, data):
self.make_result_html(data)
def make_result_html(self, data):
tc_time = (data["time"] - self.start_times.pop(data["test"])) / 1000.
additional_html = []
debug = data.get("extra", {})
links_html = []
status = status_name = data["status"]
expected = data.get("expected", status)
if status != expected:
status_name = "UNEXPECTED_" + status
elif status not in ("PASS", "SKIP"):
status_name = "EXPECTED_" + status
self.test_count[status_name] += 1
if status in ['SKIP', 'FAIL', 'ERROR']:
if debug.get('screenshot'):
screenshot = 'data:image/png;base64,%s' % debug['screenshot']
additional_html.append(html.div(
html.a(html.img(src=screenshot), href="#"),
class_='screenshot'))
for name, content in debug.items():
if 'screenshot' in name:
href = '#'
else:
# use base64 to avoid that some browser (such as Firefox, Opera)
# treats '#' as the start of another link if the data URL contains.
# use 'charset=utf-8' to show special characters like Chinese.
href = 'data:text/plain;charset=utf-8;base64,%s' % base64.b64encode(content.encode('utf-8'))
links_html.append(html.a(
name.title(),
class_=name,
href=href,
target='_blank'))
links_html.append(' ')
log = html.div(class_='log')
output = data.get('stack', '').splitlines()
output.extend(data.get('message', '').splitlines())
for line in output:
separator = line.startswith(' ' * 10)
if separator:
log.append(line[:80])
else:
if line.lower().find("error") != -1 or line.lower().find("exception") != -1:
log.append(html.span(raw(cgi.escape(line)), class_='error'))
else:
log.append(raw(cgi.escape(line)))
log.append(html.br())
additional_html.append(log)
self.result_rows.append(
html.tr([html.td(status_name, class_='col-result'),
html.td(data['test'], class_='col-name'),
html.td('%.2f' % tc_time, class_='col-duration'),
html.td(links_html, class_='col-links'),
html.td(additional_html, class_='debug')],
class_=status_name.lower() + ' results-table-row'))
def generate_html(self):
generated = datetime.utcnow()
with open(os.path.join(base_path, "main.js")) as main_f:
doc = html.html(
self.head,
html.body(
html.script(raw(main_f.read())),
html.p('Report generated on %s at %s' % (
generated.strftime('%d-%b-%Y'),
generated.strftime('%H:%M:%S'))),
html.h2('Environment'),
html.table(
[html.tr(html.td(k), html.td(v)) for k, v in sorted(self.env.items()) if v],
id='environment'),
html.h2('Summary'),
html.p('%i tests ran in %.1f seconds.' % (sum(self.test_count.itervalues()),
(self.suite_times["end"] -
self.suite_times["start"]) / 1000.),
html.br(),
html.span('%i passed' % self.test_count["PASS"], class_='pass'), ', ',
html.span('%i skipped' % self.test_count["SKIP"], class_='skip'), ', ',
html.span('%i failed' % self.test_count["UNEXPECTED_FAIL"], class_='fail'), ', ',
html.span('%i errors' % self.test_count["UNEXPECTED_ERROR"], class_='error'), '.',
html.br(),
html.span('%i expected failures' % self.test_count["EXPECTED_FAIL"],
class_='expected_fail'), ', ',
html.span('%i unexpected passes' % self.test_count["UNEXPECTED_PASS"],
class_='unexpected_pass'), '.'),
html.h2('Results'),
html.table([html.thead(
html.tr([
html.th('Result', class_='sortable', col='result'),
html.th('Test', class_='sortable', col='name'),
html.th('Duration', class_='sortable numeric', col='duration'),
html.th('Links')]), id='results-table-head'),
html.tbody(self.result_rows, id='results-table-body')], id='results-table')))
return u"<!DOCTYPE html>\n" + doc.unicode(indent=2)

View file

@ -0,0 +1,172 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
function toArray(iter) {
if (iter === null) {
return null;
}
return Array.prototype.slice.call(iter);
}
function find(selector, elem) {
if (!elem) {
elem = document;
}
return elem.querySelector(selector);
}
function find_all(selector, elem) {
if (!elem) {
elem = document;
}
return toArray(elem.querySelectorAll(selector));
}
addEventListener("DOMContentLoaded", function() {
reset_sort_headers();
split_debug_onto_two_rows();
find_all('.col-links a.screenshot').forEach(function(elem) {
elem.addEventListener("click",
function(event) {
var node = elem;
while (node && !node.classList.contains('results-table-row')) {
node = node.parentNode;
}
if (node != null) {
if (node.nextSibling &&
node.nextSibling.classList.contains("debug")) {
var href = find('.screenshot img', node.nextSibling).src;
window.open(href);
}
}
event.preventDefault();
}, false)
});
find_all('.screenshot a').forEach(function(elem) {
elem.addEventListener("click",
function(event) {
window.open(find('img', elem).getAttribute('src'));
event.preventDefault();
}, false)
});
find_all('.sortable').forEach(function(elem) {
elem.addEventListener("click",
function(event) {
toggle_sort_states(elem);
var colIndex = toArray(elem.parentNode.childNodes).indexOf(elem);
var key = elem.classList.contains('numeric') ? key_num : key_alpha;
sort_table(elem, key(colIndex));
}, false)
});
});
function sort_table(clicked, key_func) {
one_row_for_data();
var rows = find_all('.results-table-row');
var reversed = !clicked.classList.contains('asc');
var sorted_rows = sort(rows, key_func, reversed);
var parent = document.getElementById('results-table-body');
sorted_rows.forEach(function(elem) {
parent.appendChild(elem);
});
split_debug_onto_two_rows();
}
function sort(items, key_func, reversed) {
var sort_array = items.map(function(item, i) {
return [key_func(item), i];
});
var multiplier = reversed ? -1 : 1;
sort_array.sort(function(a, b) {
var key_a = a[0];
var key_b = b[0];
return multiplier * (key_a >= key_b ? 1 : -1);
});
return sort_array.map(function(item) {
var index = item[1];
return items[index];
});
}
function key_alpha(col_index) {
return function(elem) {
return elem.childNodes[col_index].firstChild.data.toLowerCase();
};
}
function key_num(col_index) {
return function(elem) {
return parseFloat(elem.childNodes[col_index].firstChild.data);
};
}
function reset_sort_headers() {
find_all('.sort-icon').forEach(function(elem) {
elem.parentNode.removeChild(elem);
});
find_all('.sortable').forEach(function(elem) {
var icon = document.createElement("div");
icon.className = "sort-icon";
icon.textContent = "vvv";
elem.insertBefore(icon, elem.firstChild);
elem.classList.remove("desc", "active");
elem.classList.add("asc", "inactive");
});
}
function toggle_sort_states(elem) {
//if active, toggle between asc and desc
if (elem.classList.contains('active')) {
elem.classList.toggle('asc');
elem.classList.toggle('desc');
}
//if inactive, reset all other functions and add ascending active
if (elem.classList.contains('inactive')) {
reset_sort_headers();
elem.classList.remove('inactive');
elem.classList.add('active');
}
}
function split_debug_onto_two_rows() {
find_all('tr.results-table-row').forEach(function(elem) {
var new_row = document.createElement("tr")
new_row.className = "debug";
elem.parentNode.insertBefore(new_row, elem.nextSibling);
find_all(".debug", elem).forEach(function (td_elem) {
if (find(".log", td_elem)) {
new_row.appendChild(td_elem);
td_elem.colSpan=5;
} else {
td_elem.parentNode.removeChild(td_elem);
}
});
});
}
function one_row_for_data() {
find_all('tr.results-table-row').forEach(function(elem) {
if (elem.nextSibling.classList.contains('debug')) {
toArray(elem.nextSibling.childNodes).forEach(
function (td_elem) {
elem.appendChild(td_elem);
})
} else {
var new_td = document.createElement("td");
new_td.className = "debug";
elem.appendChild(new_td);
}
});
}

View file

@ -0,0 +1,154 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
body {
font-family: Helvetica, Arial, sans-serif;
font-size: 12px;
min-width: 1200px;
color: #999;
}
h2 {
font-size: 16px;
color: black;
}
p {
color: black;
}
a {
color: #999;
}
table {
border-collapse: collapse;
}
/******************************
* SUMMARY INFORMATION
******************************/
#environment td {
padding: 5px;
border: 1px solid #E6E6E6;
}
#environment tr:nth-child(odd) {
background-color: #f6f6f6;
}
/******************************
* TEST RESULT COLORS
******************************/
span.pass, .pass .col-result {
color: green;
}
span.expected_fail, .expected_fail .col-result,
span.expected_skip, .expected_skip .col-result,
span.skip, .skip .col-result {
color: orange;
}
span.error, .error .col-result,
span.fail, .fail .col-result,
span.unexpected_error, .unexpected_error .col-result,
span.unexpected_fail, .unexpected_fail .col-result,
span.unexpected_pass, .unexpected_pass .col-result {
color: red;
}
/******************************
* RESULTS TABLE
*
* 1. Table Layout
* 2. Debug
* 3. Sorting items
*
******************************/
/*------------------
* 1. Table Layout
*------------------*/
#results-table {
border: 1px solid #e6e6e6;
color: #999;
font-size: 12px;
width: 100%
}
#results-table th, #results-table td {
padding: 5px;
border: 1px solid #E6E6E6;
text-align: left
}
#results-table th {
font-weight: bold
}
/*------------------
* 2. Debug
*------------------*/
.log:only-child {
height: inherit
}
.log {
background-color: #e6e6e6;
border: 1px solid #e6e6e6;
color: black;
display: block;
font-family: "Courier New", Courier, monospace;
height: 230px;
overflow-y: scroll;
padding: 5px;
white-space: pre-wrap
}
div.screenshot {
border: 1px solid #e6e6e6;
float: right;
margin-left: 5px;
height: 240px
}
div.screenshot img {
height: 240px
}
/*if the result is passed or xpassed don't show debug row*/
.passed + .debug, .unexpected.pass + .debug {
display: none;
}
/*------------------
* 3. Sorting items
*------------------*/
.sortable {
cursor: pointer;
}
.sort-icon {
font-size: 0px;
float: left;
margin-right: 5px;
margin-top: 5px;
/*triangle*/
width: 0;
height: 0;
border-left: 8px solid transparent;
border-right: 8px solid transparent;
}
.inactive .sort-icon {
/*finish triangle*/
border-top: 8px solid #E6E6E6;
}
.asc.active .sort-icon {
/*finish triangle*/
border-bottom: 8px solid #999;
}
.desc.active .sort-icon {
/*finish triangle*/
border-top: 8px solid #999;
}

View file

@ -0,0 +1,267 @@
"""
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This file is originally from: https://bitbucket.org/hpk42/py, specifically:
https://bitbucket.org/hpk42/py/src/980c8d526463958ee7cae678a7e4e9b054f36b94/py/_xmlgen.py?at=default
by holger krekel, holger at merlinux eu. 2009
"""
import sys, re
if sys.version_info >= (3,0):
def u(s):
return s
def unicode(x):
if hasattr(x, '__unicode__'):
return x.__unicode__()
return str(x)
else:
def u(s):
return unicode(s)
unicode = unicode
class NamespaceMetaclass(type):
def __getattr__(self, name):
if name[:1] == '_':
raise AttributeError(name)
if self == Namespace:
raise ValueError("Namespace class is abstract")
tagspec = self.__tagspec__
if tagspec is not None and name not in tagspec:
raise AttributeError(name)
classattr = {}
if self.__stickyname__:
classattr['xmlname'] = name
cls = type(name, (self.__tagclass__,), classattr)
setattr(self, name, cls)
return cls
class Tag(list):
class Attr(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __init__(self, *args, **kwargs):
super(Tag, self).__init__(args)
self.attr = self.Attr(**kwargs)
def __unicode__(self):
return self.unicode(indent=0)
__str__ = __unicode__
def unicode(self, indent=2):
l = []
SimpleUnicodeVisitor(l.append, indent).visit(self)
return u("").join(l)
def __repr__(self):
name = self.__class__.__name__
return "<%r tag object %d>" % (name, id(self))
Namespace = NamespaceMetaclass('Namespace', (object, ), {
'__tagspec__': None,
'__tagclass__': Tag,
'__stickyname__': False,
})
class HtmlTag(Tag):
def unicode(self, indent=2):
l = []
HtmlVisitor(l.append, indent, shortempty=False).visit(self)
return u("").join(l)
# exported plain html namespace
class html(Namespace):
__tagclass__ = HtmlTag
__stickyname__ = True
__tagspec__ = dict([(x,1) for x in (
'a,abbr,acronym,address,applet,area,b,bdo,big,blink,'
'blockquote,body,br,button,caption,center,cite,code,col,'
'colgroup,comment,dd,del,dfn,dir,div,dl,dt,em,embed,'
'fieldset,font,form,frameset,h1,h2,h3,h4,h5,h6,head,html,'
'i,iframe,img,input,ins,kbd,label,legend,li,link,listing,'
'map,marquee,menu,meta,multicol,nobr,noembed,noframes,'
'noscript,object,ol,optgroup,option,p,pre,q,s,script,'
'select,small,span,strike,strong,style,sub,sup,table,'
'tbody,td,textarea,tfoot,th,thead,title,tr,tt,u,ul,xmp,'
'base,basefont,frame,hr,isindex,param,samp,var'
).split(',') if x])
class Style(object):
def __init__(self, **kw):
for x, y in kw.items():
x = x.replace('_', '-')
setattr(self, x, y)
class raw(object):
"""just a box that can contain a unicode string that will be
included directly in the output"""
def __init__(self, uniobj):
self.uniobj = uniobj
class SimpleUnicodeVisitor(object):
""" recursive visitor to write unicode. """
def __init__(self, write, indent=0, curindent=0, shortempty=True):
self.write = write
self.cache = {}
self.visited = {} # for detection of recursion
self.indent = indent
self.curindent = curindent
self.parents = []
self.shortempty = shortempty # short empty tags or not
def visit(self, node):
""" dispatcher on node's class/bases name. """
cls = node.__class__
try:
visitmethod = self.cache[cls]
except KeyError:
for subclass in cls.__mro__:
visitmethod = getattr(self, subclass.__name__, None)
if visitmethod is not None:
break
else:
visitmethod = self.__object
self.cache[cls] = visitmethod
visitmethod(node)
# the default fallback handler is marked private
# to avoid clashes with the tag name object
def __object(self, obj):
#self.write(obj)
self.write(escape(unicode(obj)))
def raw(self, obj):
self.write(obj.uniobj)
def list(self, obj):
assert id(obj) not in self.visited
self.visited[id(obj)] = 1
for elem in obj:
self.visit(elem)
def Tag(self, tag):
assert id(tag) not in self.visited
try:
tag.parent = self.parents[-1]
except IndexError:
tag.parent = None
self.visited[id(tag)] = 1
tagname = getattr(tag, 'xmlname', tag.__class__.__name__)
if self.curindent and not self._isinline(tagname):
self.write("\n" + u(' ') * self.curindent)
if tag:
self.curindent += self.indent
self.write(u('<%s%s>') % (tagname, self.attributes(tag)))
self.parents.append(tag)
for x in tag:
self.visit(x)
self.parents.pop()
self.write(u('</%s>') % tagname)
self.curindent -= self.indent
else:
nameattr = tagname+self.attributes(tag)
if self._issingleton(tagname):
self.write(u('<%s/>') % (nameattr,))
else:
self.write(u('<%s></%s>') % (nameattr, tagname))
def attributes(self, tag):
# serialize attributes
attrlist = dir(tag.attr)
attrlist.sort()
l = []
for name in attrlist:
res = self.repr_attribute(tag.attr, name)
if res is not None:
l.append(res)
l.extend(self.getstyle(tag))
return u("").join(l)
def repr_attribute(self, attrs, name):
if name[:2] != '__':
value = getattr(attrs, name)
if name.endswith('_'):
name = name[:-1]
if isinstance(value, raw):
insert = value.uniobj
else:
insert = escape(unicode(value))
return ' %s="%s"' % (name, insert)
def getstyle(self, tag):
""" return attribute list suitable for styling. """
try:
styledict = tag.style.__dict__
except AttributeError:
return []
else:
stylelist = [x+': ' + y for x,y in styledict.items()]
return [u(' style="%s"') % u('; ').join(stylelist)]
def _issingleton(self, tagname):
"""can (and will) be overridden in subclasses"""
return self.shortempty
def _isinline(self, tagname):
"""can (and will) be overridden in subclasses"""
return False
class HtmlVisitor(SimpleUnicodeVisitor):
single = dict([(x, 1) for x in
('br,img,area,param,col,hr,meta,link,base,'
'input,frame').split(',')])
inline = dict([(x, 1) for x in
('a abbr acronym b basefont bdo big br cite code dfn em font '
'i img input kbd label q s samp select small span strike '
'strong sub sup textarea tt u var'.split(' '))])
def repr_attribute(self, attrs, name):
if name == 'class_':
value = getattr(attrs, name)
if value is None:
return
return super(HtmlVisitor, self).repr_attribute(attrs, name)
def _issingleton(self, tagname):
return tagname in self.single
def _isinline(self, tagname):
return tagname in self.inline
class _escape:
def __init__(self):
self.escape = {
u('"') : u('&quot;'), u('<') : u('&lt;'), u('>') : u('&gt;'),
u('&') : u('&amp;'), u("'") : u('&apos;'),
}
self.charef_rex = re.compile(u("|").join(self.escape.keys()))
def _replacer(self, match):
return self.escape[match.group(0)]
def __call__(self, ustring):
""" xml-escape the given unicode string. """
ustring = unicode(ustring)
return self.charef_rex.sub(self._replacer, ustring)
escape = _escape()

View file

@ -0,0 +1,356 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import time
from collections import defaultdict
try:
import blessings
except ImportError:
blessings = None
import base
def format_seconds(total):
"""Format number of seconds to MM:SS.DD form."""
minutes, seconds = divmod(total, 60)
return '%2d:%05.2f' % (minutes, seconds)
class NullTerminal(object):
def __getattr__(self, name):
return self._id
def _id(self, value):
return value
class MachFormatter(base.BaseFormatter):
def __init__(self, start_time=None, write_interval=False, write_times=True,
terminal=None, disable_colors=False):
if disable_colors:
terminal = None
elif terminal is None and blessings is not None:
terminal = blessings.Terminal()
if start_time is None:
start_time = time.time()
start_time = int(start_time * 1000)
self.start_time = start_time
self.write_interval = write_interval
self.write_times = write_times
self.status_buffer = {}
self.has_unexpected = {}
self.last_time = None
self.terminal = terminal
self.verbose = False
self._known_pids = set()
self.summary_values = {"tests": 0,
"subtests": 0,
"expected": 0,
"unexpected": defaultdict(int),
"skipped": 0}
self.summary_unexpected = []
def __call__(self, data):
s = base.BaseFormatter.__call__(self, data)
if s is None:
return
time = format_seconds(self._time(data))
action = data["action"].upper()
thread = data["thread"]
# Not using the NullTerminal here is a small optimisation to cut the number of
# function calls
if self.terminal is not None:
test = self._get_test_id(data)
time = self.terminal.blue(time)
color = None
if data["action"] == "test_end":
if "expected" not in data and not self.has_unexpected[test]:
color = self.terminal.green
else:
color = self.terminal.red
elif data["action"] in ("suite_start", "suite_end",
"test_start", "test_status"):
color = self.terminal.yellow
elif data["action"] == "crash":
color = self.terminal.red
if color is not None:
action = color(action)
return "%s %s: %s %s\n" % (time, action, thread, s)
def _get_test_id(self, data):
test_id = data.get("test")
if isinstance(test_id, list):
test_id = tuple(test_id)
return test_id
def _get_file_name(self, test_id):
if isinstance(test_id, (str, unicode)):
return test_id
if isinstance(test_id, tuple):
return "".join(test_id)
assert False, "unexpected test_id"
def suite_start(self, data):
self.summary_values = {"tests": 0,
"subtests": 0,
"expected": 0,
"unexpected": defaultdict(int),
"skipped": 0}
self.summary_unexpected = []
return "%i" % len(data["tests"])
def suite_end(self, data):
term = self.terminal if self.terminal is not None else NullTerminal()
heading = "Summary"
rv = ["", heading, "=" * len(heading), ""]
has_subtests = self.summary_values["subtests"] > 0
if has_subtests:
rv.append("Ran %i tests (%i parents, %i subtests)" %
(self.summary_values["tests"] + self.summary_values["subtests"],
self.summary_values["tests"],
self.summary_values["subtests"]))
else:
rv.append("Ran %i tests" % self.summary_values["tests"])
rv.append("Expected results: %i" % self.summary_values["expected"])
unexpected_count = sum(self.summary_values["unexpected"].values())
if unexpected_count > 0:
unexpected_str = " (%s)" % ", ".join("%s: %i" % (key, value) for key, value in
sorted(self.summary_values["unexpected"].items()))
else:
unexpected_str = ""
rv.append("Unexpected results: %i%s" % (unexpected_count, unexpected_str))
if self.summary_values["skipped"] > 0:
rv.append("Skipped: %i" % self.summary_values["skipped"])
rv.append("")
if not self.summary_values["unexpected"]:
rv.append(term.green("OK"))
else:
heading = "Unexpected Results"
rv.extend([heading, "=" * len(heading), ""])
if has_subtests:
for test_id, results in self.summary_unexpected:
test = self._get_file_name(test_id)
rv.extend([test, "-" * len(test)])
for name, status, expected, message in results:
if name is None:
name = "[Parent]"
rv.append("%s %s" % (self.format_expected(status, expected), name))
else:
for test_id, results in self.summary_unexpected:
test = self._get_file_name(test_id)
assert len(results) == 1
name, status, expected, messge = results[0]
assert name is None
rv.append("%s %s" % (self.format_expected(status, expected), test))
return "\n".join(rv)
def format_expected(self, status, expected):
term = self.terminal if self.terminal is not None else NullTerminal()
if status == "ERROR":
color = term.red
else:
color = term.yellow
if expected in ("PASS", "OK"):
return color(status)
return color("%s expected %s" % (status, expected))
def test_start(self, data):
self.summary_values["tests"] += 1
return "%s" % (self._get_test_id(data),)
def test_end(self, data):
subtests = self._get_subtest_data(data)
unexpected = subtests["unexpected"]
message = data.get("message", "")
if "stack" in data:
stack = data["stack"]
if stack and stack[-1] != "\n":
stack += "\n"
message = stack + message
if "expected" in data:
parent_unexpected = True
expected_str = ", expected %s" % data["expected"]
unexpected.append((None, data["status"], data["expected"],
message))
else:
parent_unexpected = False
expected_str = ""
test = self._get_test_id(data)
if unexpected:
self.summary_unexpected.append((test, unexpected))
self._update_summary(data)
#Reset the counts to 0
self.status_buffer[test] = {"count": 0, "unexpected": [], "pass": 0}
self.has_unexpected[test] = bool(unexpected)
if subtests["count"] != 0:
rv = "Harness %s%s. Subtests passed %i/%i. Unexpected %s" % (
data["status"], expected_str, subtests["pass"], subtests["count"],
len(unexpected))
else:
rv = "%s%s" % (data["status"], expected_str)
if unexpected:
rv += "\n"
if len(unexpected) == 1 and parent_unexpected:
rv += "%s" % unexpected[0][-1]
else:
for name, status, expected, message in unexpected:
if name is None:
name = "[Parent]"
expected_str = "Expected %s, got %s" % (expected, status)
rv += "%s\n" % ("\n".join([name, "-" * len(name), expected_str, message]))
rv = rv[:-1]
return rv
def test_status(self, data):
self.summary_values["subtests"] += 1
test = self._get_test_id(data)
if test not in self.status_buffer:
self.status_buffer[test] = {"count": 0, "unexpected": [], "pass": 0}
self.status_buffer[test]["count"] += 1
message = data.get("message", "")
if "stack" in data:
if message:
message += "\n"
message += data["stack"]
if data["status"] == "PASS":
self.status_buffer[test]["pass"] += 1
self._update_summary(data)
rv = None
status, subtest = data["status"], data["subtest"]
unexpected = "expected" in data
if self.verbose:
if self.terminal is not None:
status = (self.terminal.red if unexpected else self.terminal.green)(status)
rv = " ".join([subtest, status, message])
elif unexpected:
# We only append an unexpected summary if it was not logged
# directly by verbose mode.
self.status_buffer[test]["unexpected"].append((subtest,
status,
data["expected"],
message))
return rv
def _update_summary(self, data):
if "expected" in data:
self.summary_values["unexpected"][data["status"]] += 1
elif data["status"] == "SKIP":
self.summary_values["skipped"] += 1
else:
self.summary_values["expected"] += 1
def process_output(self, data):
rv = []
if "command" in data and data["process"] not in self._known_pids:
self._known_pids.add(data["process"])
rv.append('(pid:%s) Full command: %s' % (data["process"], data["command"]))
rv.append('(pid:%s) "%s"' % (data["process"], data["data"]))
return "\n".join(rv)
def crash(self, data):
test = self._get_test_id(data)
if data.get("stackwalk_returncode", 0) != 0 and not data.get("stackwalk_stderr"):
success = True
else:
success = False
rv = ["pid:%s. Test:%s. Minidump anaylsed:%s. Signature:[%s]" %
(data.get("pid", None), test, success, data["signature"])]
if data.get("minidump_path"):
rv.append("Crash dump filename: %s" % data["minidump_path"])
if data.get("stackwalk_returncode", 0) != 0:
rv.append("minidump_stackwalk exited with return code %d" %
data["stackwalk_returncode"])
if data.get("stackwalk_stderr"):
rv.append("stderr from minidump_stackwalk:")
rv.append(data["stackwalk_stderr"])
elif data.get("stackwalk_stdout"):
rv.append(data["stackwalk_stdout"])
if data.get("stackwalk_errors"):
rv.extend(data.get("stackwalk_errors"))
rv = "\n".join(rv)
if not rv[-1] == "\n":
rv += "\n"
return rv
def log(self, data):
level = data.get("level").upper()
if self.terminal is not None:
if level in ("CRITICAL", "ERROR"):
level = self.terminal.red(level)
elif level == "WARNING":
level = self.terminal.yellow(level)
elif level == "INFO":
level = self.terminal.blue(level)
if data.get('component'):
rv = " ".join([data["component"], level, data["message"]])
else:
rv = "%s %s" % (level, data["message"])
if "stack" in data:
rv += "\n%s" % data["stack"]
return rv
def _get_subtest_data(self, data):
test = self._get_test_id(data)
return self.status_buffer.get(test, {"count": 0, "unexpected": [], "pass": 0})
def _time(self, data):
entry_time = data["time"]
if self.write_interval and self.last_time is not None:
t = entry_time - self.last_time
self.last_time = entry_time
else:
t = entry_time - self.start_time
return t / 1000.

View file

@ -0,0 +1,140 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .base import BaseFormatter
class TbplFormatter(BaseFormatter):
"""Formatter that formats logs in the legacy formatting format used by TBPL
This is intended to be used to preserve backward compatibility with existing tools
hand-parsing this format.
"""
def __init__(self):
self.suite_start_time = None
self.test_start_times = {}
def __call__(self, data):
return getattr(self, data["action"])(data)
def log(self, data):
if data.get('component'):
message = "%s %s" % (data["component"], data["message"])
else:
message = data["message"]
if "stack" in data:
message += "\n%s" % data["stack"]
return "%s\n" % message
def process_output(self, data):
return "PROCESS | %(process)s | %(data)s\n" % data
def crash(self, data):
id = self.id_str(data["test"]) if "test" in data else "pid: %s" % data["process"]
signature = data["signature"] if data["signature"] else "unknown top frame"
rv = ["PROCESS-CRASH | %s | application crashed [%s]" % (id, signature)]
if data.get("minidump_path"):
rv.append("Crash dump filename: %s" % data["minidump_path"])
if data.get("stackwalk_stderr"):
rv.append("stderr from minidump_stackwalk:")
rv.append(data["stackwalk_stderr"])
elif data.get("stackwalk_stdout"):
rv.append(data["stackwalk_stdout"])
if data.get("stackwalk_returncode", 0) != 0:
rv.append("minidump_stackwalk exited with return code %d" %
data["stackwalk_returncode"])
if data.get("stackwalk_errors"):
rv.extend(data.get("stackwalk_errors"))
rv = "\n".join(rv)
if not rv[-1] == "\n":
rv += "\n"
return rv
def suite_start(self, data):
self.suite_start_time = data["time"]
return "SUITE-START | Running %i tests\n" % len(data["tests"])
def test_start(self, data):
self.test_start_times[self.test_id(data["test"])] = data["time"]
return "TEST-START | %s\n" % self.id_str(data["test"])
def test_status(self, data):
message = "- " + data["message"] if "message" in data else ""
if "stack" in data:
message += "\n%s" % data["stack"]
if message and message[-1] == "\n":
message = message[:-1]
if "expected" in data:
if not message:
message = "- expected %s" % data["expected"]
failure_line = "TEST-UNEXPECTED-%s | %s | %s %s\n" % (
data["status"], self.id_str(data["test"]), data["subtest"],
message)
if data["expected"] != "PASS":
info_line = "TEST-INFO | expected %s\n" % data["expected"]
return failure_line + info_line
return failure_line
return "TEST-%s | %s | %s %s\n" % (
data["status"], self.id_str(data["test"]), data["subtest"],
message)
def test_end(self, data):
test_id = self.test_id(data["test"])
time_msg = ""
if test_id in self.test_start_times:
start_time = self.test_start_times.pop(test_id)
time = data["time"] - start_time
time_msg = "took %ims" % time
if "expected" in data:
message = data.get("message", "")
if not message:
message = "expected %s" % data["expected"]
if "stack" in data:
message += "\n%s" % data["stack"]
if message and message[-1] == "\n":
message = message[:-1]
failure_line = "TEST-UNEXPECTED-%s | %s | %s\n" % (
data["status"], test_id, message)
if data["expected"] not in ("PASS", "OK"):
expected_msg = "expected %s | " % data["expected"]
else:
expected_msg = ""
info_line = "TEST-INFO %s%s\n" % (expected_msg, time_msg)
return failure_line + info_line
return "TEST-%s | %s | %s\n" % (
data["status"], test_id, time_msg)
def suite_end(self, data):
start_time = self.suite_start_time
time = int((data["time"] - start_time) / 1000)
return "SUITE-END | took %is\n" % time
def test_id(self, test_id):
if isinstance(test_id, (str, unicode)):
return test_id
else:
return tuple(test_id)
def id_str(self, test_id):
if isinstance(test_id, (str, unicode)):
return test_id
else:
return " ".join(test_id)

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import base
class UnittestFormatter(base.BaseFormatter):
"""Formatter designed to produce output in a format like that used by
the ``unittest`` module in the standard library."""
def __init__(self):
self.fails = []
self.errors = []
self.tests_run = 0
self.start_time = None
self.end_time = None
def suite_start(self, data):
self.start_time = data["time"]
def test_start(self, data):
self.tests_run += 1
def test_end(self, data):
char = "."
if "expected" in data:
status = data["status"]
char = {"FAIL": "F",
"ERROR": "E",
"PASS": "X"}[status]
if status == "FAIL":
self.fails.append(data)
elif status == "ERROR":
self.errors.append(data)
elif data["status"] == "SKIP":
char = "S"
return char
def suite_end(self, data):
self.end_time = data["time"]
summary = "\n".join([self.output_fails(),
self.output_errors(),
self.output_summary()])
return "\n%s\n" % summary
def output_fails(self):
return "\n".join("FAIL %(test)s\n%(message)s\n" % data
for data in self.fails)
def output_errors(self):
return "\n".join("ERROR %(test)s\n%(message)s" % data
for data in self.errors)
def output_summary(self):
return ("Ran %i tests in %.1fs" % (self.tests_run,
(self.end_time - self.start_time) / 1000))

View file

@ -0,0 +1,100 @@
import types
from xml.etree import ElementTree
import base
def format_test_id(test_id):
"""Take a test id and return something that looks a bit like
a class path"""
if type(test_id) not in types.StringTypes:
#Not sure how to deal with reftests yet
raise NotImplementedError
#Turn a path into something like a class heirachy
return test_id.replace('.', '_').replace('/', ".")
class XUnitFormatter(base.BaseFormatter):
"""Formatter that produces XUnit-style XML output.
The tree is created in-memory so this formatter may be problematic
with very large log files.
Note that the data model isn't a perfect match. In
particular XUnit assumes that each test has a unittest-style
class name and function name, which isn't the case for us. The
implementation currently replaces path names with something that
looks like class names, but this doesn't work for test types that
actually produce class names, or for test types that have multiple
components in their test id (e.g. reftests)."""
def __init__(self):
self.tree = ElementTree.ElementTree()
self.root = None
self.suite_start_time = None
self.test_start_time = None
self.tests_run = 0
self.errors = 0
self.failures = 0
self.skips = 0
def suite_start(self, data):
self.root = ElementTree.Element("testsuite")
self.tree.root = self.root
self.suite_start_time = data["time"]
def test_start(self, data):
self.tests_run += 1
self.test_start_time = data["time"]
def _create_result(self, data):
test = ElementTree.SubElement(self.root, "testcase")
name = format_test_id(data["test"])
extra = data.get('extra') or {}
test.attrib["classname"] = extra.get('class_name') or name
if "subtest" in data:
test.attrib["name"] = data["subtest"]
# We generally don't know how long subtests take
test.attrib["time"] = "0"
else:
if "." in name:
test_name = name.rsplit(".", 1)[1]
else:
test_name = name
test.attrib["name"] = extra.get('method_name') or test_name
test.attrib["time"] = "%.2f" % ((data["time"] - self.test_start_time) / 1000.0)
if ("expected" in data and data["expected"] != data["status"]):
if data["status"] in ("NOTRUN", "ASSERT", "ERROR"):
result = ElementTree.SubElement(test, "error")
self.errors += 1
else:
result = ElementTree.SubElement(test, "failure")
self.failures += 1
result.attrib["message"] = "Expected %s, got %s" % (data["expected"], data["status"])
result.text = '%s\n%s' % (data.get('stack', ''), data.get('message', ''))
elif data["status"] == "SKIP":
result = ElementTree.SubElement(test, "skipped")
self.skips += 1
def test_status(self, data):
self._create_result(data)
def test_end(self, data):
self._create_result(data)
def suite_end(self, data):
self.root.attrib.update({"tests": str(self.tests_run),
"errors": str(self.errors),
"failures": str(self.failures),
"skips": str(self.skips),
"time": "%.2f" % (
(data["time"] - self.suite_start_time) / 1000.0)})
xml_string = ElementTree.tostring(self.root, encoding="utf8")
# pretty printing can not be done from xml.etree
from xml.dom import minidom
return minidom.parseString(xml_string).toprettyxml(encoding="utf8")

View file

@ -0,0 +1,7 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .base import LogLevelFilter, StreamHandler, BaseHandler
from .statushandler import StatusHandler
from .bufferhandler import BufferHandler

View file

@ -0,0 +1,104 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from threading import Lock
import codecs
from ..structuredlog import log_levels
class BaseHandler(object):
"""A base handler providing message handling facilities to
derived classes.
:param inner: A handler-like callable that may receive messages
from a log user.
"""
def __init__(self, inner):
self.wrapped = []
if hasattr(inner, "handle_message"):
self.wrapped.append(inner)
self.message_handlers = {}
def register_message_handlers(self, topic, handlers):
self.message_handlers[topic] = handlers
def handle_message(self, topic, cmd, *args):
"""Handles a message for the given topic by calling a subclass-defined
callback for the command.
:param topic: The topic of the broadcasted message. Handlers opt-in to
receiving messages by identifying a topic when calling
register_message_handlers.
:param command: The command to issue. This is a string that corresponds
to a callback provided by the target.
:param arg: Arguments to pass to the identified message callback, if any.
"""
rv = []
if topic in self.message_handlers and cmd in self.message_handlers[topic]:
rv.append(self.message_handlers[topic][cmd](*args))
for inner in self.wrapped:
rv.extend(inner.handle_message(topic, cmd, *args))
return rv
class LogLevelFilter(BaseHandler):
"""Handler that filters out messages with action of log and a level
lower than some specified level.
:param inner: Handler to use for messages that pass this filter
:param level: Minimum log level to process
"""
def __init__(self, inner, level):
BaseHandler.__init__(self, inner)
self.inner = inner
self.level = log_levels[level.upper()]
def __call__(self, item):
if (item["action"] != "log" or
log_levels[item["level"].upper()] <= self.level):
return self.inner(item)
class StreamHandler(BaseHandler):
"""Handler for writing to a file-like object
:param stream: File-like object to write log messages to
:param formatter: formatter to convert messages to string format
"""
_lock = Lock()
def __init__(self, stream, formatter):
BaseHandler.__init__(self, formatter)
assert stream is not None
# This is a hack to deal with the case where we are passed a
# StreamWriter (e.g. by mach for stdout). A StreamWriter requires
# the code to handle unicode in exactly the opposite way compared
# to a normal stream i.e. you always have to pass in a Unicode
# object rather than a string object. Cope with that by extracting
# the underlying raw stream.
if isinstance(stream, codecs.StreamWriter):
stream = stream.stream
self.formatter = formatter
self.stream = stream
def __call__(self, data):
"""Write a log message.
:param data: Structured log message dictionary."""
formatted = self.formatter(data)
if not formatted:
return
with self._lock:
if isinstance(formatted, unicode):
self.stream.write(formatted.encode("utf-8", "replace"))
elif isinstance(formatted, str):
self.stream.write(formatted)
else:
assert False, "Got output from the formatter of an unexpected type"
self.stream.flush()

View file

@ -0,0 +1,82 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .base import BaseHandler
class BufferHandler(BaseHandler):
"""Handler that maintains a circular buffer of messages based on the
size and actions specified by a user.
:param inner: The underlying handler used to emit messages.
:param message_limit: The maximum number of messages to retain for
context. If None, the buffer will grow without limit.
:param buffered_actions: The set of actions to include in the buffer
rather than log directly.
"""
def __init__(self, inner, message_limit=100, buffered_actions=None):
BaseHandler.__init__(self, inner)
self.inner = inner
self.message_limit = message_limit
if buffered_actions is None:
buffered_actions = ['log', 'test_status']
self.buffered_actions = set(buffered_actions)
self._buffering = True
if self.message_limit is not None:
self._buffer = [None] * self.message_limit
self._buffer_pos = 0
else:
self._buffer = []
self.register_message_handlers("buffer", {
"on": self._enable_buffering,
"off": self._disable_buffering,
"flush": self._flush_buffered,
"clear": self._clear_buffer,
})
def __call__(self, data):
action = data['action']
if 'bypass_mozlog_buffer' in data:
data.pop('bypass_mozlog_buffer')
self.inner(data)
return
if not self._buffering or action not in self.buffered_actions:
self.inner(data)
return
self._add_message(data)
def _add_message(self, data):
if self.message_limit is None:
self._buffer.append(data)
else:
self._buffer[self._buffer_pos] = data
self._buffer_pos = (self._buffer_pos + 1) % self.message_limit
def _enable_buffering(self):
self._buffering = True
def _disable_buffering(self):
self._buffering = False
def _clear_buffer(self):
"""Clear the buffer of unwanted messages."""
current_size = len([m for m in self._buffer if m is not None])
if self.message_limit is not None:
self._buffer = [None] * self.message_limit
else:
self._buffer = []
return current_size
def _flush_buffered(self):
"""Logs the contents of the current buffer"""
for msg in self._buffer[self._buffer_pos:]:
if msg is not None:
self.inner(msg)
for msg in self._buffer[:self._buffer_pos]:
if msg is not None:
self.inner(msg)
return self._clear_buffer()

View file

@ -0,0 +1,54 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from collections import (
defaultdict,
namedtuple,
)
from mozlog.structured.structuredlog import log_levels
RunSummary = namedtuple("RunSummary",
("unexpected_statuses",
"expected_statuses",
"log_level_counts",
"action_counts"))
class StatusHandler(object):
"""A handler used to determine an overall status for a test run according
to a sequence of log messages."""
def __init__(self):
# The count of each type of unexpected result status (includes tests and subtests)
self.unexpected_statuses = defaultdict(int)
# The count of each type of expected result status (includes tests and subtests)
self.expected_statuses = defaultdict(int)
# The count of actions logged
self.action_counts = defaultdict(int)
# The count of messages logged at each log level
self.log_level_counts = defaultdict(int)
def __call__(self, data):
action = data['action']
self.action_counts[action] += 1
if action == 'log':
self.log_level_counts[data['level']] += 1
if action in ('test_status', 'test_end'):
status = data['status']
if 'expected' in data:
self.unexpected_statuses[status] += 1
else:
self.expected_statuses[status] += 1
def summarize(self):
return RunSummary(
dict(self.unexpected_statuses),
dict(self.expected_statuses),
dict(self.log_level_counts),
dict(self.action_counts),
)

View file

@ -0,0 +1,174 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
convertor_registry = {}
missing = object()
no_default = object()
class log_action(object):
def __init__(self, *args):
self.args = {}
self.args_no_default = []
self.args_with_default = []
# These are the required fields in a log message that usually aren't
# supplied by the caller, but can be in the case of log_raw
self.default_args = [
Unicode("action"),
Int("time"),
Unicode("thread"),
Int("pid", default=None),
Unicode("source"),
Unicode("component")]
for arg in args:
if arg.default is no_default:
self.args_no_default.append(arg.name)
else:
self.args_with_default.append(arg.name)
if arg.name in self.args:
raise ValueError("Repeated argument name %s" % arg.name)
self.args[arg.name] = arg
for extra in self.default_args:
self.args[extra.name] = extra
def __call__(self, f):
convertor_registry[f.__name__] = self
converter = self
def inner(self, *args, **kwargs):
data = converter.convert(*args, **kwargs)
return f(self, data)
if hasattr(f, '__doc__'):
setattr(inner, '__doc__', f.__doc__)
return inner
def convert(self, *args, **kwargs):
data = {}
values = {}
values.update(kwargs)
positional_no_default = [item for item in self.args_no_default if item not in values]
num_no_default = len(positional_no_default)
if len(args) < num_no_default:
raise TypeError("Too few arguments")
if len(args) > num_no_default + len(self.args_with_default):
raise TypeError("Too many arguments")
for i, name in enumerate(positional_no_default):
values[name] = args[i]
positional_with_default = [self.args_with_default[i]
for i in range(len(args) - num_no_default)]
for i, name in enumerate(positional_with_default):
if name in values:
raise TypeError("Argument %s specified twice" % name)
values[name] = args[i + num_no_default]
# Fill in missing arguments
for name in self.args_with_default:
if name not in values:
values[name] = self.args[name].default
for key, value in values.iteritems():
if key in self.args:
out_value = self.args[key](value)
if out_value is not missing:
data[key] = out_value
else:
raise TypeError("Unrecognised argument %s" % key)
return data
def convert_known(self, **kwargs):
known_kwargs = {name: value for name, value in kwargs.iteritems()
if name in self.args}
return self.convert(**known_kwargs)
class DataType(object):
def __init__(self, name, default=no_default, optional=False):
self.name = name
self.default = default
if default is no_default and optional is not False:
raise ValueError("optional arguments require a default value")
self.optional = optional
def __call__(self, value):
if value == self.default:
if self.optional:
return missing
return self.default
try:
return self.convert(value)
except:
raise ValueError("Failed to convert value %s of type %s for field %s to type %s" %
(value, type(value).__name__, self.name, self.__class__.__name__))
class Unicode(DataType):
def convert(self, data):
if isinstance(data, unicode):
return data
if isinstance(data, str):
return data.decode("utf8", "replace")
return unicode(data)
class TestId(DataType):
def convert(self, data):
if isinstance(data, unicode):
return data
elif isinstance(data, str):
return data.decode("utf-8", "replace")
elif isinstance(data, tuple):
# This is really a bit of a hack; should really split out convertors from the
# fields they operate on
func = Unicode(None).convert
return tuple(func(item) for item in data)
else:
raise ValueError
class Status(DataType):
allowed = ["PASS", "FAIL", "OK", "ERROR", "TIMEOUT", "CRASH", "ASSERT", "SKIP"]
def convert(self, data):
value = data.upper()
if value not in self.allowed:
raise ValueError
return value
class SubStatus(Status):
allowed = ["PASS", "FAIL", "ERROR", "TIMEOUT", "ASSERT", "NOTRUN"]
class Dict(DataType):
def convert(self, data):
return dict(data)
class List(DataType):
def __init__(self, name, item_type, default=no_default, optional=False):
DataType.__init__(self, name, default, optional)
self.item_type = item_type(None)
def convert(self, data):
return [self.item_type.convert(item) for item in data]
class Int(DataType):
def convert(self, data):
return int(data)
class Any(DataType):
def convert(self, data):
return data

View file

@ -0,0 +1,73 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
def read(log_f, raise_on_error=False):
"""Return a generator that will return the entries in a structured log file.
Note that the caller must not close the file whilst the generator is still
in use.
:param log_f: file-like object containing the raw log entries, one per line
:param raise_on_error: boolean indicating whether ValueError should be raised
for lines that cannot be decoded."""
while True:
line = log_f.readline()
if not line:
# This allows log_f to be a stream like stdout
break
try:
yield json.loads(line)
except ValueError:
if raise_on_error:
raise
def imap_log(log_iter, action_map):
"""Create an iterator that will invoke a callback per action for each item in a
iterable containing structured log entries
:param log_iter: Iterator returning structured log entries
:param action_map: Dictionary mapping action name to callback function. Log items
with actions not in this dictionary will be skipped.
"""
for item in log_iter:
if item["action"] in action_map:
yield action_map[item["action"]](item)
def each_log(log_iter, action_map):
"""Call a callback for each item in an iterable containing structured
log entries
:param log_iter: Iterator returning structured log entries
:param action_map: Dictionary mapping action name to callback function. Log items
with actions not in this dictionary will be skipped.
"""
for item in log_iter:
if item["action"] in action_map:
action_map[item["action"]](item)
class LogHandler(object):
"""Base class for objects that act as log handlers. A handler is a callable
that takes a log entry as the only argument.
Subclasses are expected to provide a method for each action type they
wish to handle, each taking a single argument for the test data.
For example a trivial subclass that just produces the id of each test as
it starts might be::
class StartIdHandler(LogHandler):
def test_start(data):
#For simplicity in the example pretend the id is always a string
return data["test"]
"""
def __call__(self, data):
if hasattr(self, data["action"]):
handler = getattr(self, data["action"])
return handler(data)
def handle_log(log_iter, handler):
"""Call a handler for each item in a log, discarding the return value"""
for item in log_iter:
handler(item)

View file

@ -0,0 +1,30 @@
#!/usr/bin/env python
import argparse
import unstable
import format as formatlog
import logmerge
def get_parser():
parser = argparse.ArgumentParser("structlog",
description="Tools for dealing with structured logs")
commands = {"unstable": (unstable.get_parser, unstable.main),
"format": (formatlog.get_parser, formatlog.main),
"logmerge": (logmerge.get_parser, logmerge.main)}
sub_parser = parser.add_subparsers(title='Subcommands')
for command, (parser_func, main_func) in commands.iteritems():
parent = parser_func(False)
command_parser = sub_parser.add_parser(command,
description=parent.description,
parents=[parent])
command_parser.set_defaults(func=main_func)
return parser
def main():
parser = get_parser()
args = parser.parse_args()
args.func(**vars(args))

View file

@ -0,0 +1,39 @@
import argparse
import sys
from .. import handlers, commandline, reader
def get_parser(add_help=True):
parser = argparse.ArgumentParser("format",
description="Format a structured log stream", add_help=add_help)
parser.add_argument("--input", action="store", default=None,
help="Filename to read from, defaults to stdin")
parser.add_argument("--output", action="store", default=None,
help="Filename to write to, defaults to stdout")
parser.add_argument("format", choices=commandline.log_formatters.keys(),
help="Format to use")
return parser
def main(**kwargs):
if kwargs["input"] is None:
input_file = sys.stdin
else:
input_file = open(kwargs["input"])
if kwargs["output"] is None:
output_file = sys.stdout
else:
output_file = open(kwargs["output"], "w")
formatter = commandline.log_formatters[kwargs["format"]][0]()
handler = handlers.StreamHandler(stream=output_file,
formatter=formatter)
for data in reader.read(input_file):
handler(data)
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
kwargs = vars(args)
main(**kwargs)

View file

@ -0,0 +1,82 @@
from __future__ import print_function
import argparse
import json
import os
import sys
from threading import current_thread
import time
from mozlog.structured.reader import read
def dump_entry(entry, output):
json.dump(entry, output)
output.write("\n")
def fill_process_info(event):
event["time"] = int(round(time.time() * 1000))
event["thread"] = current_thread().name
event["pid"] = os.getpid()
return event
def process_until(reader, output, action):
for entry in reader:
if entry['action'] == action:
return entry
dump_entry(entry, output)
def process_until_suite_start(reader, output):
return process_until(reader, output, "suite_start")
def process_until_suite_end(reader, output):
return process_until(reader, output, "suite_end")
def validate_start_events(events):
for start in events:
if not start['run_info'] == events[0]['run_info']:
print("Error: different run_info entries", file=sys.stderr)
sys.exit(1)
def merge_start_events(events):
for start in events[1:]:
events[0]["tests"].extend(start["tests"])
return events[0]
def get_parser(add_help=True):
parser = argparse.ArgumentParser("logmerge", description='Merge multiple log files.', add_help=add_help)
parser.add_argument('-o', dest='output', help='output file, defaults to stdout')
parser.add_argument('files', metavar='File', type=str, nargs='+', help='file to be merged')
return parser
def main(**kwargs):
if kwargs["output"] is None:
output = sys.stdout
else:
output = open(kwargs["output"], "w")
readers = [read(open(filename, 'r')) for filename in kwargs["files"]]
start_events = [process_until_suite_start(reader, output) for reader in readers]
validate_start_events(start_events)
merged_start_event = merge_start_events(start_events)
dump_entry(fill_process_info(merged_start_event), output)
end_events = [process_until_suite_end(reader, output) for reader in readers]
dump_entry(fill_process_info(end_events[0]), output)
for reader in readers:
for entry in reader:
dump_entry(entry, output)
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
kwargs = vars(args)
main(**kwargs)

View file

@ -0,0 +1,108 @@
import argparse
from collections import defaultdict
import json
from mozlog.structured import reader
class StatusHandler(reader.LogHandler):
def __init__(self):
self.run_info = None
self.statuses = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda: defaultdict(int))))
def test_id(self, test):
if type(test) in (str, unicode):
return test
else:
return tuple(test)
def suite_start(self, item):
self.run_info = tuple(sorted(item.get("run_info", {}).items()))
def test_status(self, item):
self.statuses[self.run_info][self.test_id(item["test"])][item["subtest"]][item["status"]] += 1
def test_end(self, item):
self.statuses[self.run_info][self.test_id(item["test"])][None][item["status"]] += 1
def suite_end(self, item):
self.run_info = None
def get_statuses(filenames):
handler = StatusHandler()
for filename in filenames:
with open(filename) as f:
reader.handle_log(reader.read(f), handler)
return handler.statuses
def _filter(results_cmp):
def inner(statuses):
rv = defaultdict(lambda:defaultdict(dict))
for run_info, tests in statuses.iteritems():
for test, subtests in tests.iteritems():
for name, results in subtests.iteritems():
if results_cmp(results):
rv[run_info][test][name] = results
return rv
return inner
filter_unstable = _filter(lambda x: len(x) > 1)
filter_stable = _filter(lambda x: len(x) == 1)
def group_results(data):
rv = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
for run_info, tests in data.iteritems():
for test, subtests in tests.iteritems():
for name, results in subtests.iteritems():
for status, number in results.iteritems():
rv[test][name][status] += number
return rv
def print_results(data):
for run_info, tests in data.iteritems():
run_str = " ".join("%s:%s" % (k,v) for k,v in run_info) if run_info else "No Run Info"
print run_str
print "=" * len(run_str)
print_run(tests)
def print_run(tests):
for test, subtests in sorted(tests.items()):
print "\n" + str(test)
print "-" * len(test)
for name, results in subtests.iteritems():
print "[%s]: %s" % (name if name is not None else "",
" ".join("%s (%i)" % (k,v) for k,v in results.iteritems()))
def get_parser(add_help=True):
parser = argparse.ArgumentParser("unstable",
description="List tests that don't give consistent results from one or more runs.", add_help=add_help)
parser.add_argument("--json", action="store_true", default=False,
help="Output in JSON format")
parser.add_argument("--group", action="store_true", default=False,
help="Group results from different run types")
parser.add_argument("log_file", nargs="+",
help="Log files to read")
return parser
def main(**kwargs):
unstable = filter_unstable(get_statuses(kwargs["log_file"]))
if kwargs["group"]:
unstable = group_results(unstable)
if kwargs["json"]:
print json.dumps(unstable)
else:
if not kwargs["group"]:
print_results(unstable)
else:
print_run(unstable)
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
kwargs = vars(args)
main(**kwargs)

View file

@ -0,0 +1,40 @@
import logging
from structuredlog import StructuredLogger, log_levels
class UnstructuredHandler(logging.Handler):
def __init__(self, name=None, level=logging.NOTSET):
self.structured = StructuredLogger(name)
logging.Handler.__init__(self, level=level)
def emit(self, record):
if record.levelname in log_levels:
log_func = getattr(self.structured, record.levelname.lower())
else:
log_func = self.logger.debug
log_func(record.msg)
def handle(self, record):
self.emit(record)
class LoggingWrapper(object):
def __init__(self, wrapped):
self.wrapped = wrapped
self.wrapped.addHandler(UnstructuredHandler(self.wrapped.name,
logging.getLevelName(self.wrapped.level)))
def add_handler(self, handler):
self.addHandler(handler)
def remove_handler(self, handler):
self.removeHandler(handler)
def __getattr__(self, name):
return getattr(self.wrapped, name)
def std_logging_adapter(logger):
"""Adapter for stdlib logging so that it produces structured
messages rather than standard logging messages
:param logger: logging.Logger to wrap"""
return LoggingWrapper(logger)

View file

@ -0,0 +1,425 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import unicode_literals
from multiprocessing import current_process
from threading import current_thread, Lock
import json
import sys
import time
import traceback
from logtypes import Unicode, TestId, Status, SubStatus, Dict, List, Int, Any
from logtypes import log_action, convertor_registry
"""Structured Logging for recording test results.
Allowed actions, and subfields:
suite_start
tests - List of test names
suite_end
test_start
test - ID for the test
path - Relative path to test (optional)
test_end
test - ID for the test
status [PASS | FAIL | OK | ERROR |
TIMEOUT | CRASH | ASSERT | SKIP] - test status
expected [As for status] - Status that the test was expected to get,
or absent if the test got the expected status
extra - Dictionary of harness-specific extra information e.g. debug info
test_status
test - ID for the test
subtest - Name of the subtest
status [PASS | FAIL | TIMEOUT | NOTRUN] - test status
expected [As for status] - Status that the subtest was expected to get,
or absent if the subtest got the expected status
process_output
process - PID of the process
command - Command line of the process
data - Output data from the process
log
level [CRITICAL | ERROR | WARNING |
INFO | DEBUG] - level of the logging message
message - Message to log
Subfields for all messages:
action - the action type of the current message
time - the timestamp in ms since the epoch of the log message
thread - name for the thread emitting the message
pid - id of the python process in which the logger is running
source - name for the source emitting the message
component - name of the subcomponent emitting the message
"""
_default_logger_name = None
def get_default_logger(component=None):
"""Gets the default logger if available, optionally tagged with component
name. Will return None if not yet set
:param component: The component name to tag log messages with
"""
global _default_logger_name
if not _default_logger_name:
return None
return StructuredLogger(_default_logger_name, component=component)
def set_default_logger(default_logger):
"""Sets the default logger to logger.
It can then be retrieved with :py:func:`get_default_logger`
Note that :py:func:`~mozlog.structured.commandline.setup_logging` will
set a default logger for you, so there should be no need to call this
function if you're using setting up logging that way (recommended).
:param default_logger: The logger to set to default.
"""
global _default_logger_name
_default_logger_name = default_logger.name
log_levels = dict((k.upper(), v) for v, k in
enumerate(["critical", "error", "warning", "info", "debug"]))
def log_actions():
"""Returns the set of actions implemented by mozlog."""
return set(convertor_registry.keys())
class LoggerState(object):
def __init__(self):
self.handlers = []
self.running_tests = set()
self.suite_started = False
self.component_states = {}
class ComponentState(object):
def __init__(self):
self.filter_ = None
class StructuredLogger(object):
_lock = Lock()
_logger_states = {}
"""Create a structured logger with the given name
:param name: The name of the logger.
:param component: A subcomponent that the logger belongs to (typically a library name)
"""
def __init__(self, name, component=None):
self.name = name
self.component = component
with self._lock:
if name not in self._logger_states:
self._logger_states[name] = LoggerState()
if component not in self._logger_states[name].component_states:
self._logger_states[name].component_states[component] = ComponentState()
self._state = self._logger_states[name]
self._component_state = self._state.component_states[component]
def add_handler(self, handler):
"""Add a handler to the current logger"""
self._state.handlers.append(handler)
def remove_handler(self, handler):
"""Remove a handler from the current logger"""
self._state.handlers.remove(handler)
def send_message(self, topic, command, *args):
"""Send a message to each handler configured for this logger. This
part of the api is useful to those users requiring dynamic control
of a handler's behavior.
:param topic: The name used by handlers to subscribe to a message.
:param command: The name of the command to issue.
:param args: Any arguments known to the target for specialized
behavior.
"""
rv = []
for handler in self._state.handlers:
if hasattr(handler, "handle_message"):
rv += handler.handle_message(topic, command, *args)
return rv
@property
def handlers(self):
"""A list of handlers that will be called when a
message is logged from this logger"""
return self._state.handlers
@property
def component_filter(self):
return self._component_state.filter_
@component_filter.setter
def component_filter(self, value):
self._component_state.filter_ = value
def log_raw(self, raw_data):
if "action" not in raw_data:
raise ValueError
action = raw_data["action"]
converted_data = convertor_registry[action].convert_known(**raw_data)
for k, v in raw_data.iteritems():
if k not in converted_data:
converted_data[k] = v
data = self._make_log_data(action, converted_data)
if action in ("test_status", "test_end"):
if (data["expected"] == data["status"] or
data["status"] == "SKIP" or
"expected" not in raw_data):
del data["expected"]
self._handle_log(data)
def _log_data(self, action, data=None):
if data is None:
data = {}
log_data = self._make_log_data(action, data)
self._handle_log(log_data)
def _handle_log(self, data):
with self._lock:
if self.component_filter:
data = self.component_filter(data)
if data is None:
return
for handler in self.handlers:
handler(data)
def _make_log_data(self, action, data):
all_data = {"action": action,
"time": int(time.time() * 1000),
"thread": current_thread().name,
"pid": current_process().pid,
"source": self.name}
if self.component:
all_data['component'] = self.component
all_data.update(data)
return all_data
@log_action(List("tests", Unicode),
Dict("run_info", default=None, optional=True),
Dict("version_info", default=None, optional=True),
Dict("device_info", default=None, optional=True))
def suite_start(self, data):
"""Log a suite_start message
:param list tests: Test identifiers that will be run in the suite.
:param dict run_info: Optional information typically provided by mozinfo.
:param dict version_info: Optional target application version information provided by mozversion.
:param dict device_info: Optional target device information provided by mozdevice.
"""
if self._state.suite_started:
self.error("Got second suite_start message before suite_end. Logged with data %s" %
json.dumps(data))
return
self._state.suite_started = True
self._log_data("suite_start", data)
@log_action()
def suite_end(self, data):
"""Log a suite_end message"""
if not self._state.suite_started:
self.error("Got suite_end message before suite_start.")
return
self._state.suite_started = False
self._log_data("suite_end")
@log_action(TestId("test"),
Unicode("path", default=None, optional=True))
def test_start(self, data):
"""Log a test_start message
:param test: Identifier of the test that will run.
:param path: Path to test relative to some base (typically the root of
the source tree).
"""
if not self._state.suite_started:
self.error("Got test_start message before suite_start for test %s" %
data["test"])
return
if data["test"] in self._state.running_tests:
self.error("test_start for %s logged while in progress." %
data["test"])
return
self._state.running_tests.add(data["test"])
self._log_data("test_start", data)
@log_action(TestId("test"),
Unicode("subtest"),
SubStatus("status"),
SubStatus("expected", default="PASS"),
Unicode("message", default=None, optional=True),
Unicode("stack", default=None, optional=True),
Dict("extra", default=None, optional=True))
def test_status(self, data):
"""
Log a test_status message indicating a subtest result. Tests that
do not have subtests are not expected to produce test_status messages.
:param test: Identifier of the test that produced the result.
:param subtest: Name of the subtest.
:param status: Status string indicating the subtest result
:param expected: Status string indicating the expected subtest result.
:param message: String containing a message associated with the result.
:param stack: a stack trace encountered during test execution.
:param extra: suite-specific data associated with the test result.
"""
if (data["expected"] == data["status"] or
data["status"] == "SKIP"):
del data["expected"]
if data["test"] not in self._state.running_tests:
self.error("test_status for %s logged while not in progress. "
"Logged with data: %s" % (data["test"], json.dumps(data)))
return
self._log_data("test_status", data)
@log_action(TestId("test"),
Status("status"),
Status("expected", default="OK"),
Unicode("message", default=None, optional=True),
Unicode("stack", default=None, optional=True),
Dict("extra", default=None, optional=True))
def test_end(self, data):
"""
Log a test_end message indicating that a test completed. For tests
with subtests this indicates whether the overall test completed without
errors. For tests without subtests this indicates the test result
directly.
:param test: Identifier of the test that produced the result.
:param status: Status string indicating the test result
:param expected: Status string indicating the expected test result.
:param message: String containing a message associated with the result.
:param stack: a stack trace encountered during test execution.
:param extra: suite-specific data associated with the test result.
"""
if (data["expected"] == data["status"] or
data["status"] == "SKIP"):
del data["expected"]
if data["test"] not in self._state.running_tests:
self.error("test_end for %s logged while not in progress. "
"Logged with data: %s" % (data["test"], json.dumps(data)))
else:
self._state.running_tests.remove(data["test"])
self._log_data("test_end", data)
@log_action(Unicode("process"),
Unicode("data"),
Unicode("command", default=None, optional=True))
def process_output(self, data):
"""Log output from a managed process.
:param process: A unique identifier for the process producing the output
(typically the pid)
:param data: The output to log
:param command: A string representing the full command line used to start
the process.
"""
self._log_data("process_output", data)
@log_action(Unicode("process", default=None),
Unicode("signature", default="[Unknown]"),
TestId("test", default=None, optional=True),
Unicode("minidump_path", default=None, optional=True),
Unicode("minidump_extra", default=None, optional=True),
Int("stackwalk_retcode", default=None, optional=True),
Unicode("stackwalk_stdout", default=None, optional=True),
Unicode("stackwalk_stderr", default=None, optional=True),
List("stackwalk_errors", Unicode, default=None))
def crash(self, data):
if data["stackwalk_errors"] is None:
data["stackwalk_errors"] = []
self._log_data("crash", data)
def _log_func(level_name):
@log_action(Unicode("message"),
Any("exc_info", default=False))
def log(self, data):
exc_info = data.pop("exc_info", None)
if exc_info:
if not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
if exc_info != (None, None, None):
bt = traceback.format_exception(*exc_info)
data["stack"] = u"\n".join(bt)
data["level"] = level_name
self._log_data("log", data)
log.__doc__ = """Log a message with level %s
:param message: The string message to log
:param exc_info: Either a boolean indicating whether to include a traceback
derived from sys.exc_info() or a three-item tuple in the
same format as sys.exc_info() containing exception information
to log.
""" % level_name
log.__name__ = str(level_name).lower()
return log
# Create all the methods on StructuredLog for debug levels
for level_name in log_levels:
setattr(StructuredLogger, level_name.lower(), _log_func(level_name))
class StructuredLogFileLike(object):
"""Wrapper for file-like objects to redirect writes to logger
instead. Each call to `write` becomes a single log entry of type `log`.
When using this it is important that the callees i.e. the logging
handlers do not themselves try to write to the wrapped file as this
will cause infinite recursion.
:param logger: `StructuredLogger` to which to redirect the file write operations.
:param level: log level to use for each write.
:param prefix: String prefix to prepend to each log entry.
"""
def __init__(self, logger, level="info", prefix=None):
self.logger = logger
self.log_func = getattr(self.logger, level)
self.prefix = prefix
def write(self, data):
if data.endswith("\n"):
data = data[:-1]
if data.endswith("\r"):
data = data[:-1]
if self.prefix is not None:
data = "%s: %s" % (self.prefix, data)
self.log_func(data)
def flush(self):
pass

36
python/mozlog/setup.py Normal file
View file

@ -0,0 +1,36 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from setuptools import setup, find_packages
PACKAGE_NAME = 'mozlog'
PACKAGE_VERSION = '2.10'
setup(name=PACKAGE_NAME,
version=PACKAGE_VERSION,
description="Robust log handling specialized for logging in the Mozilla universe",
long_description="see http://mozbase.readthedocs.org/",
author='Mozilla Automation and Testing Team',
author_email='tools@lists.mozilla.org',
url='https://wiki.mozilla.org/Auto-tools/Projects/Mozbase',
license='MPL 1.1/GPL 2.0/LGPL 2.1',
packages=find_packages(),
zip_safe=False,
install_requires=["blessings>=1.3"],
tests_require=['mozfile'],
platforms =['Any'],
classifiers=['Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: Mozilla Public License 1.1 (MPL 1.1)',
'Operating System :: OS Independent',
'Topic :: Software Development :: Libraries :: Python Modules',
],
package_data={"mozlog.structured": ["formatters/html/main.js",
"formatters/html/style.css"]},
entry_points={
"console_scripts": [
"structlog = mozlog.structured.scripts:main"
]}
)

View file

@ -0,0 +1,2 @@
[test_logger.py]
[test_structured.py]

View file

@ -0,0 +1,259 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import datetime
import json
import socket
import threading
import time
import unittest
import mozfile
import mozlog
class ListHandler(mozlog.Handler):
"""Mock handler appends messages to a list for later inspection."""
def __init__(self):
mozlog.Handler.__init__(self)
self.messages = []
def emit(self, record):
self.messages.append(self.format(record))
class TestLogging(unittest.TestCase):
"""Tests behavior of basic mozlog api."""
def test_logger_defaults(self):
"""Tests the default logging format and behavior."""
default_logger = mozlog.getLogger('default.logger')
self.assertEqual(default_logger.name, 'default.logger')
self.assertEqual(len(default_logger.handlers), 1)
self.assertTrue(isinstance(default_logger.handlers[0],
mozlog.StreamHandler))
f = mozfile.NamedTemporaryFile()
list_logger = mozlog.getLogger('file.logger',
handler=mozlog.FileHandler(f.name))
self.assertEqual(len(list_logger.handlers), 1)
self.assertTrue(isinstance(list_logger.handlers[0],
mozlog.FileHandler))
f.close()
self.assertRaises(ValueError, mozlog.getLogger,
'file.logger', handler=ListHandler())
def test_timestamps(self):
"""Verifies that timestamps are included when asked for."""
log_name = 'test'
handler = ListHandler()
handler.setFormatter(mozlog.MozFormatter())
log = mozlog.getLogger(log_name, handler=handler)
log.info('no timestamp')
self.assertTrue(handler.messages[-1].startswith('%s ' % log_name))
handler.setFormatter(mozlog.MozFormatter(include_timestamp=True))
log.info('timestamp')
# Just verify that this raises no exceptions.
datetime.datetime.strptime(handler.messages[-1][:23],
'%Y-%m-%d %H:%M:%S,%f')
class TestStructuredLogging(unittest.TestCase):
"""Tests structured output in mozlog."""
def setUp(self):
self.handler = ListHandler()
self.handler.setFormatter(mozlog.JSONFormatter())
self.logger = mozlog.MozLogger('test.Logger')
self.logger.addHandler(self.handler)
self.logger.setLevel(mozlog.DEBUG)
def check_messages(self, expected, actual):
"""Checks actual for equality with corresponding fields in actual.
The actual message should contain all fields in expected, and
should be identical, with the exception of the timestamp field.
The actual message should contain no fields other than the timestamp
field and those present in expected."""
self.assertTrue(isinstance(actual['_time'], (int, long)))
for k, v in expected.items():
self.assertEqual(v, actual[k])
for k in actual.keys():
if k != '_time':
self.assertTrue(expected.get(k) is not None)
def test_structured_output(self):
self.logger.log_structured('test_message',
{'_level': mozlog.INFO,
'_message': 'message one'})
self.logger.log_structured('test_message',
{'_level': mozlog.INFO,
'_message': 'message two'})
self.logger.log_structured('error_message',
{'_level': mozlog.ERROR,
'diagnostic': 'unexpected error'})
message_one_expected = {'_namespace': 'test.Logger',
'_level': 'INFO',
'_message': 'message one',
'action': 'test_message'}
message_two_expected = {'_namespace': 'test.Logger',
'_level': 'INFO',
'_message': 'message two',
'action': 'test_message'}
message_three_expected = {'_namespace': 'test.Logger',
'_level': 'ERROR',
'diagnostic': 'unexpected error',
'action': 'error_message'}
message_one_actual = json.loads(self.handler.messages[0])
message_two_actual = json.loads(self.handler.messages[1])
message_three_actual = json.loads(self.handler.messages[2])
self.check_messages(message_one_expected, message_one_actual)
self.check_messages(message_two_expected, message_two_actual)
self.check_messages(message_three_expected, message_three_actual)
def test_unstructured_conversion(self):
""" Tests that logging to a logger with a structured formatter
via the traditional logging interface works as expected. """
self.logger.info('%s %s %d', 'Message', 'number', 1)
self.logger.error('Message number 2')
self.logger.debug('Message with %s', 'some extras',
extra={'params': {'action': 'mozlog_test_output',
'is_failure': False}})
message_one_expected = {'_namespace': 'test.Logger',
'_level': 'INFO',
'_message': 'Message number 1'}
message_two_expected = {'_namespace': 'test.Logger',
'_level': 'ERROR',
'_message': 'Message number 2'}
message_three_expected = {'_namespace': 'test.Logger',
'_level': 'DEBUG',
'_message': 'Message with some extras',
'action': 'mozlog_test_output',
'is_failure': False}
message_one_actual = json.loads(self.handler.messages[0])
message_two_actual = json.loads(self.handler.messages[1])
message_three_actual = json.loads(self.handler.messages[2])
self.check_messages(message_one_expected, message_one_actual)
self.check_messages(message_two_expected, message_two_actual)
self.check_messages(message_three_expected, message_three_actual)
def message_callback(self):
if len(self.handler.messages) == 3:
message_one_expected = {'_namespace': 'test.Logger',
'_level': 'DEBUG',
'_message': 'socket message one',
'action': 'test_message'}
message_two_expected = {'_namespace': 'test.Logger',
'_level': 'DEBUG',
'_message': 'socket message two',
'action': 'test_message'}
message_three_expected = {'_namespace': 'test.Logger',
'_level': 'DEBUG',
'_message': 'socket message three',
'action': 'test_message'}
message_one_actual = json.loads(self.handler.messages[0])
message_two_actual = json.loads(self.handler.messages[1])
message_three_actual = json.loads(self.handler.messages[2])
self.check_messages(message_one_expected, message_one_actual)
self.check_messages(message_two_expected, message_two_actual)
self.check_messages(message_three_expected, message_three_actual)
def test_log_listener(self):
connection = '127.0.0.1', 0
self.log_server = mozlog.LogMessageServer(connection,
self.logger,
message_callback=self.message_callback,
timeout=0.5)
message_string_one = json.dumps({'_message': 'socket message one',
'action': 'test_message',
'_level': 'DEBUG'})
message_string_two = json.dumps({'_message': 'socket message two',
'action': 'test_message',
'_level': 'DEBUG'})
message_string_three = json.dumps({'_message': 'socket message three',
'action': 'test_message',
'_level': 'DEBUG'})
message_string = message_string_one + '\n' + \
message_string_two + '\n' + \
message_string_three + '\n'
server_thread = threading.Thread(target=self.log_server.handle_request)
server_thread.start()
host, port = self.log_server.server_address
sock = socket.socket()
sock.connect((host, port))
# Sleeps prevent listener from receiving entire message in a single call
# to recv in order to test reconstruction of partial messages.
sock.sendall(message_string[:8])
time.sleep(.01)
sock.sendall(message_string[8:32])
time.sleep(.01)
sock.sendall(message_string[32:64])
time.sleep(.01)
sock.sendall(message_string[64:128])
time.sleep(.01)
sock.sendall(message_string[128:])
server_thread.join()
class Loggable(mozlog.LoggingMixin):
"""Trivial class inheriting from LoggingMixin"""
pass
class TestLoggingMixin(unittest.TestCase):
"""Tests basic use of LoggingMixin"""
def test_mixin(self):
loggable = Loggable()
self.assertTrue(not hasattr(loggable, "_logger"))
loggable.log(mozlog.INFO, "This will instantiate the logger")
self.assertTrue(hasattr(loggable, "_logger"))
self.assertEqual(loggable._logger.name, "test_logger.Loggable")
self.assertRaises(ValueError, loggable.set_logger,
"not a logger")
logger = mozlog.MozLogger('test.mixin')
handler = ListHandler()
logger.addHandler(handler)
loggable.set_logger(logger)
self.assertTrue(isinstance(loggable._logger.handlers[0],
ListHandler))
self.assertEqual(loggable._logger.name, "test.mixin")
loggable.log(mozlog.WARN, 'message for "log" method')
loggable.info('message for "info" method')
loggable.error('message for "error" method')
loggable.log_structured('test_message',
params={'_message': 'message for ' + \
'"log_structured" method'})
expected_messages = ['message for "log" method',
'message for "info" method',
'message for "error" method',
'message for "log_structured" method']
actual_messages = loggable._logger.handlers[0].messages
self.assertEqual(expected_messages, actual_messages)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,986 @@
# -*- coding: utf-8 -*-
import argparse
import json
import optparse
import os
import StringIO
import sys
import unittest
import xml.etree.ElementTree as ET
import mozfile
from mozlog.structured import (
commandline,
reader,
structuredlog,
stdadapter,
handlers,
formatters,
)
class TestHandler(object):
def __init__(self):
self.items = []
def __call__(self, data):
self.items.append(data)
@property
def last_item(self):
return self.items[-1]
@property
def empty(self):
return not self.items
class BaseStructuredTest(unittest.TestCase):
def setUp(self):
self.logger = structuredlog.StructuredLogger("test")
self.handler = TestHandler()
self.logger.add_handler(self.handler)
def pop_last_item(self):
return self.handler.items.pop()
def assert_log_equals(self, expected, actual=None):
if actual is None:
actual = self.pop_last_item()
all_expected = {"pid": os.getpid(),
"thread": "MainThread",
"source": "test"}
specials = set(["time"])
all_expected.update(expected)
for key, value in all_expected.iteritems():
self.assertEqual(actual[key], value)
self.assertEquals(set(all_expected.keys()) | specials, set(actual.keys()))
class TestStatusHandler(BaseStructuredTest):
def setUp(self):
super(TestStatusHandler, self).setUp()
self.handler = handlers.StatusHandler()
self.logger.add_handler(self.handler)
def test_failure_run(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "sub1", status='PASS')
self.logger.test_status("test1", "sub2", status='TIMEOUT')
self.logger.test_end("test1", status='OK')
self.logger.suite_end()
summary = self.handler.summarize()
self.assertIn('TIMEOUT', summary.unexpected_statuses)
self.assertEqual(1, summary.unexpected_statuses['TIMEOUT'])
self.assertIn('PASS', summary.expected_statuses)
self.assertEqual(1, summary.expected_statuses['PASS'])
self.assertIn('OK', summary.expected_statuses)
self.assertEqual(1, summary.expected_statuses['OK'])
self.assertEqual(2, summary.action_counts['test_status'])
self.assertEqual(1, summary.action_counts['test_end'])
def test_error_run(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.error("ERRR!")
self.logger.test_end("test1", status='OK')
self.logger.test_start("test2")
self.logger.test_end("test2", status='OK')
self.logger.suite_end()
summary = self.handler.summarize()
self.assertIn('ERROR', summary.log_level_counts)
self.assertEqual(1, summary.log_level_counts['ERROR'])
self.assertIn('OK', summary.expected_statuses)
self.assertEqual(2, summary.expected_statuses['OK'])
class TestStructuredLog(BaseStructuredTest):
def test_suite_start(self):
self.logger.suite_start(["test"])
self.assert_log_equals({"action": "suite_start",
"tests":["test"]})
self.logger.suite_end()
def test_suite_end(self):
self.logger.suite_start([])
self.logger.suite_end()
self.assert_log_equals({"action": "suite_end"})
def test_start(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.assert_log_equals({"action": "test_start",
"test":"test1"})
self.logger.test_start(("test1", "==", "test1-ref"), path="path/to/test")
self.assert_log_equals({"action": "test_start",
"test":("test1", "==", "test1-ref"),
"path": "path/to/test"})
self.logger.suite_end()
def test_start_inprogress(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_start("test1")
self.assert_log_equals({"action": "log",
"message": "test_start for test1 logged while in progress.",
"level": "ERROR"})
self.logger.suite_end()
def test_status(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest name", "fail", expected="FAIL", message="Test message")
self.assert_log_equals({"action": "test_status",
"subtest": "subtest name",
"status": "FAIL",
"message": "Test message",
"test":"test1"})
self.logger.test_end("test1", "OK")
self.logger.suite_end()
def test_status_1(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest name", "fail")
self.assert_log_equals({"action": "test_status",
"subtest": "subtest name",
"status": "FAIL",
"expected": "PASS",
"test":"test1"})
self.logger.test_end("test1", "OK")
self.logger.suite_end()
def test_status_2(self):
self.assertRaises(ValueError, self.logger.test_status, "test1", "subtest name", "XXXUNKNOWNXXX")
def test_status_extra(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", extra={"data": 42})
self.assert_log_equals({"action": "test_status",
"subtest": "subtest name",
"status": "FAIL",
"expected": "PASS",
"test": "test1",
"extra": {"data":42}
})
self.logger.test_end("test1", "OK")
self.logger.suite_end()
def test_status_stack(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", stack="many\nlines\nof\nstack")
self.assert_log_equals({"action": "test_status",
"subtest": "subtest name",
"status": "FAIL",
"expected": "PASS",
"test": "test1",
"stack": "many\nlines\nof\nstack"
})
self.logger.test_end("test1", "OK")
self.logger.suite_end()
def test_status_not_started(self):
self.logger.test_status("test_UNKNOWN", "subtest", "PASS")
self.assertTrue(self.pop_last_item()["message"].startswith(
"test_status for test_UNKNOWN logged while not in progress. Logged with data: {"))
def test_end(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "fail", message="Test message")
self.assert_log_equals({"action": "test_end",
"status": "FAIL",
"expected": "OK",
"message": "Test message",
"test":"test1"})
self.logger.suite_end()
def test_end_1(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "PASS", expected="PASS", extra={"data":123})
self.assert_log_equals({"action": "test_end",
"status": "PASS",
"extra": {"data": 123},
"test":"test1"})
self.logger.suite_end()
def test_end_2(self):
self.assertRaises(ValueError, self.logger.test_end, "test1", "XXXUNKNOWNXXX")
def test_end_stack(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "PASS", expected="PASS", stack="many\nlines\nof\nstack")
self.assert_log_equals({"action": "test_end",
"status": "PASS",
"test": "test1",
"stack": "many\nlines\nof\nstack"
})
self.logger.suite_end()
def test_end_no_start(self):
self.logger.test_end("test1", "PASS", expected="PASS")
self.assertTrue(self.pop_last_item()["message"].startswith(
"test_end for test1 logged while not in progress. Logged with data: {"))
self.logger.suite_end()
def test_end_twice(self):
self.logger.suite_start([])
self.logger.test_start("test2")
self.logger.test_end("test2", "PASS", expected="PASS")
self.assert_log_equals({"action": "test_end",
"status": "PASS",
"test": "test2"})
self.logger.test_end("test2", "PASS", expected="PASS")
last_item = self.pop_last_item()
self.assertEquals(last_item["action"], "log")
self.assertEquals(last_item["level"], "ERROR")
self.assertTrue(last_item["message"].startswith(
"test_end for test2 logged while not in progress. Logged with data: {"))
self.logger.suite_end()
def test_suite_start_twice(self):
self.logger.suite_start([])
self.assert_log_equals({"action": "suite_start",
"tests": []})
self.logger.suite_start([])
last_item = self.pop_last_item()
self.assertEquals(last_item["action"], "log")
self.assertEquals(last_item["level"], "ERROR")
self.logger.suite_end()
def test_suite_end_no_start(self):
self.logger.suite_start([])
self.assert_log_equals({"action": "suite_start",
"tests": []})
self.logger.suite_end()
self.assert_log_equals({"action": "suite_end"})
self.logger.suite_end()
last_item = self.pop_last_item()
self.assertEquals(last_item["action"], "log")
self.assertEquals(last_item["level"], "ERROR")
def test_multiple_loggers_suite_start(self):
logger1 = structuredlog.StructuredLogger("test")
self.logger.suite_start([])
logger1.suite_start([])
last_item = self.pop_last_item()
self.assertEquals(last_item["action"], "log")
self.assertEquals(last_item["level"], "ERROR")
def test_multiple_loggers_test_start(self):
logger1 = structuredlog.StructuredLogger("test")
self.logger.suite_start([])
self.logger.test_start("test")
logger1.test_start("test")
last_item = self.pop_last_item()
self.assertEquals(last_item["action"], "log")
self.assertEquals(last_item["level"], "ERROR")
def test_process(self):
self.logger.process_output(1234, "test output")
self.assert_log_equals({"action": "process_output",
"process": "1234",
"data": "test output"})
def test_log(self):
for level in ["critical", "error", "warning", "info", "debug"]:
getattr(self.logger, level)("message")
self.assert_log_equals({"action": "log",
"level": level.upper(),
"message": "message"})
def test_logging_adapter(self):
import logging
logging.basicConfig(level="DEBUG")
old_level = logging.root.getEffectiveLevel()
logging.root.setLevel("DEBUG")
std_logger = logging.getLogger("test")
std_logger.setLevel("DEBUG")
logger = stdadapter.std_logging_adapter(std_logger)
try:
for level in ["critical", "error", "warning", "info", "debug"]:
getattr(logger, level)("message")
self.assert_log_equals({"action": "log",
"level": level.upper(),
"message": "message"})
finally:
logging.root.setLevel(old_level)
def test_add_remove_handlers(self):
handler = TestHandler()
self.logger.add_handler(handler)
self.logger.info("test1")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "test1"})
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "test1"}, actual=handler.last_item)
self.logger.remove_handler(handler)
self.logger.info("test2")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "test2"})
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "test1"}, actual=handler.last_item)
def test_wrapper(self):
file_like = structuredlog.StructuredLogFileLike(self.logger)
file_like.write("line 1")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "line 1"})
file_like.write("line 2\n")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "line 2"})
file_like.write("line 3\r")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "line 3"})
file_like.write("line 4\r\n")
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "line 4"})
class TestTypeConversions(BaseStructuredTest):
def test_raw(self):
self.logger.log_raw({"action":"suite_start", "tests":[1], "time": "1234"})
self.assert_log_equals({"action": "suite_start",
"tests":["1"],
"time": 1234})
self.logger.suite_end()
def test_tuple(self):
self.logger.suite_start([])
self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84", 42, u"\u16a4"))
self.assert_log_equals({"action": "test_start",
"test": (u'\U00010344\U00010334\U00010343\U00010344', u"42", u"\u16a4")})
self.logger.suite_end()
def test_non_string_messages(self):
self.logger.suite_start([])
self.logger.info(1)
self.assert_log_equals({"action": "log",
"message": "1",
"level": "INFO"})
self.logger.info([1, (2, '3'), "s", "s" + chr(255)])
self.assert_log_equals({"action": "log",
"message": "[1, (2, '3'), 's', 's\\xff']",
"level": "INFO"})
self.logger.suite_end()
def test_utf8str_write(self):
with mozfile.NamedTemporaryFile() as logfile:
_fmt = formatters.TbplFormatter()
_handler = handlers.StreamHandler(logfile, _fmt)
self.logger.add_handler(_handler)
self.logger.suite_start([])
self.logger.info("")
logfile.seek(0)
data = logfile.readlines()[-1].strip()
self.assertEquals(data, "")
self.logger.suite_end()
def test_arguments(self):
self.logger.info(message="test")
self.assert_log_equals({"action": "log",
"message": "test",
"level": "INFO"})
self.logger.suite_start([], {})
self.assert_log_equals({"action": "suite_start",
"tests": [],
"run_info": {}})
self.logger.test_start(test="test1")
self.logger.test_status("subtest1", "FAIL", test="test1", status="PASS")
self.assert_log_equals({"action": "test_status",
"test": "test1",
"subtest": "subtest1",
"status": "PASS",
"expected": "FAIL"})
self.logger.process_output(123, "data", "test")
self.assert_log_equals({"action": "process_output",
"process": "123",
"command": "test",
"data": "data"})
self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2",
status="FAIL", expected="PASS")
self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1",
"PASS", "FAIL", "message", "stack", {}, "unexpected")
self.assertRaises(TypeError, self.logger.test_status, "test1", test="test2")
self.logger.suite_end()
class TestComponentFilter(BaseStructuredTest):
def test_filter_component(self):
component_logger = structuredlog.StructuredLogger(self.logger.name,
"test_component")
component_logger.component_filter = handlers.LogLevelFilter(lambda x:x, "info")
self.logger.debug("Test")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "Test"})
self.assertTrue(self.handler.empty)
component_logger.info("Test 1")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "INFO",
"message": "Test 1",
"component": "test_component"})
component_logger.debug("Test 2")
self.assertTrue(self.handler.empty)
component_logger.component_filter = None
component_logger.debug("Test 3")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "Test 3",
"component": "test_component"})
def test_filter_default_component(self):
component_logger = structuredlog.StructuredLogger(self.logger.name,
"test_component")
self.logger.debug("Test")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "Test"})
self.logger.component_filter = handlers.LogLevelFilter(lambda x:x, "info")
self.logger.debug("Test 1")
self.assertTrue(self.handler.empty)
component_logger.debug("Test 2")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "Test 2",
"component": "test_component"})
self.logger.component_filter = None
self.logger.debug("Test 3")
self.assertFalse(self.handler.empty)
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "Test 3"})
def test_filter_message_mutuate(self):
def filter_mutate(msg):
if msg["action"] == "log":
msg["message"] = "FILTERED! %s" % msg["message"]
return msg
self.logger.component_filter = filter_mutate
self.logger.debug("Test")
self.assert_log_equals({"action": "log",
"level": "DEBUG",
"message": "FILTERED! Test"})
self.logger.component_filter = None
class FormatterTest(unittest.TestCase):
def setUp(self):
self.position = 0
self.logger = structuredlog.StructuredLogger("test_%s" % type(self).__name__)
self.output_file = StringIO.StringIO()
self.handler = handlers.StreamHandler(
self.output_file, self.get_formatter())
self.logger.add_handler(self.handler)
def set_position(self, pos=None):
if pos is None:
pos = self.output_file.tell()
self.position = pos
def get_formatter(self):
raise NotImplementedError("FormatterTest subclasses must implement get_formatter")
@property
def loglines(self):
self.output_file.seek(self.position)
return [line.rstrip() for line in self.output_file.readlines()]
class TestTBPLFormatter(FormatterTest):
def get_formatter(self):
return formatters.TbplFormatter()
def test_unexpected_message(self):
self.logger.suite_start([])
self.logger.test_start("timeout_test")
self.logger.test_end("timeout_test",
"TIMEOUT",
message="timed out")
self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out",
self.loglines)
self.logger.suite_end()
def test_default_unexpected_end_message(self):
self.logger.suite_start([])
self.logger.test_start("timeout_test")
self.logger.test_end("timeout_test",
"TIMEOUT")
self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK",
self.loglines)
self.logger.suite_end()
def test_default_unexpected_status_message(self):
self.logger.suite_start([])
self.logger.test_start("timeout_test")
self.logger.test_status("timeout_test",
"subtest",
status="TIMEOUT")
self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS",
self.loglines)
self.logger.test_end("timeout_test", "OK")
self.logger.suite_end()
def test_single_newline(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.set_position()
self.logger.test_status("test1", "subtest",
status="PASS",
expected="FAIL")
self.logger.test_end("test1", "OK")
self.logger.suite_end()
# This sequence should not produce blanklines
for line in self.loglines:
self.assertNotEqual("", line, "No blank line should be present in: %s" %
self.loglines)
class TestMachFormatter(FormatterTest):
def get_formatter(self):
return formatters.MachFormatter(disable_colors=True)
def test_summary(self):
self.logger.suite_start([])
#Some tests that pass
self.logger.test_start("test1")
self.logger.test_end("test1", status="PASS", expected="PASS")
self.logger.test_start("test2")
self.logger.test_end("test2", status="PASS", expected="TIMEOUT")
self.logger.test_start("test3")
self.logger.test_end("test3", status="FAIL", expected="PASS")
self.set_position()
self.logger.suite_end()
self.assertIn("Ran 3 tests", self.loglines)
self.assertIn("Expected results: 1", self.loglines)
self.assertIn("Unexpected results: 2 (FAIL: 1, PASS: 1)", self.loglines)
self.assertNotIn("test1", self.loglines)
self.assertIn("PASS expected TIMEOUT test2", self.loglines)
self.assertIn("FAIL test3", self.loglines)
def test_summary_subtests(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest1", status="PASS")
self.logger.test_status("test1", "subtest2", status="FAIL")
self.logger.test_end("test1", status="OK", expected="OK")
self.logger.test_start("test2")
self.logger.test_status("test2", "subtest1", status="TIMEOUT", expected="PASS")
self.logger.test_end("test2", status="TIMEOUT", expected="OK")
self.set_position()
self.logger.suite_end()
self.assertIn("Ran 5 tests (2 parents, 3 subtests)", self.loglines)
self.assertIn("Expected results: 2", self.loglines)
self.assertIn("Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", self.loglines)
def test_summary_ok(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "subtest1", status="PASS")
self.logger.test_status("test1", "subtest2", status="PASS")
self.logger.test_end("test1", status="OK", expected="OK")
self.logger.test_start("test2")
self.logger.test_status("test2", "subtest1", status="PASS", expected="PASS")
self.logger.test_end("test2", status="OK", expected="OK")
self.set_position()
self.logger.suite_end()
self.assertIn("OK", self.loglines)
self.assertIn("Expected results: 5", self.loglines)
self.assertIn("Unexpected results: 0", self.loglines)
class TestXUnitFormatter(FormatterTest):
def get_formatter(self):
return formatters.XUnitFormatter()
def log_as_xml(self):
return ET.fromstring('\n'.join(self.loglines))
def test_stacktrace_is_present(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "fail", message="Test message", stack='this\nis\na\nstack')
self.logger.suite_end()
root = self.log_as_xml()
self.assertIn('this\nis\na\nstack', root.find('testcase/failure').text)
def test_failure_message(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "fail", message="Test message")
self.logger.suite_end()
root = self.log_as_xml()
self.assertEquals('Expected OK, got FAIL', root.find('testcase/failure').get('message'))
def test_suite_attrs(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_end("test1", "ok", message="Test message")
self.logger.suite_end()
root = self.log_as_xml()
self.assertEqual(root.get('skips'), '0')
self.assertEqual(root.get('failures'), '0')
self.assertEqual(root.get('errors'), '0')
self.assertEqual(root.get('tests'), '1')
self.assertEqual(root.get('time'), '0.00')
def test_time_is_not_rounded(self):
# call formatter directly, it is easier here
formatter = self.get_formatter()
formatter.suite_start(dict(time=55000))
formatter.test_start(dict(time=55100))
formatter.test_end(dict(time=55558, test='id', message='message', status='PASS'))
xml_string = formatter.suite_end(dict(time=55559))
root = ET.fromstring(xml_string)
self.assertEqual(root.get('time'), '0.56')
self.assertEqual(root.find('testcase').get('time'), '0.46')
class TestCommandline(unittest.TestCase):
def setUp(self):
self.logfile = mozfile.NamedTemporaryFile()
@property
def loglines(self):
self.logfile.seek(0)
return [line.rstrip() for line in self.logfile.readlines()]
def test_setup_logging(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser)
args = parser.parse_args(["--log-raw=-"])
logger = commandline.setup_logging("test_setup_logging", args, {})
self.assertEqual(len(logger.handlers), 1)
def test_setup_logging_optparse(self):
parser = optparse.OptionParser()
commandline.add_logging_group(parser)
args, _ = parser.parse_args(["--log-raw=-"])
logger = commandline.setup_logging("test_optparse", args, {})
self.assertEqual(len(logger.handlers), 1)
self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
def test_limit_formatters(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser, include_formatters=['raw'])
other_formatters = [fmt for fmt in commandline.log_formatters
if fmt != 'raw']
# check that every formatter except raw is not present
for fmt in other_formatters:
with self.assertRaises(SystemExit):
parser.parse_args(["--log-%s=-" % fmt])
with self.assertRaises(SystemExit):
parser.parse_args(["--log-%s-level=error" % fmt])
# raw is still ok
args = parser.parse_args(["--log-raw=-"])
logger = commandline.setup_logging("test_setup_logging2", args, {})
self.assertEqual(len(logger.handlers), 1)
def test_setup_logging_optparse_unicode(self):
parser = optparse.OptionParser()
commandline.add_logging_group(parser)
args, _ = parser.parse_args([u"--log-raw=-"])
logger = commandline.setup_logging("test_optparse_unicode", args, {})
self.assertEqual(len(logger.handlers), 1)
self.assertEqual(logger.handlers[0].stream, sys.stdout)
self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
def test_logging_defaultlevel(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser)
args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name])
logger = commandline.setup_logging("test_fmtopts", args, {})
logger.info("INFO message")
logger.debug("DEBUG message")
logger.error("ERROR message")
# The debug level is not logged by default.
self.assertEqual(["INFO message",
"ERROR message"],
self.loglines)
def test_logging_errorlevel(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser)
args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"])
logger = commandline.setup_logging("test_fmtopts", args, {})
logger.info("INFO message")
logger.debug("DEBUG message")
logger.error("ERROR message")
# Only the error level and above were requested.
self.assertEqual(["ERROR message"],
self.loglines)
def test_logging_debuglevel(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser)
args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"])
logger = commandline.setup_logging("test_fmtopts", args, {})
logger.info("INFO message")
logger.debug("DEBUG message")
logger.error("ERROR message")
# Requesting a lower log level than default works as expected.
self.assertEqual(["INFO message",
"DEBUG message",
"ERROR message"],
self.loglines)
def test_unused_options(self):
parser = argparse.ArgumentParser()
commandline.add_logging_group(parser)
args = parser.parse_args(["--log-tbpl-level=error"])
self.assertRaises(ValueError, commandline.setup_logging, "test_fmtopts", args, {})
class TestBuffer(BaseStructuredTest):
def assert_log_equals(self, expected, actual=None):
if actual is None:
actual = self.pop_last_item()
all_expected = {"pid": os.getpid(),
"thread": "MainThread",
"source": "testBuffer"}
specials = set(["time"])
all_expected.update(expected)
for key, value in all_expected.iteritems():
self.assertEqual(actual[key], value)
self.assertEquals(set(all_expected.keys()) | specials, set(actual.keys()))
def setUp(self):
self.logger = structuredlog.StructuredLogger("testBuffer")
self.handler = handlers.BufferHandler(TestHandler(), message_limit=4)
self.logger.add_handler(self.handler)
def tearDown(self):
self.logger.remove_handler(self.handler)
def pop_last_item(self):
return self.handler.inner.items.pop()
def test_buffer_messages(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.send_message("buffer", "off")
self.logger.test_status("test1", "sub1", status="PASS")
# Even for buffered actions, the buffer does not interfere if
# buffering is turned off.
self.assert_log_equals({"action": "test_status",
"test": "test1",
"status": "PASS",
"subtest": "sub1"})
self.logger.send_message("buffer", "on")
self.logger.test_status("test1", "sub2", status="PASS")
self.logger.test_status("test1", "sub3", status="PASS")
self.logger.test_status("test1", "sub4", status="PASS")
self.logger.test_status("test1", "sub5", status="PASS")
self.logger.test_status("test1", "sub6", status="PASS")
self.logger.test_status("test1", "sub7", status="PASS")
self.logger.test_end("test1", status="OK")
self.logger.send_message("buffer", "clear")
self.assert_log_equals({"action": "test_end",
"test": "test1",
"status": "OK"})
self.logger.suite_end()
def test_buffer_size(self):
self.logger.suite_start([])
self.logger.test_start("test1")
self.logger.test_status("test1", "sub1", status="PASS")
self.logger.test_status("test1", "sub2", status="PASS")
self.logger.test_status("test1", "sub3", status="PASS")
self.logger.test_status("test1", "sub4", status="PASS")
self.logger.test_status("test1", "sub5", status="PASS")
self.logger.test_status("test1", "sub6", status="PASS")
self.logger.test_status("test1", "sub7", status="PASS")
# No test status messages made it to the underlying handler.
self.assert_log_equals({"action": "test_start",
"test": "test1"})
# The buffer's actual size never grows beyond the specified limit.
self.assertEquals(len(self.handler._buffer), 4)
self.logger.test_status("test1", "sub8", status="FAIL")
# The number of messages deleted comes back in a list.
self.assertEquals([4], self.logger.send_message("buffer", "flush"))
# When the buffer is dumped, the failure is the last thing logged
self.assert_log_equals({"action": "test_status",
"test": "test1",
"subtest": "sub8",
"status": "FAIL",
"expected": "PASS"})
# Three additional messages should have been retained for context
self.assert_log_equals({"action": "test_status",
"test": "test1",
"status": "PASS",
"subtest": "sub7"})
self.assert_log_equals({"action": "test_status",
"test": "test1",
"status": "PASS",
"subtest": "sub6"})
self.assert_log_equals({"action": "test_status",
"test": "test1",
"status": "PASS",
"subtest": "sub5"})
self.assert_log_equals({"action": "suite_start",
"tests": []})
class TestReader(unittest.TestCase):
def to_file_like(self, obj):
data_str = "\n".join(json.dumps(item) for item in obj)
return StringIO.StringIO(data_str)
def test_read(self):
data = [{"action": "action_0", "data": "data_0"},
{"action": "action_1", "data": "data_1"}]
f = self.to_file_like(data)
self.assertEquals(data, list(reader.read(f)))
def test_imap_log(self):
data = [{"action": "action_0", "data": "data_0"},
{"action": "action_1", "data": "data_1"}]
f = self.to_file_like(data)
def f_action_0(item):
return ("action_0", item["data"])
def f_action_1(item):
return ("action_1", item["data"])
res_iter = reader.imap_log(reader.read(f),
{"action_0": f_action_0,
"action_1": f_action_1})
self.assertEquals([("action_0", "data_0"), ("action_1", "data_1")],
list(res_iter))
def test_each_log(self):
data = [{"action": "action_0", "data": "data_0"},
{"action": "action_1", "data": "data_1"}]
f = self.to_file_like(data)
count = {"action_0":0,
"action_1":0}
def f_action_0(item):
count[item["action"]] += 1
def f_action_1(item):
count[item["action"]] += 2
reader.each_log(reader.read(f),
{"action_0": f_action_0,
"action_1": f_action_1})
self.assertEquals({"action_0":1, "action_1":2}, count)
def test_handler(self):
data = [{"action": "action_0", "data": "data_0"},
{"action": "action_1", "data": "data_1"}]
f = self.to_file_like(data)
test = self
class ReaderTestHandler(reader.LogHandler):
def __init__(self):
self.action_0_count = 0
self.action_1_count = 0
def action_0(self, item):
test.assertEquals(item["action"], "action_0")
self.action_0_count += 1
def action_1(self, item):
test.assertEquals(item["action"], "action_1")
self.action_1_count += 1
handler = ReaderTestHandler()
reader.handle_log(reader.read(f), handler)
self.assertEquals(handler.action_0_count, 1)
self.assertEquals(handler.action_1_count, 1)
if __name__ == "__main__":
unittest.main()