module documentation

Utilities for unit testing reactor implementations.

The main feature of this module is ReactorBuilder, a base class for use when writing interface/blackbox tests for reactor implementations. Test case classes for reactor features should subclass ReactorBuilder instead of SynchronousTestCase. All of the features of SynchronousTestCase will be available. Additionally, the tests will automatically be applied to all available reactor implementations.

Class ReactorBuilder SynchronousTestCase mixin which provides a reactor-creation API. This mixin defines setUp and tearDown, so mix it in before SynchronousTestCase or call its methods from the overridden ones in the subclass.
Exception TestTimeoutError The reactor was still running after the timeout period elapsed in ReactorBuilder.runReactor.
Function asyncioSelectorReactor Make a new asyncio reactor associated with a new event loop.
Function needsRunningReactor Various functions within these tests need an already-running reactor at some point. They need to stop the reactor when the test has completed, and that means calling reactor.stop(). However, reactor.
Function stopOnError Stop the reactor as soon as any error is logged on the given publisher.
def asyncioSelectorReactor(self): (source)

Make a new asyncio reactor associated with a new event loop.

The test suite prefers this constructor because having a new event loop for each reactor provides better test isolation. The real constructor prefers to re-use (or create) a global loop because of how this interacts with other asyncio-based libraries and applications (though maybe it shouldn't).

Parameters
self:objectThe ReactorBuilder subclass this is being called on. We don't use this parameter but we get called with it anyway.
Returns
asyncioreactor.AsyncioSelectorReactorUndocumented
def needsRunningReactor(reactor, thunk): (source)

Various functions within these tests need an already-running reactor at some point. They need to stop the reactor when the test has completed, and that means calling reactor.stop(). However, reactor.stop() raises an exception if the reactor isn't already running, so if the Deferred that a particular API under test returns fires synchronously (as especially an endpoint's connect() method may do, if the connect is to a local interface address) then the test won't be able to stop the reactor being tested and finish. So this calls thunk only once reactor is running.

(This is just an alias for twisted.internet.interfaces.IReactorCore.callWhenRunning on the given reactor parameter, in order to centrally reference the above paragraph and repeating it everywhere as a comment.)

Parameters
reactorthe twisted.internet.interfaces.IReactorCore under test
thunka 0-argument callable, which eventually finishes the test in question, probably in a Deferred callback.
def stopOnError(case, reactor, publisher=None): (source)

Stop the reactor as soon as any error is logged on the given publisher.

This is beneficial for tests which will wait for a Deferred to fire before completing (by passing or failing). Certain implementation bugs may prevent the Deferred from firing with any result at all (consider a protocol's {dataReceived} method that raises an exception: this exception is logged but it won't ever cause a Deferred to fire). In that case the test would have to complete by timing out which is a much less desirable outcome than completing as soon as the unexpected error is encountered.

Parameters
caseA SynchronousTestCase to use to clean up the necessary log observer when the test is over.
reactorThe reactor to stop.
publisherA LogPublisher to watch for errors. If None, the global log publisher will be watched.