Viewing file: test_worker.py (14.67 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Test for distributed trial worker side. """
import os
from zope.interface.verify import verifyObject
from twisted.trial.reporter import TestResult from twisted.trial.unittest import TestCase from twisted.trial._dist.worker import ( LocalWorker, LocalWorkerAMP, LocalWorkerTransport, WorkerProtocol) from twisted.trial._dist import managercommands, workercommands
from twisted.scripts import trial from twisted.test.proto_helpers import StringTransport
from twisted.internet.interfaces import ITransport, IAddress from twisted.internet.defer import fail, succeed from twisted.internet.main import CONNECTION_DONE from twisted.internet.error import ConnectionDone from twisted.python.compat import NativeStringIO as StringIO, unicode from twisted.python.reflect import fullyQualifiedName from twisted.python.failure import Failure from twisted.protocols.amp import AMP
class FakeAMP(AMP): """ A fake amp protocol. """
class WorkerProtocolTests(TestCase): """ Tests for L{WorkerProtocol}. """
def setUp(self): """ Set up a transport, a result stream and a protocol instance. """ self.serverTransport = StringTransport() self.clientTransport = StringTransport() self.server = WorkerProtocol() self.server.makeConnection(self.serverTransport) self.client = FakeAMP() self.client.makeConnection(self.clientTransport)
def test_run(self): """ Calling the L{workercommands.Run} command on the client returns a response with C{success} sets to C{True}. """ d = self.client.callRemote(workercommands.Run, testCase=b"doesntexist")
def check(result): self.assertTrue(result['success'])
d.addCallback(check) self.server.dataReceived(self.clientTransport.value()) self.clientTransport.clear() self.client.dataReceived(self.serverTransport.value()) self.serverTransport.clear() return d
def test_start(self): """ The C{start} command changes the current path. """ curdir = os.path.realpath(os.path.curdir) self.addCleanup(os.chdir, curdir) self.server.start('..') self.assertNotEqual(os.path.realpath(os.path.curdir), curdir)
class LocalWorkerAMPTests(TestCase): """ Test case for distributed trial's manager-side local worker AMP protocol """
def setUp(self): self.managerTransport = StringTransport() self.managerAMP = LocalWorkerAMP() self.managerAMP.makeConnection(self.managerTransport) self.result = TestResult() self.workerTransport = StringTransport() self.worker = AMP() self.worker.makeConnection(self.workerTransport)
config = trial.Options() self.testName = b"twisted.doesnexist" config['tests'].append(self.testName) self.testCase = trial._getSuite(config)._tests.pop()
self.managerAMP.run(self.testCase, self.result) self.managerTransport.clear()
def pumpTransports(self): """ Sends data from C{self.workerTransport} to C{self.managerAMP}, and then data from C{self.managerTransport} back to C{self.worker}. """ self.managerAMP.dataReceived(self.workerTransport.value()) self.workerTransport.clear() self.worker.dataReceived(self.managerTransport.value())
def test_runSuccess(self): """ Run a test, and succeed. """ results = []
d = self.worker.callRemote(managercommands.AddSuccess, testName=self.testName) d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertTrue(results)
def test_runExpectedFailure(self): """ Run a test, and fail expectedly. """ results = []
d = self.worker.callRemote(managercommands.AddExpectedFailure, testName=self.testName, error=b'error', todo=b'todoReason') d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.expectedFailures[0][0]) self.assertTrue(results)
def test_runError(self): """ Run a test, and encounter an error. """ results = [] errorClass = fullyQualifiedName(ValueError) d = self.worker.callRemote(managercommands.AddError, testName=self.testName, error=b'error', errorClass=errorClass.encode("ascii"), frames=[]) d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.errors[0][0]) self.assertTrue(results)
def test_runErrorWithFrames(self): """ L{LocalWorkerAMP._buildFailure} recreates the C{Failure.frames} from the C{frames} argument passed to C{AddError}. """ results = [] errorClass = fullyQualifiedName(ValueError) d = self.worker.callRemote(managercommands.AddError, testName=self.testName, error=b'error', errorClass=errorClass.encode("ascii"), frames=[b"file.py", b"invalid code", b"3"]) d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.errors[0][0]) self.assertEqual( [(b'file.py', b'invalid code', 3, [], [])], self.result.errors[0][1].frames) self.assertTrue(results)
def test_runFailure(self): """ Run a test, and fail. """ results = [] failClass = fullyQualifiedName(RuntimeError) d = self.worker.callRemote(managercommands.AddFailure, testName=self.testName, fail=b'fail', failClass=failClass.encode("ascii"), frames=[]) d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.failures[0][0]) self.assertTrue(results)
def test_runSkip(self): """ Run a test, but skip it. """ results = []
d = self.worker.callRemote(managercommands.AddSkip, testName=self.testName, reason=b'reason') d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.skips[0][0]) self.assertTrue(results)
def test_runUnexpectedSuccesses(self): """ Run a test, and succeed unexpectedly. """ results = []
d = self.worker.callRemote(managercommands.AddUnexpectedSuccess, testName=self.testName, todo=b'todo') d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual(self.testCase, self.result.unexpectedSuccesses[0][0]) self.assertTrue(results)
def test_testWrite(self): """ L{LocalWorkerAMP.testWrite} writes the data received to its test stream. """ results = [] stream = StringIO() self.managerAMP.setTestStream(stream)
d = self.worker.callRemote(managercommands.TestWrite, out=b"Some output") d.addCallback(lambda result: results.append(result['success'])) self.pumpTransports()
self.assertEqual("Some output\n", stream.getvalue()) self.assertTrue(results)
def test_stopAfterRun(self): """ L{LocalWorkerAMP.run} calls C{stopTest} on its test result once the C{Run} commands has succeeded. """ result = object() stopped = []
def fakeCallRemote(command, testCase): return succeed(result)
self.managerAMP.callRemote = fakeCallRemote
class StopTestResult(TestResult):
def stopTest(self, test): stopped.append(test)
d = self.managerAMP.run(self.testCase, StopTestResult()) self.assertEqual([self.testCase], stopped) return d.addCallback(self.assertIdentical, result)
class FakeAMProtocol(AMP): """ A fake implementation of L{AMP} for testing. """ id = 0 dataString = ""
def dataReceived(self, data): self.dataString += data
def setTestStream(self, stream): self.testStream = stream
class FakeTransport(object): """ A fake process transport implementation for testing. """ dataString = "" calls = 0
def writeToChild(self, fd, data): if isinstance(self.dataString, unicode) and isinstance(data, bytes): data = data.decode("utf-8") self.dataString += data
def loseConnection(self): self.calls += 1
class LocalWorkerTests(TestCase): """ Tests for L{LocalWorker} and L{LocalWorkerTransport}. """
def test_childDataReceived(self): """ L{LocalWorker.childDataReceived} forwards the received data to linked L{AMP} protocol if the right file descriptor, otherwise forwards to C{ProcessProtocol.childDataReceived}. """ fakeTransport = FakeTransport() localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log') localWorker.makeConnection(fakeTransport) localWorker._outLog = StringIO() localWorker.childDataReceived(4, "foo") localWorker.childDataReceived(1, "bar") self.assertEqual("foo", localWorker._ampProtocol.dataString) self.assertEqual("bar", localWorker._outLog.getvalue())
def test_outReceived(self): """ L{LocalWorker.outReceived} logs the output into its C{_outLog} log file. """ fakeTransport = FakeTransport() localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log') localWorker.makeConnection(fakeTransport) localWorker._outLog = StringIO() data = "The quick brown fox jumps over the lazy dog" localWorker.outReceived(data) self.assertEqual(data, localWorker._outLog.getvalue())
def test_errReceived(self): """ L{LocalWorker.errReceived} logs the errors into its C{_errLog} log file. """ fakeTransport = FakeTransport() localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log') localWorker.makeConnection(fakeTransport) localWorker._errLog = StringIO() data = "The quick brown fox jumps over the lazy dog" localWorker.errReceived(data) self.assertEqual(data, localWorker._errLog.getvalue())
def test_write(self): """ L{LocalWorkerTransport.write} forwards the written data to the given transport. """ transport = FakeTransport() localTransport = LocalWorkerTransport(transport) data = "The quick brown fox jumps over the lazy dog" localTransport.write(data) self.assertEqual(data, transport.dataString)
def test_writeSequence(self): """ L{LocalWorkerTransport.writeSequence} forwards the written data to the given transport. """ transport = FakeTransport() localTransport = LocalWorkerTransport(transport) data = ("The quick ", "brown fox jumps ", "over the lazy dog") localTransport.writeSequence(data) self.assertEqual("".join(data), transport.dataString)
def test_loseConnection(self): """ L{LocalWorkerTransport.loseConnection} forwards the call to the given transport. """ transport = FakeTransport() localTransport = LocalWorkerTransport(transport) localTransport.loseConnection()
self.assertEqual(transport.calls, 1)
def test_connectionLost(self): """ L{LocalWorker.connectionLost} closes the log streams. """
class FakeStream(object): callNumber = 0
def close(self): self.callNumber += 1
transport = FakeTransport() localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log') localWorker.makeConnection(transport) localWorker._outLog = FakeStream() localWorker._errLog = FakeStream() localWorker.connectionLost(None) self.assertEqual(localWorker._outLog.callNumber, 1) self.assertEqual(localWorker._errLog.callNumber, 1)
def test_processEnded(self): """ L{LocalWorker.processEnded} calls C{connectionLost} on itself and on the L{AMP} protocol. """
class FakeStream(object): callNumber = 0
def close(self): self.callNumber += 1
transport = FakeTransport() protocol = FakeAMProtocol() localWorker = LocalWorker(protocol, '.', 'test.log') localWorker.makeConnection(transport) localWorker._outLog = FakeStream() localWorker.processEnded(Failure(CONNECTION_DONE)) self.assertEqual(localWorker._outLog.callNumber, 1) self.assertIdentical(None, protocol.transport) return self.assertFailure(localWorker.endDeferred, ConnectionDone)
def test_addresses(self): """ L{LocalWorkerTransport.getPeer} and L{LocalWorkerTransport.getHost} return L{IAddress} objects. """ localTransport = LocalWorkerTransport(None) self.assertTrue(verifyObject(IAddress, localTransport.getPeer())) self.assertTrue(verifyObject(IAddress, localTransport.getHost()))
def test_transport(self): """ L{LocalWorkerTransport} implements L{ITransport} to be able to be used by L{AMP}. """ localTransport = LocalWorkerTransport(None) self.assertTrue(verifyObject(ITransport, localTransport))
def test_startError(self): """ L{LocalWorker} swallows the exceptions returned by the L{AMP} protocol start method, as it generates unnecessary errors. """
def failCallRemote(command, directory): return fail(RuntimeError("oops"))
transport = FakeTransport() protocol = FakeAMProtocol() protocol.callRemote = failCallRemote localWorker = LocalWorker(protocol, '.', 'test.log') localWorker.makeConnection(transport)
self.assertEqual([], self.flushLoggedErrors(RuntimeError))
|