Flow-based Programming and Ability writing

Abilities might be run in parallel, using ThreadedAbilityBase subclasses. Yet, they run unbeknownst to each other, save for the parent Ability that know them all.

PacketWeaver offers syntactic sugars for you to write Abilities that communicate with each others, improving on the classic shell pipe syntax.

Doing it all by hand

You may add by hand input and output pipes to any ThreadedAbilityBase subclass instance. These pipes are multiprocessing.Pipe instances. As such, adding such pipes may be done like this:

inst = self.get_dependency('example')
outputp, inputp = multiprocessing.Pipe()
outputp2, inputp2 = multiprocessing.Pipe()
inst.add_in_pipe(inputp)
inst.add_out_pipe(outputp2)

self._start_wait_and_stop([inst])

Once set up like this, the parent Ability may write into outputp and read from inputp2.

Warning

you cannot add pipes to an already started Ability hitherto.

Reading and writing from an Ability

Reading

For the point of view of the Ability refered to by the pet name example, input data may be read from the standard input by calling the built-in _recv() instance method:

def main(self):
   p = self._recv()

The _recv method consumes the input data as datagrams, not streams. Moreover, you may receive all types of pickable data, which means that in the previous example, p might be a full-fledged Python object!

Warning

the _recv method is blocking, kernel-wise. This is a problem because well-written Abilities must keep aware of the stop signal. For this reason, the standard way of reading the standard input of an Ability is to write a code similar to this one:

def main(self):
    try:
        while not self._is_stopped():
            if self._poll(0.1):
                p = self._recv()
                # Do something with p
    except (EOFError, IOError):
        pass

The _poll method is similar to the Kernel poll syscall. It monitors whether there is a datagram to be read on the standard input, and it times out after a certain delay (100 miliseconds in the previous example). The previous example code thus ensures that this Ability checks at least every 100 ms that a stop signal was received.

Writing

Writing to the standard output of an Ability is relatively simple. You may simply call the _send method with any Pickable Python object as argument:

def main(self):
    self._send('abc')

Caution

_send might be blocking at times, which is in violations of the code of conduct of well-written Abilities… This is a known limitation.

Using the Pipe Syntactic Sugar

Whenever an Ability purpose is to orchestrate multiple Abilities, it owns the references to multiple Abilities objects:

inst1 = self.get_dependency('example1')
inst2 = self.get_dependency('example2')
inst3 = self.get_dependency('example3')

Configuring the pipes between these Abilities by hand might be cumbersome. For this reason, PacketWeaver ships a syntactic sugar similar to shell pipes.

To pipe the standard output of the first Ability to the standard input of the second Ability, one can write the following Python expression:

inst1 | inst2

We use the fluent design pattern, so that one may write:

inst1 | inst2 | inst3 | inst1

Note

In the previous example, inst1 is listed twice into the pipeline. This is the same instance of the same Ability. This line means that the standard output of inst1 is piped into inst2, and that the standard output of inst3 is piped into the standard input of inst1. This enables developers to write pipelines of Abilities that are cyclic in a much easier way that it is generally possible in shell (e.g. netcat pipelines with named FIFOs)

Multiple Inputs and Outputs

While in shell, it is possible to pipe multiple scripts output into a script standard input, it requires some tricks, for instance using named FIFOs. With Packetweaver, multiple inputs and multiple outputs are seemless:

def main(self):
    inst1 = self.get_dependency('example1')
    inst2 = self.get_dependency('example2')
    inst3 = self.get_dependency('example3')
    inst4 = self.get_dependency('example4')
    inst5 = self.get_dependency('example5')

    inst1 | inst2 | inst3
    inst4 | inst2 | inst5

    self._start_wait_and_stop([inst1, inst2, inst3, inst4, inst5])

In the previous example, inst2 is part of two pipeline declarations. However Packetweaver interprets this as: inst2 standard input is composed of a round robin read from inst1 and inst4, and inst2 standard output is broadcast to inst3 and inst5.

On Detecting Source and Sink Conditions

An Ability may dynamically discover if there are other Abilities that are piped to its standard input by calling self._is_source(). If True, this Ability has currently no input pipes.

Similarly, an Ability may discover if other Abilities subscribed to its standard output by calling self._is_sink().

Transfering Pipes

Sometimes, the sole purpose of some component Abilities is to instanciate other component Abilities and set up the pipelines. If that orchestrating Ability has standard input and standard output pipes, it would be cumbersome to transfer by hand all input and output messages to the pipeline. For this reason, Packetweaver enables an Ability developer to specify that all input or output pipes are to be transfered from the current Ability to another Ability. This is performed using the self._transfer_in(otherAbilityInstance) and self._transfer_out(otherAbilityInstance) methods.

The following Ability sets up such a pipeline:

def main(self):
    inst1 = self.get_dependency('example1')
    inst2 = self.get_dependency('example2')
    inst3 = self.get_dependency('example3')

    inst1 | inst2 | inst3

    self._transfer_in(inst1)
    self._transfer_out(inst3)

    self._start_wait_and_stop([inst1, inst2, inst3])