"""
This module consists of functions that split a single input stream
into multiple output streams.
Functions in the module:
1. element_split_agent
2. split_stream
3. separate_agent
4. separate
5. unzip_agent
6. unzip_stream
7. timed_unzip
Split functions:
1. split_stream: applies a function to each element of the input
stream. The function returns a list, and the i-th element of that
list is placed on the i-th output stream.
2. separate: separate is the inverse of mix (see merge.py). The
elements of the input stream are pairs (i, v) and the value v is
placed on the i-th output stream.
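3. unzip_stream: unzip_stream is the inverse of zip_stream (see
merge.py). The elements of the input stream are lists and the
i-th element of the list is placed on the i-th output stream.
4. timed_unzip: timed_unzip is the inverse of timed_zip (see
merge.py). The elements of the input stream are pairs (t, v) where
v is a list, and (t, v[i]) is placed on the i-th output stream
when v[i] is not None.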
Agents:
1. element_split_agent: agent used by split_stream
2. separate_agent: agent used by separate
3. unzip_agent: not used by any function in this module. It is
retained only for backward compatibility.
"""
import sys
import os
# sys.path.append(os.path.abspath("../"))
from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values
#-----------------------------------------------------------------------
# SPLIT: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def element_split_agent(func, in_stream, out_streams, state=None,
                        call_streams=None, name=None,
                        *args, **kwargs):
    """
    Create an agent that applies func to every element of a single
    input stream and distributes the results over a list of output
    streams.

    Parameters
    ----------
        func: function
           Called once for each new element of in_stream.
           If state is None the call is func(element, *args, **kwargs)
           and the return value is a list with one value for each
           output stream.
           If state is not None the call is
           func(element, state, *args, **kwargs) and the return value
           is a pair (list_of_values, next_state).
        in_stream: Stream
           The single input stream of the agent
        out_streams: list of Stream
           The list of output streams of the agent
        state: object
           The initial state of the agent. None selects the stateless
           calling convention described under func, so a stateful
           func must use a state other than None.
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.
        *args, **kwargs:
           Extra positional and keyword arguments passed to func on
           every call.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    check_split_agent_arguments(func, in_stream, out_streams, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)
    num_out_streams = len(out_streams)
    num_in_streams = 1

    # The transition function for this agent.
    def transition(in_lists, state):
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]
        # If there is no new input then return an empty list for each
        # output stream, and leave the state and the starting point of
        # the input stream unchanged.
        # len() is used rather than truthiness so that sequence types
        # whose truth value is not "is non-empty" (for example numpy
        # arrays) are handled correctly.
        if len(input_list) == 0:
            # A separate empty list per output stream, so the outputs
            # never alias one shared list object.
            return ([[] for _ in range(num_out_streams)],
                    state, [in_list.start])

        if state is None:
            # Stateless: one call of func per element.
            output_snapshots = \
              [func(element, *args, **kwargs) for element in input_list]
        else:
            # Stateful: thread the state through successive calls.
            output_snapshots = []
            for element in input_list:
                snapshot, state = func(element, state, *args, **kwargs)
                output_snapshots.append(snapshot)

        check_func_output_for_multiple_streams(func, name, num_out_streams,
                                               output_snapshots)

        # output_snapshots has one row per input element; transpose it
        # to obtain one list per output stream.
        add_to_output_streams = [
            list(snapshot) for snapshot in zip(*output_snapshots)]
        # The whole of input_list has been consumed.
        return (add_to_output_streams, state,
                [in_list.start+len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], out_streams, transition, state, call_streams, name)
def split_stream(function, in_stream, num_out_streams, state=None, *args,
                 **kwargs):
    """
    split_stream returns out_streams, a list of num_out_streams
    streams.  The function, with the specified state, args and kwargs,
    is applied to the elements of the input stream. The return value
    of the function must be a list of length num_out_streams. The i-th
    value of the returned list is placed in the i-th output stream.

    Parameters
    ----------
        function: function
           The function applied to each element of in_stream.
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.
        state: object
           function operates on a state, args, and kwargs
        *args, **kwargs:
           Extra arguments passed to function. The keywords
           'call_streams' and 'name', if present, are not passed to
           function; they are forwarded to the corresponding
           parameters of element_split_agent.
    Returns
    -------
        out_streams: List of Stream
           The output streams generated by split_stream
    Uses
    -------
        * element_split_agent

    """
    out_streams = [Stream() for _ in range(num_out_streams)]
    # element_split_agent takes call_streams and name positionally
    # between state and *args. They must be supplied explicitly;
    # otherwise the first two values of args would be bound to
    # call_streams and name instead of being passed to function.
    call_streams = kwargs.pop('call_streams', None)
    name = kwargs.pop('name', None)
    element_split_agent(
        function, in_stream, out_streams, state, call_streams, name,
        *args, **kwargs)
    return out_streams
#-----------------------------------------------------------------------
# SEPARATE: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def separate_agent(in_stream, out_streams, name=None):
    """
    Create the agent that routes each pair (i, v) arriving on
    in_stream to the i-th stream of out_streams.

    Parameters
    ----------
        in_stream: Stream
           The single input stream of the agent. Its elements are
           pairs (index, value).
        out_streams: list of Stream
           The list of output streams of the agent
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    def route(pair):
        index, value = pair
        # Every position except the addressed one carries _no_value.
        row = [_no_value for _ in out_streams]
        row[index] = value
        return row

    return element_split_agent(
        func=route, in_stream=in_stream, out_streams=out_streams, name=name)
def separate(in_stream, num_out_streams):
    """
    Build num_out_streams new streams and attach a separate_agent
    that feeds them from in_stream. separate is the inverse of mix
    (see merge.py): the elements of the input stream are pairs (i, v)
    and the value v is placed on the i-th output stream.

    Parameters
    ----------
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.
    Returns
    -------
        out_streams: List of Stream
           The newly created output streams, in index order.
    Uses
    -------
        * separate_agent

    """
    out_streams = []
    for _ in range(num_out_streams):
        out_streams.append(Stream())
    separate_agent(in_stream, out_streams)
    return out_streams
#-----------------------------------------------------------------------
# UNZIP: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def unzip_agent(in_stream, out_streams, name=None):
    """
    Create the agent that places the i-th item of each element of
    in_stream on the i-th stream of out_streams. Elements with fewer
    items than there are output streams are padded with None;
    surplus items are dropped.

    Parameters
    ----------
        in_stream: Stream
           The single input stream of the agent. Its elements are
           sequences (lists or tuples).
        out_streams: list of Stream
           The list of output streams of the agent
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.
    Note
    ----
        Not used by any function in this module.
        Used by external modules

    """
    num_out_streams = len(out_streams)

    def f(lst):
        # Work on a copy: the element still belongs to the input
        # stream, so it must not be modified in place. Copying also
        # allows tuples, which have no extend method.
        values = list(lst)[:num_out_streams]
        values.extend([None] * (num_out_streams - len(values)))
        return values

    return element_split_agent(f, in_stream, out_streams, name=name)
def unzip_stream(input_stream, num_out_streams):
    """
    unzip_stream returns out_streams, a list of num_out_streams
    streams.  unzip_stream is the inverse of zip_stream (see
    merge.py). The elements of the input stream are lists of length
    num_out_streams and the i-th element of the list is placed on the
    i-th output stream. Shorter elements are padded with None and
    longer elements are truncated to num_out_streams items.

    Parameters
    ----------
        input_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.
    Returns
    -------
        out_streams: List of Stream
           The output streams generated by unzip_stream
    Uses
    -------
        * element_split_agent

    """
    def f(lst):
        # Work on a copy: the element still belongs to the input
        # stream, so it must not be modified in place. Copying also
        # allows tuples, which have no extend method.
        values = list(lst)[:num_out_streams]
        values.extend([None] * (num_out_streams - len(values)))
        return values

    out_streams = [Stream() for _ in range(num_out_streams)]
    element_split_agent(f, input_stream, out_streams)
    return out_streams
def timed_unzip(in_stream, num_out_streams):
    """
    timed_unzip returns out_streams which is a list of num_out_streams
    streams.  timed_unzip is the inverse of timed_zip (see
    merge.py). The elements of the input stream are pairs (t, v) where
    v is a list of length num_out_streams. The pair (t, v[i]) is
    placed on the i-th output stream if and only if v[i] is not None.
    A list v that is shorter than num_out_streams is treated as if it
    were padded with None; a longer list is truncated.

    Parameters
    ----------
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.
    Returns
    -------
        out_streams: List of Stream
           The output streams generated by timed_unzip
    Uses
    -------
       * split_stream

    """
    def f_time_unzip(timed_element):
        timestamp, messages = timed_element
        # Work on a copy: the element still belongs to the input
        # stream, so it must not be modified in place. Copying also
        # allows tuples, which have no extend method.
        messages = list(messages)[:num_out_streams]
        messages.extend([None] * (num_out_streams - len(messages)))
        # A None message produces _no_value for that output stream;
        # any other message is paired with the timestamp.
        return [
            _no_value if message is None else (timestamp, message)
            for message in messages]

    return split_stream(f_time_unzip, in_stream, num_out_streams)
#------------------------------------------------------------------------------------------------
# SPLIT TESTS
#------------------------------------------------------------------------------------------------
def test_element_agents():
    """
    Exercise element_split_agent, split_stream, unzip_agent,
    unzip_stream, separate_agent, separate and timed_unzip on small
    streams, asserting the contents of every output stream after each
    batch of input.
    """
    # Streams that are created but not attached to any agent below.
    s, u, v, w, y, z = (
        Stream('s'), Stream('u'), Stream('v'),
        Stream('w'), Stream('y'), Stream('z'))

    def filled(stream):
        # The part of the stream's buffer up to its stop index.
        return stream.recent[:stream.stop]

    #------------------------------------------------------------------
    # SPLIT TESTS
    #------------------------------------------------------------------
    # The split function takes one element of the single input stream
    # and returns a list with one value per output stream.
    def plus_one_and_double(element):
        return [element+1, element*2]

    def add_and_scale(element, addend, multiplier):
        return [element+addend, element*multiplier]

    in_stream_split = Stream('in_stream_split')
    r = Stream('r')
    t = Stream('t')
    e = element_split_agent(
        func=plus_one_and_double, in_stream=in_stream_split,
        out_streams=[r, t], name='e')
    r_split, t_split = split_stream(
        function=plus_one_and_double, in_stream=in_stream_split,
        num_out_streams=2)
    r_args, t_args = split_stream(
        add_and_scale, in_stream_split, 2, addend=1, multiplier=2)

    def assert_copies_match():
        # The streams made by split_stream must mirror r and t.
        assert recent_values(r_split) == recent_values(r)
        assert recent_values(t_split) == recent_values(t)
        assert recent_values(r_args) == recent_values(r)
        assert recent_values(t_args) == recent_values(t)

    # Nothing has been put on the input stream yet.
    assert recent_values(r) == []
    assert recent_values(t) == []
    assert_copies_match()

    in_stream_split.extend(range(5))
    assert recent_values(r) == [1, 2, 3, 4, 5]
    assert recent_values(t) == [0, 2, 4, 6, 8]
    assert_copies_match()

    in_stream_split.append(10)
    assert recent_values(r) == [1, 2, 3, 4, 5, 11]
    assert recent_values(t) == [0, 2, 4, 6, 8, 20]

    in_stream_split.extend([20, 100])
    assert recent_values(r) == [1, 2, 3, 4, 5, 11, 21, 101]
    assert recent_values(t) == [0, 2, 4, 6, 8, 20, 40, 200]
    assert_copies_match()

    # ------------------------------------
    # Split with a keyword argument passed through to the function.
    def apply_each(element, list_of_functions):
        return [f(element) for f in list_of_functions]

    def double(element):
        return element*2

    def plus_ten(element):
        return element+10

    x = Stream('x')
    rr = Stream('rr')
    tt = Stream('tt')
    ee = element_split_agent(
        func=apply_each, in_stream=x, out_streams=[rr, tt], name='ee',
        list_of_functions=[double, plus_ten])
    x.extend(range(5))
    assert recent_values(rr) == [0, 2, 4, 6, 8]
    assert recent_values(tt) == [10, 11, 12, 13, 14]

    # ------------------------------------
    # Split with state: the function takes (element, state) and
    # returns (list of output values, next state).
    def with_counter(element, state):
        return ([element+state, element*state], state+1)

    r_state = Stream(name='r_state')
    t_state = Stream(name='t_state')
    in_stream_split_state = Stream('in_stream_split_state')
    e_state = element_split_agent(
        func=with_counter, in_stream=in_stream_split_state,
        out_streams=[r_state, t_state], name='e', state=0)
    assert filled(r_state) == []
    assert filled(t_state) == []

    in_stream_split_state.extend(range(5))
    assert filled(r_state) == [0, 2, 4, 6, 8]
    assert filled(t_state) == [0, 1, 4, 9, 16]

    in_stream_split_state.append(20)
    assert filled(r_state) == [0, 2, 4, 6, 8, 25]
    assert filled(t_state) == [0, 1, 4, 9, 16, 100]

    in_stream_split_state.extend([44, 93])
    assert filled(r_state) == [0, 2, 4, 6, 8, 25, 50, 100]
    assert filled(t_state) == [0, 1, 4, 9, 16, 100, 264, 651]

    # ------------------------------------
    # Split with state and an extra keyword argument.
    def with_stepped_counter(element, state, increment):
        return ([element+state, element*state], state+increment)

    rr_state = Stream(name='rr_state')
    tt_state = Stream(name='tt_state')
    in_stream_split_state_funcargs = Stream('in_stream_split_state_funcargs')
    ee_state_agent = element_split_agent(
        func=with_stepped_counter,
        in_stream=in_stream_split_state_funcargs,
        out_streams=[rr_state, tt_state],
        name='ee_state_agent', state=0, increment=10)
    assert rr_state.stop == 0
    assert tt_state.stop == 0

    in_stream_split_state_funcargs.extend(range(5))
    assert filled(rr_state) == [0, 11, 22, 33, 44]
    assert filled(tt_state) == [0, 10, 40, 90, 160]

    #------------------------------------------------------------------
    # UNZIP AGENT TESTS
    #------------------------------------------------------------------
    s_unzip = Stream('s_unzip')
    u_unzip = Stream('u_unzip')
    x_unzip = Stream('x_unzip')

    # The agent form and the function form read the same input stream.
    unzip_agent(in_stream=s_unzip, out_streams=[x_unzip, u_unzip])
    d_unzip_fn = unzip_stream(s_unzip, 2)

    s_unzip.extend([(1,10), (2,15), (3,18)])
    assert x_unzip.stop == 3
    assert u_unzip.stop == 3
    assert x_unzip.recent[:3] == [1, 2, 3]
    assert u_unzip.recent[:3] == [10, 15, 18]
    assert d_unzip_fn[0].recent[:3] == x_unzip.recent[:3]
    assert d_unzip_fn[1].recent[:3] == u_unzip.recent[:3]

    s_unzip.extend([(37,96)])
    assert x_unzip.stop == 4
    assert u_unzip.stop == 4
    assert x_unzip.recent[:4] == [1, 2, 3, 37]
    assert u_unzip.recent[:4] == [10, 15, 18, 96]
    assert d_unzip_fn[0].recent[:4] == x_unzip.recent[:4]
    assert d_unzip_fn[1].recent[:4] == u_unzip.recent[:4]

    #------------------------------------------------------------------
    # SEPARATE AGENT TESTS
    #------------------------------------------------------------------
    s_separate = Stream('s separate')
    u_separate = Stream('u separate')
    x_separate = Stream('x separate')

    d_separate = separate_agent(
        in_stream=s_separate, out_streams=[x_separate,u_separate],
        name='d separate')
    x_sep_func, u_sep_func = separate(s_separate, 2)

    # Two values are addressed to stream 0 and one to stream 1.
    s_separate.extend([(0,10), (1,15), (0,20)])
    assert x_separate.stop == 2
    assert u_separate.stop == 1
    assert x_separate.recent[:2] == [10, 20]
    assert u_separate.recent[:1] == [15]
    assert x_sep_func.recent == x_separate.recent
    assert u_sep_func.recent == u_separate.recent

    s_separate.extend([(1,96)])
    assert x_separate.stop == 2
    assert u_separate.stop == 2
    assert x_separate.recent[:2] == [10, 20]
    assert u_separate.recent[:2] == [15, 96]
    assert x_sep_func.recent == x_separate.recent
    assert u_sep_func.recent == u_separate.recent

    #------------------------------------------------------------------
    # TIMED UNZIP TESTS
    #------------------------------------------------------------------
    # The input is put on the stream before timed_unzip is attached.
    t_unzip = Stream()
    t_unzip.extend(
        [(1, ["A", None]), (5, ["B", "a"]), (7, [None, "b"]),
         (9, ["C", "c"]), (10, [None, "d"])])
    t_unzip_0, t_unzip_1 = timed_unzip(t_unzip, 2)
    assert recent_values(t_unzip_0) == [(1, 'A'), (5, 'B'), (9, 'C')]
    assert recent_values(t_unzip_1) == [(5, 'a'), (7, 'b'), (9, 'c'), (10, 'd')]
# Run the self-test when this file is executed rather than imported.
# NOTE(review): the module uses relative imports (from ..agent ...),
# so it presumably has to be started as part of its package
# (python -m <package>.<module>) rather than by file path -- confirm.
if __name__ == '__main__':
    test_element_agents()