"""
This module consists of functions that merge multiple input streams
into a single output stream.
Functions in the module:
1. element_merge_agent
2. zip_stream
3. zip_map
4. asynch_merge_agent
5. mix
6. blend_agent
7. blend
Merge functions:
1. zip_stream is similar to zip in Python except that it operates on
streams rather than lists.
2. zip_map is map_stream(zip_stream()), i.e., first zip then map the
result.
3. mix is an asynchronous merge of the input streams. The elements of the
output stream identify the input streams that generated the
elements.
4. blend is a merge followed by a map.
Agents:
1. element_merge_agent is the agent used by zip_stream and zip_map.
2. asynch_merge_agent is the agent used by mix.
3. blend_agent is the agent used by blend.
"""
import sys
import os
# sys.path.append(os.path.abspath("../"))
from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values
[docs]def element_merge_agent(func, in_streams, out_stream, state=None,
call_streams=None, name=None,
*args, **kwargs):
"""
Parameters
----------
func: function
function from an input list and args and kwargs to
an output list
in_streams: list of Stream
The list of input streams of the agent
out_stream: Stream
The single output stream of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_merge_agent_arguments(func, in_streams, out_stream, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = len(in_streams)
# The transition function for this agent.
def transition(in_lists, state):
check_in_lists_type(name, in_lists, num_in_streams)
# input_snapshots is a list of snapshots.
# Each snapshot is a list containing one element for each
# input stream.
input_snapshots = zip(*[v.list[v.start:v.stop] for v in in_lists])
# If the new input data is empty then return empty lists for
# each output stream, and leave the state and the starting point
# for each input stream unchanged.
if not input_snapshots:
return ([[]], state, [v.start for v in in_lists])
# output_list[i] will be set to the output value
# corresponding to the i-th value in each of the input
# streams
output_list = [ [] for lst in input_snapshots]
for i, snapshot in enumerate(input_snapshots):
assert isinstance(snapshot, list) or isinstance(snapshot, tuple)
if state is None:
output_list[i] = func(snapshot, *args, **kwargs)
else:
output_list[i], state = func(snapshot, state, *args, **kwargs)
if output_list[i] is None: output_list[i] = []
return ([output_list], state, [v.start+len(input_snapshots) for v in in_lists])
# Finished transition
# Create agent
return Agent(in_streams, [out_stream], transition, state, call_streams, name)
[docs]def zip_agent(in_streams, out_stream):
"""
zip_agent zips the input streams and returns values in out_stream
Parameters
----------
in_streams: list of Stream
The list of input streams that are zipped
out_stream: Stream
The Stream to add values to
Uses
-------
* element_merge_agent
"""
element_merge_agent(lambda v: v, in_streams, out_stream)
[docs]def zip_stream(in_streams):
"""
zip_stream returns out_stream, a stream obtained by zipping the
input streams. zip_stream is similar to zip.
Parameters
----------
in_streams: list of Stream
The list of input streams that are zipped
state: object
function operates on a state, args, and kwargs
Returns
-------
out_stream: Stream
The output stream generated by zip_stream
"""
out_stream = Stream('output of zip')
zip_agent(in_streams, out_stream)
return out_stream
[docs]def zip_map(function, in_streams, state=None, *args, **kwargs):
"""
zip_map returns out_stream, a stream obtained by applying function,
with the specified state, args and kwargs, to the elements
obtained by zipping the input streams.
Parameters
----------
in_streams: list of Stream
The list of input streams that are zipped
state: object
function operates on a state, args, and kwargs
Returns
-------
out_stream: Stream
The output stream generated by zip_map
Uses
-------
* element_merge_agent
"""
out_stream = Stream('output of zip')
element_merge_agent(function, in_streams, out_stream,
state, *args, **kwargs)
return out_stream
####################################################
# OPERATIONS ON ASYCHRONOUS INPUT STREAMS
####################################################
[docs]def asynch_merge(in_streams, out_stream):
"""
Parameters
----------
in_streams: list of input streams
out_stream: single output stream
"""
def transition(in_lists, state):
output_list = []
# If the input data is empty, i.e., if v.stop == v.start for all
# v in in_lists, then return empty lists for each output stream,
# and leave the state and the starting point for each input
# stream unchanged.
if all(v.stop <= v.start for v in in_lists):
return ([output_list], state, [v.start for v in in_lists])
# Assert at least one input stream has unprocessed data.
for stream_number, v in enumerate(in_lists):
# v is an in_list
# if v.stop <= v.start then skip this input stream
# because no new messages have arrived on this stream.
if v.stop > v.start:
# In the following,input_list is the list of new values
# on the input stream with index stream_number
input_list = v.list[v.start:v.stop]
# Add each unread element in this input stream, with the
# stream_number, to output_list
for element in input_list:
output_list.append((stream_number, element))
return ([output_list], state, [v.stop for v in in_lists])
# Create agent
Agent(in_streams, [out_stream], transition, name="asynch_merge")
return
[docs]def mix(in_streams):
out_stream = Stream('mix')
asynch_merge(in_streams, out_stream)
return out_stream
[docs]def blend_agent(func, in_streams, out_stream, state=None,
call_streams=None, name=None, *args, **kwargs):
"""
Parameters
----------
func: function
function from an input list and args and kwargs to
an output list
in_streams: list of Stream
The list of input streams of the agent
out_stream: Stream
The single output stream of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
def transition(in_lists, state):
output_list = []
# If the input data is empty, i.e., if v.stop == v.start for all
# v in in_lists, then return empty lists for each output stream,
# and leave the state and the starting point for each input
# stream unchanged.
if all(v.stop <= v.start for v in in_lists):
return ([output_list], state, [v.start for v in in_lists])
# Assert at least one input stream has unprocessed data.
for stream_number, v in enumerate(in_lists):
# v is an in_list
# if v.stop <= v.start then skip this input stream
# because no new messages have arrived on this stream.
if v.stop > v.start:
# In the following,input_list is the list of new values
# on the input stream with index stream_number
input_list = v.list[v.start:v.stop]
# Add each unread element in this input stream, with the
# stream_number, to output_list
for element in input_list:
output_list.append(func(element, *args, **kwargs))
return ([output_list], state, [v.stop for v in in_lists])
# Create agent
Agent(in_streams, [out_stream], transition, name="asynch_merge")
return
[docs]def blend(function, in_streams, state=None, *args, **kwargs):
out_stream = Stream('output of blend')
blend_agent(function, in_streams, out_stream, state, *args, **kwargs)
return out_stream
#-----------------------------------------------------------------------
# MANY: LIST OF INPUT STREAMS, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
[docs]def element_many_agent(func, in_streams, out_streams, state=None,
call_streams=None, name=None,
*args, **kwargs):
"""
Parameters
----------
func: function
function from an input list and args and kwargs to
an output list
in_streams: list of Stream
The input streams of the agent
out_streams: list of Stream
The output streams of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_stream. A new value in any stream in this
list causes a state transition of this agent.
name: str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_many_agent_arguments(func, in_streams, out_streams, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
num_out_streams = len(out_streams)
num_in_streams = len(in_streams)
# The transition function for this agent.
def transition(in_lists, state):
check_in_lists_type(name, in_lists, num_in_streams)
# input_snapshots is a list of snapshots of the collection of input
# streams.
# Each snapshot in input_snapshots is a list with one value for each
# input stream.
# The j-th snapshot in input_snapshots is the snapshot at the j-th
# time slice in this list of the input streams.
input_snapshots = zip(*[v.list[v.start:v.stop] for v in in_lists])
# If the new input data is empty then return empty lists for
# each output stream, and leave the state and the starting point
# for each input stream unchanged.
if not input_snapshots:
return ([[]]*num_out_streams, state, [v.start for v in in_lists])
# output_snapshots is a list of snapshots of the collection of output
# streams.
# Each snapshot in output_snapshots is a list with one value for each
# output stream.
# The j-th snapshot in output_snapshots is the snapshot at the j-th
# time slice in this list of the output streams.
# output_snapshots[j] will be set to the output value
# corresponding to the j-th value in each of the input
# streams
output_snapshots = [ [] for lst in input_snapshots]
if state is None:
output_snapshots = \
[func(input_snapshot, *args, **kwargs)
for input_snapshot in input_snapshots]
else:
output_snapshots = [[]]*len(input_list)
for i in range(len(input_snapshots)):
output_snapshots[i], state = \
func(input_snapshots[i], state, *args, **kwargs)
check_func_output_for_multiple_streams(func, name, num_out_streams,
output_snapshots)
# output_snapshots[i] is a list whose each elements is a snapshot of the output
# streams: each snapshot is a list with one element for each output stream.
# zip them up to get add_to_output_streamss where add_to_output_streams[j] is
# a list containing the sequence of values to be added to output stream j.
add_to_output_streams = [list(snapshot) for snapshot in zip(*output_snapshots)]
return (add_to_output_streams, state,
[v.start+len(input_snapshots) for v in in_lists])
# Finished transition
# Create agent
return Agent(
in_streams, out_streams, transition, state, call_streams,
name)
[docs]def merge_split(function, in_streams, num_out_streams,
state=None, *args, **kwargs):
out_streams = [Stream() for _ in range(num_out_streams)]
call_streams = None
name=None
element_many_agent(function, in_streams, out_streams, state,
call_streams, name, *args, **kwargs)
return out_streams
#--------------------------------------------------------
[docs]def tests():
s = Stream('s')
u = Stream('u')
x = Stream('x')
# Test merge
# func operates on a list of elements, one element for each input stream.
# func returns a single element of the output stream.
def g(lst): return sum(lst)
d = element_merge_agent(func=g, in_streams=[x,u], out_stream=s, name='d')
zipxu = zip_stream([x,u])
zip_map_xu = zip_map(sum, [x,u])
def g_args(lst, multiplier):
return sum(lst)*multiplier
zip_map_g_args = zip_map(g_args, [x,u], multiplier=2)
ss = Stream('ss')
def genral(lst, f):
return f(lst)
dd = element_merge_agent(
func=genral, in_streams=[x,u], out_stream=ss, name='dd', f=np.mean)
zip_map_ss = zip_map(np.mean, [x,u])
def fff(lst, f, addend):
return f(lst, addend)
def hhh(lst, addend):
return sum(lst)+addend
sss = Stream('sss')
dddd = element_merge_agent(
func=fff, in_streams=[x,u], out_stream=sss, name='ddd', f=hhh,
addend=10)
x.extend(range(3))
u.extend([10, 15, 18])
assert recent_values(s) == [10, 16, 20]
assert recent_values(zip_map_g_args) == [2*v for v in recent_values(s)]
assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18)]
assert recent_values(ss) == [5, 8, 10]
assert recent_values(zip_map_ss) == [5.0, 8.0, 10.0]
assert recent_values(sss) == [20, 26, 30]
assert recent_values(zip_map_xu) == s.recent[:s.stop]
u.append(37)
x.extend(range(3, 5, 1))
assert recent_values(s) == [10, 16, 20, 40]
assert recent_values(zip_map_g_args) == [2*v for v in recent_values(s)]
assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18), (3, 37)]
assert recent_values(ss) == [5, 8, 10, 20]
assert recent_values(sss) == [20, 26, 30, 50]
u.extend([96, 95])
assert recent_values(s) == [10, 16, 20, 40, 100]
assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18), (3, 37), (4, 96)]
assert recent_values(ss) == [5, 8, 10, 20, 50]
assert recent_values(sss) == [20, 26, 30, 50, 110]
# TEST ASYNCH_MERGE AND MIX
x = Stream('x')
y = Stream('y')
z = Stream('z')
in_streams = [x, y]
out_stream = z
asynch_merge(in_streams, out_stream)
mix_z = mix(in_streams)
assert z.stop == 0
assert mix_z.stop == z.stop
assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]
x.append(10)
assert z.stop == 1
assert z.recent[:z.stop] == [(0, 10)]
assert mix_z.stop == z.stop
assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]
y.append('A')
assert z.stop == 2
assert z.recent[:z.stop] == [(0, 10), (1, 'A')]
assert mix_z.stop == z.stop
assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]
y.append('B')
assert z.stop == 3
assert z.recent[:z.stop] == [(0, 10), (1, 'A'), (1, 'B')]
x.append(20)
assert z.stop == 4
assert z.recent[:z.stop] == [(0, 10), (1, 'A'), (1, 'B'), (0, 20)]
assert mix_z.stop == z.stop
assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]
# Test blend_agent
x = Stream('x')
y = Stream('y')
z = Stream('z')
def double(v): return 2*v
def double_add(v, addend): return 2*v+addend
blend_agent(func=double, in_streams=[x, y], out_stream=z)
blend_z = blend(double, [x, y])
blend_add_z = blend(double_add, [x,y], addend=10)
x.append(1)
assert recent_values(z) == [2]
assert recent_values(blend_z) == recent_values(z)
assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]
x.extend(range(2,4))
assert recent_values(z) == [2, 4, 6]
assert recent_values(blend_z) == recent_values(z)
assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]
y.extend(range(100, 102))
assert recent_values(z) == [2, 4, 6, 200, 202]
assert recent_values(blend_z) == recent_values(z)
assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]
x.extend([10, 20])
assert recent_values(z) == [2, 4, 6, 200, 202, 20, 40]
assert recent_values(blend_z) == recent_values(z)
assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]
# ------------------------------------
# Test many
# func operates on a list with one element for each input stream.
# func returns a list with one element for each output stream.
def f_many(lst):
return [sum(lst), sum(lst)+1]
u_stream = Stream(name='u_stream')
v_stream = Stream(name='v_stream')
w_stream = Stream(name='w_stream')
x_stream = Stream(name='x_stream')
many_agent = element_many_agent(
func=f_many, in_streams=[u_stream, v_stream], out_streams=[w_stream, x_stream],
name='many_agent')
u_stream.extend(range(5))
v_stream.extend(range(0, 40, 4))
assert recent_values(w_stream) == [0, 5, 10, 15, 20]
assert recent_values(x_stream) == [1, 6, 11, 16, 21]
ww_stream, xx_stream = merge_split(
function=f_many, in_streams=[u_stream, v_stream],
num_out_streams=2)
assert recent_values(ww_stream) == recent_values(w_stream)
assert recent_values(xx_stream) == recent_values(x_stream)
# ------------------------------------
# Test many with args and kwargs
# func operates on a list with one element for each input stream.
# func returns a list with one element for each output stream.
def f_many_args_kwargs(lst, multiplicand, addend):
return [sum(lst)*multiplicand, sum(lst)+addend]
u_args_kwargs_stream = Stream(name='u_args_kwargs_stream')
v_args_kwargs_stream = Stream(name='v_args_kwargs_stream')
w_args_kwargs_stream = Stream(name='w_args_kwargs_stream')
x_args_kwargs_stream = Stream(name='x_args_kwargs_stream')
many_args_kwargs_agent = element_many_agent(
func=f_many_args_kwargs,
in_streams=[u_args_kwargs_stream, v_args_kwargs_stream],
out_streams=[w_args_kwargs_stream, x_args_kwargs_stream],
name='many_args_kwargs_agent', multiplicand=2, addend=10)
ww_args_kwargs_stream, xx_args_kwargs_stream = merge_split(
function=f_many_args_kwargs,
in_streams=[u_args_kwargs_stream, v_args_kwargs_stream],
num_out_streams=2, multiplicand=2, addend=10)
assert (recent_values(ww_args_kwargs_stream) ==
recent_values(w_args_kwargs_stream))
assert (recent_values(xx_args_kwargs_stream) ==
recent_values(x_args_kwargs_stream))
u_args_kwargs_stream.extend(range(5))
v_args_kwargs_stream.extend(range(0, 40, 4))
assert recent_values(w_args_kwargs_stream) == [0, 10, 20, 30, 40]
assert recent_values(x_args_kwargs_stream) == [10, 15, 20, 25, 30]
assert (recent_values(ww_args_kwargs_stream) ==
recent_values(w_args_kwargs_stream))
assert (recent_values(xx_args_kwargs_stream) ==
recent_values(x_args_kwargs_stream))
u_args_kwargs_stream.append(100)
v_args_kwargs_stream.extend(range(40, 80, 4))
assert recent_values(w_args_kwargs_stream) == \
[0, 10, 20, 30, 40, 240]
assert recent_values(x_args_kwargs_stream) == \
[10, 15, 20, 25, 30, 130]
assert (recent_values(ww_args_kwargs_stream) ==
recent_values(w_args_kwargs_stream))
assert (recent_values(xx_args_kwargs_stream) ==
recent_values(x_args_kwargs_stream))
u_args_kwargs_stream.extend([200, 300])
v_args_kwargs_stream.append(100)
assert recent_values(w_args_kwargs_stream) == \
[0, 10, 20, 30, 40, 240, 448, 656]
assert recent_values(x_args_kwargs_stream) == \
[10, 15, 20, 25, 30, 130, 234, 338]
assert (recent_values(ww_args_kwargs_stream) ==
recent_values(w_args_kwargs_stream))
assert (recent_values(xx_args_kwargs_stream) ==
recent_values(x_args_kwargs_stream))
return
if __name__ == '__main__':
tests()