""" The list agent is one of a collection of agents, each of which has
a different type of transition function, func.
The list-agent func operates on a list input and returns a
list. func may also operate on a state which can be of any type.
This file has the following agents:
list_map_agent: Has a single input stream and a single output stream.
list_sink_agent: Has a single input stream and no outputs.
list_merge_agent: Has a list of input streams and a single output
stream.
list_split_agent: Has a single input stream and a list of output
streams.
list_many_agent: Has a list of input streams and a list of output
streams.
A single input stream is in_stream and a single output stream is
out_stream. A list of input streams is in_streams and a list of output
streams is out_streams. In general, plurals refer to lists and
singular to single elements.
"""
import sys
import os
# scriptpath = "../"
# sys.path.append(os.path.abspath(scriptpath))
from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
import types
import inspect
import numpy as np
#-----------------------------------------------------------------------
# MAP: SINGLE INPUT STREAM, SINGLE OUTPUT STREAM
#-----------------------------------------------------------------------
[docs]def list_map_agent(func, in_stream, out_stream, state=None,
call_streams=None, name=None,
args=[], kwargs={}):
"""
This agent maps the function func from its single input stream to its
single output stream.
Parameters
----------
func: function
function from a list (a slice of the in_stream) to
a list (a slice of the out_stream).
in_stream: Stream
The single input stream of this agent
out_stream: Stream
The single output streams of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
# Check the arguments and output error messages if argument types
# and numbers are incorrect.
check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
# The transition function for this agent.
def transition(in_lists, state):
num_in_streams = 1
# Check the types of the input lists for the transition.
check_in_lists_type(name, in_lists, num_in_streams)
# This agent has only one input stream. Get its only in_list.
in_list = in_lists[0]
# input_list is the slice of the input stream that this agent
# is working on.
# in_list is an object of type InList, and it consists of an
# arbitrarily long list (in_list.list), and pointers to where
# this agent is starting to read the list (in_list.start), and
# where the list ends (in_list.stop).
input_list = in_list.list[in_list.start:in_list.stop]
# If the new input data is empty then return an empty list for
# the single output stream, and leave the state and the starting
# point for the single input stream unchanged. This is the
# null transition. Normally, the null transition occurs when
# the agent is invoked by a call stream.
if len(input_list) == 0:
return ([[]], state, [in_list.start])
# Compute the output generated by this transition.
if state is None:
output_list = func(input_list, *args, **kwargs)
else:
output_list, state = func(input_list, state, *args, **kwargs)
# Return: (1) the list of outputs, one per output stream,
# (2) the new state and
# (3) the new starting pointer into this stream for
# this agent. Since this agent has read the entire
# input_list, move its starting pointer forward by
# the length of the input list.
return ([output_list], state, [in_list.start+len(input_list)])
# Finished transition
# Create agent with parameters:
# 1. list of input streams. map has a single input
# 2. list of output streams. map has a single output.
# 3. transition function
# 4. new state
# 5. list of calling streams
# 6. Agent name
return Agent([in_stream], [out_stream], transition, state, call_streams, name)
#-----------------------------------------------------------------------
# SINK: SINGLE INPUT STREAM, NO OUTPUT
#-----------------------------------------------------------------------
[docs]def list_sink_agent(func, in_stream, state=None,
call_streams=None, name=None,
args=[], kwargs={}):
"""
This agent applies func to its single input stream. It has no output streams.
Parameters
----------
func: function
function from a list (a slice of the in_stream) to None (no output).
in_stream: Stream
The single input stream of this agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
# Check the arguments and output error messages if argument types
# and numbers are incorrect.
check_sink_agent_arguments(func, in_stream, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
# The transition function for this agent.
def transition(in_lists, state):
num_in_streams = 1
# Check the types of the input lists for the transition.
check_in_lists_type(name, in_lists, num_in_streams)
# This agent has only one input stream. Get its only in_list.
in_list = in_lists[0]
# input_list is the slice of the input stream that this agent
# is working on.
# in_list is an object of type InList, and it consists of an
# arbitrarily long list (in_list.list), and pointers to where
# this agent is starting to read the list (in_list.start), and
# where the list ends (in_list.stop).
input_list = in_list.list[in_list.start:in_list.stop]
# If the new input data is empty then return an empty list for
# the single output stream, and leave the state and the starting
# point for the single input stream unchanged. Normally, the
# null transition occurs when the agent is invoked by a call
# stream.
if not input_list or len(input_list) == 0:
return ([[]], state, [in_list.start])
# Compute the output generated by this transition.
if state is None:
func(input_list, *args, **kwargs)
else:
state = func(input_list, state, *args, **kwargs)
# Return: (1) the list of outputs, one per output stream.
# sink agent has no output; its output list is [].
# (2) the new state and
# (3) the new starting pointer into this stream for
# this agent. Since this agent has read the entire
# input_list, move its starting pointer forward by
# the length of the input list.
return ([], state, [in_list.start+len(input_list)])
# Finished transition
# Create agent with the following parameters:
# 1. list of input streams. sink has a single input stream.
# 2. list of output streams. sink has no output.
# 3. transition function
# 4. new state
# 5. list of calling streams
# 6. Agent name
return Agent([in_stream], [], transition, state, call_streams, name)
#-----------------------------------------------------------------------
# MERGE: LIST OF INPUT STREAMS, SINGLE OUTPUT STREAM
#-----------------------------------------------------------------------
[docs]def list_merge_agent(func, in_streams, out_stream, state=None,
call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from a list of lists (one list per input stream) to an output list
in_streams: list of Stream
The list of input streams of the agent
out_stream: Stream
The single output stream of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
# Check the arguments and output error messages if argument types
# and numbers are incorrect.
check_merge_agent_arguments(func, in_streams, out_stream, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = len(in_streams)
# The transition function for this agent.
def transition(in_lists, state):
# Check the types of the input lists for the transition.
check_in_lists_type(name, in_lists, num_in_streams)
# smallest_list_length is the smallest of all the inputs for
# this agent.
smallest_list_length = \
min(in_list.stop - in_list.start for in_list in in_lists)
# Check for null transition.
if smallest_list_length == 0:
# output_list is []
# return ([output_list], state,
# [in_list.start+smallest_list_length
# for in_list in in_lists])
return ([[]], state,
[in_list.start for in_list in in_lists])
# input_lists is the list of lists that this agent operates on
# in this transition.
input_lists = [in_list.list[in_list.start:in_list.start+smallest_list_length]
for in_list in in_lists]
# Compute the output generated by this transition.
if state is None:
output_list = func(input_lists, *args, **kwargs)
else:
ouput_list, state = func(input_lists, state, *args, **kwargs)
# Return: (1) the list of outputs, one per output stream. The
# merge_agent has a single output list.
# (2) the new state and
# (3) the new starting pointer into this stream for
# this agent. Since this agent has read the entire
# input_list, move its starting pointer forward by
# the length of the input list.
return ([output_list], state, [in_list.start+smallest_list_length for in_list in in_lists])
# Finished transition
# Create agent with the following parameters:
# 1. list of input streams. merge has a list, instreams, of input
# streams.
# 2. list of output streams. merge has a single output stream.
# 3. transition function
# 4. new state
# 5. list of calling streams
# 6. Agent name
return Agent(in_streams, [out_stream], transition, state, call_streams, name)
#-----------------------------------------------------------------------
# SPLIT: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
[docs]def list_split_agent(func, in_stream, out_streams, state=None,
call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from an input list to a list of lists (one per output stream).
in_stream: Stream
The single input stream of the agent
out_streams: list of Stream
The list of output streams of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
# Check the types of the input lists for the transition.
check_split_agent_arguments(func, in_stream, out_streams,
call_streams, name)
# Check the number of arguments in func, the transition function.
check_num_args_in_func(state, name, func, args, kwargs)
num_out_streams = len(out_streams)
num_in_streams = 1
# The transition function for this agent.
def transition(in_lists, state):
# Check the types of the input lists for the transition.
check_in_lists_type(name, in_lists, num_in_streams)
# This agent has only one input stream. Get its only in_list.
in_list = in_lists[0]
# input_list is the slice of the input stream that this agent
# is working on.
# in_list is an object of type InList, and it consists of an
# arbitrarily long list (in_list.list), and pointers to where
# this agent is starting to read the list (in_list.start), and
# where the list ends (in_list.stop).
input_list = in_list.list[in_list.start:in_list.stop]
# Check for the null transition.
# If the new input data is empty then return an empty list for
# the single output stream, and leave the state and the starting
# point for the single input stream unchanged.
if len(input_list) == 0:
# output_list is [] for each output stream.
# So, output_lists = [[]]*num_out_streams
# return (output_lists, state,
# [in_list.start+len(input_list)]
return ([[]]*num_out_streams, state, [in_list.start])
# Compute the output generated by this transition.
if state is None:
output_lists = func(input_list, *args, **kwargs)
else:
output_lists, state = func(input_list, state, *args, **kwargs)
assert len(output_lists) == num_out_streams, \
'Error in list-split transition function of agent {0}.'\
' The number, {1}, of output lists does not equal the number, {2},'\
' of output streams.'.format(name, len(output_lists), num_out_streams)
# Return: (1) output_lists, the list of outputs, one per
# output stream.
# (2) the new state and
# (3) the new starting pointer into this stream for
# this agent. Since this agent has read the entire
# input_list, move its starting pointer forward by
# the length of the input list.
return (output_lists, state, [in_list.start+len(input_list)])
# Finished transition
# Create agent with the following parameters:
# 1. list of input streams. split has a single input stream.
# 2. list of output streams.
# 3. transition function
# 4. new state
# 5. list of calling streams
# 6. Agent name
return Agent([in_stream], out_streams, transition, state, call_streams, name)
#-----------------------------------------------------------------------
# MANY: LIST OF INPUT STREAMS, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
[docs]def list_many_agent(func, in_streams, out_streams, state=None,
call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from an input list to an output list
in_streams: list of Stream
The input streams of the agent
out_streams: list of Stream
The output streams of the agent
state: object
The state of the agent
call_streams: list of Stream
The list of call_stream. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
# Check the types of the input lists for the transition.
check_many_agent_arguments(func, in_streams, out_streams, call_streams, name)
check_num_args_in_func(state, name, func, args, kwargs)
num_out_streams = len(out_streams)
num_in_streams = len(in_streams)
# The transition function for this agent.
def transition(in_lists, state):
# Check the types of the input lists for the transition.
check_in_lists_type(name, in_lists, num_in_streams)
# smallest_list_length is the smallest of all the inputs for
# this agent.
smallest_list_length = \
min(in_list.stop - in_list.start for in_list in in_lists)
# Check for the null transition.
if smallest_list_length == 0:
# output_list is [] for each output stream.
# So, output_lists = [[]]*num_out_streams
# return (output_lists, state,
# [in_list.start+smallest_list_length
# for in_list in in_lists])
return ([[]]*num_out_streams, state,
[in_list.start for in_list in in_lists])
# input_lists is the list of lists that this agent operates on
# in this transition.
input_lists = [in_list.list[in_list.start:in_list.start+smallest_list_length]
for in_list in in_lists]
# Compute the output generated by this transition.
if state is None:
output_lists = func(input_lists, *args, **kwargs)
else:
ouput_lists, state = func(input_lists, state, *args, **kwargs)
# Return: (1) output_lists, the list of outputs, one per
# output stream.
# (2) the new state and
# (3) the new starting pointer into this stream for
# this agent. Since this agent has read
# smallest_list_length number of elements, move
# its starting pointer forward by
# smallest_list_length.
return (output_lists, state,
[in_list.start+smallest_list_length for in_list in in_lists])
# Finished transition
# Create agent with the following parameters:
# 1. list of input streams.
# 2. list of output streams.
# 3. transition function
# 4. new state
# 5. list of calling streams
# 6. Agent name
return Agent(in_streams, out_streams, transition, state, call_streams, name)
#------------------------------------------------------------------------------------------------
#------------------------------------------------------------------------------------------------
# LIST AGENT TESTS
#------------------------------------------------------------------------------------------------
#------------------------------------------------------------------------------------------------
[docs]def test_list_agents():
r = Stream('r')
s = Stream('s')
t = Stream('t')
u = Stream('u')
v = Stream('v')
w = Stream('w')
x = Stream('x')
y = Stream('y')
z = Stream('z')
# Check simple map
def simple(lst):
return [2*v for v in lst]
a = list_map_agent(func=simple, in_stream=x, out_stream=y, name='a')
x.extend(range(5))
assert x.stop == 5
assert x.recent[:5] == range(5)
assert y.stop == 5
assert y.recent[:5] == [0, 2, 4, 6, 8]
# Check map with state
# Function that operates on an element and state and returns an
# element and state.
def f(input_list, state):
output_list = [[]]*len(input_list)
for i in range(len(input_list)):
output_list[i] = input_list[i]+state
state += 2
return output_list, state
b = list_map_agent(func=f, in_stream=x, out_stream=z, state=0, name='b')
assert z.stop == 5
assert z.recent[:5] == [0, 3, 6, 9, 12]
# Check map with call streams
c = list_map_agent(func=f, in_stream=x, out_stream=v, state=10,
call_streams=[w], name='c')
assert v.stop == 0
w.append(0)
assert v.stop == 5
assert v.recent[:5] == [10, 13, 16, 19, 22]
# Check sink
def p(input_list):
for v in input_list:
print v
#sink_agent = list_sink_agent(func=p, in_stream=x, name='sink_agent')
# Check sink with state
def add(input_list, state):
return sum(input_list)+state
sink_with_state_agent = list_sink_agent(
func=add, in_stream=x, state=0, name='sink_with_state_agent')
assert sink_with_state_agent.state == 10
# Check merge
# Function that operates on a list of lists
def g(list_of_lists):
return [sum(snapshot) for snapshot in zip(*list_of_lists)]
d = list_merge_agent(func=g, in_streams=[x,u], out_stream=s, name='d')
assert s.stop == 0
u.extend([10, 15, 18])
assert s.stop == 3
assert s.recent[:3] == [10, 16, 20]
u.append(37)
assert s.stop == 4
assert s.recent[:4] == [10, 16, 20, 40]
u.extend([96, 95])
assert s.stop == 5
assert s.recent[:5] == [10, 16, 20, 40, 100]
# Check split
def h(input_list):
return [[element+1 for element in input_list],
[element*2 for element in input_list]]
e = list_split_agent(func=h, in_stream=x, out_streams=[r, t], name='e')
assert r.stop == 5
assert t.stop == 5
assert r.recent[:5] == [1, 2, 3, 4, 5]
assert t.recent[:5] == [0, 2, 4, 6, 8]
# Check split with state
def h_state(input_list, state):
length = len(input_list)
output_list_0 = [[]]*length
output_list_1 = [[]]*length
for i in range(length):
output_list_0[i] = input_list[i]+state
output_list_1[i] = input_list[i]*state
state += 1
return ([output_list_0, output_list_1], state)
r_state = Stream(name='r_state')
t_state = Stream(name='t_state')
x_state = Stream(name='x_state')
e_state = list_split_agent(
func=h_state, in_stream=x_state, out_streams=[r_state, t_state], name='e_state', state=0)
assert r_state.stop == 0
assert t_state.stop == 0
x_state.extend(range(5))
assert r_state.stop == 5
assert r_state.recent[:r_state.stop] == [0, 2, 4, 6, 8]
assert t_state.stop == 5
assert t_state.recent[:5] == [0, 1, 4, 9, 16]
x_state.extend(range(5,10,1))
assert r_state.stop == 10
assert r_state.recent[:r_state.stop] == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
assert t_state.stop == 10
assert t_state.recent[:t_state.stop] == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Check many
def f_many(list_of_lists):
snapshots = zip(*list_of_lists)
return [[max(snapshot) for snapshot in snapshots],
[min(snapshot) for snapshot in snapshots]]
u_stream = Stream(name='u_stream')
v_stream = Stream(name='v_stream')
w_stream = Stream(name='w_stream')
x_stream = Stream(name='x_stream')
many_agent = list_many_agent(
func=f_many, in_streams=[u_stream, v_stream], out_streams=[w_stream, x_stream],
name='many_agent')
assert w_stream.stop == 0
assert x_stream.stop == 0
u_stream.extend([5, 9, 11, 8])
assert w_stream.stop == 0
assert x_stream.stop == 0
v_stream.extend([20, 1, -3, 17, 10])
assert w_stream.stop == 4
assert x_stream.stop == 4
assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17]
assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8]
v_stream.extend([20, -100])
assert w_stream.stop == 4
assert x_stream.stop == 4
assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17]
assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8]
u_stream.extend([14, 5, -200, 70])
assert w_stream.stop == 7
assert x_stream.stop == 7
assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17, 14, 20, -100]
assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8, 10, 5, -200]
#------------------------------------------------------------------
#------------------------------------------------------------------
# Test NumPy arrays: StreamArray
#------------------------------------------------------------------
#------------------------------------------------------------------
# Test list map on StreamArray (dimension is 0).
a_stream_array = StreamArray(name='a_stream_array')
b_stream_array = StreamArray(name='b_stream_array')
def f_np(input_array):
return input_array+1
a_np_agent = list_map_agent(func=f_np, in_stream=a_stream_array, out_stream=b_stream_array,
name='a_np_agent')
assert b_stream_array.stop == 0
a_stream_array.extend(np.arange(5.0))
assert np.array_equal(b_stream_array.recent[:b_stream_array.stop],
np.arange(5.0)+1)
a_stream_array.extend(np.arange(5.0, 10.0, 1.0))
assert np.array_equal(b_stream_array.recent[:b_stream_array.stop],
np.arange(10.0)+1)
# Test list map with state on StreamArray (dimension is 0)
c_stream_array = StreamArray(name='c_stream_array')
d_stream_array = StreamArray(name='d_stream_array')
def f_np_state(input_array, state):
return np.cumsum(input_array)+state, np.sum(input_array)
b_np_agent = list_map_agent(func=f_np_state, in_stream=c_stream_array,
out_stream=d_stream_array, state = 0.0,
name='b_np_agent')
assert d_stream_array.stop == 0
c_stream_array.extend(np.arange(5.0))
assert np.array_equal(
d_stream_array.recent[:d_stream_array.stop], np.cumsum(np.arange(5.0)))
c_stream_array.extend(np.arange(5.0, 10.0, 1.0))
assert np.array_equal(
d_stream_array.recent[:d_stream_array.stop], np.cumsum(np.arange(10.0)))
# Test list map with positive integer dimension on StreamArray
e_stream_array = StreamArray(name='e_stream_array', dimension=3)
f_stream_array = StreamArray(name='f_stream_array', dimension=2)
def f_np_dimension(input_array):
output_array = np.zeros([len(input_array), 2])
output_array[:,0] = input_array[:,0]+input_array[:,1]
output_array[:,1] = input_array[:,2]
return output_array
c_np_agent = list_map_agent(func=f_np_dimension, in_stream=e_stream_array,
out_stream=f_stream_array, name='c_np_agent')
e_stream_array.extend(np.array([[1.0, 2.0, 3.0]]))
assert np.array_equal(f_stream_array.recent[:f_stream_array.stop], np.array([[3.0, 3.0]]))
e_stream_array.extend(np.array([[4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]))
assert np.array_equal(f_stream_array.recent[:f_stream_array.stop],
np.array([[3.0, 3.0], [9.0, 6.0], [15.0, 9.0]]))
# Test list map with a dimension which is a tuple of integers.
g_stream_array = StreamArray(name='g_stream_array', dimension=(2,2))
h_stream_array = StreamArray(name='h_stream_array', dimension=(2,2))
def f_np_tuple_dimension(input_array):
return input_array*2
d_np_agent = list_map_agent(func=f_np_tuple_dimension, in_stream=g_stream_array,
out_stream=h_stream_array, name='d_np_agent')
a_array = np.array([[[1.0, 2.0],[3.0, 4.0]],
[[5.0, 6.0],[7.0, 8.0]]])
g_stream_array.extend(a_array)
assert np.array_equal(h_stream_array.recent[:h_stream_array.stop],
a_array*2)
b_array = np.array([[[9.0, 10.0], [11.0, 12.0]]])
g_stream_array.extend(b_array)
assert np.array_equal(h_stream_array.recent[:h_stream_array.stop],
np.vstack((a_array, b_array))*2)
# Test list map with a datatype and dimension of 0.
dt_0 = np.dtype([('time', int), ('value', (float, 3))])
dt_1 = np.dtype([('time', int), ('value', float)])
i_stream_array = StreamArray(name='i_stream_array', dtype=dt_0)
j_stream_array = StreamArray(name='j_stream_array', dtype=dt_1)
def f_datatype(input_array):
output_array = np.zeros(len(input_array), dtype=dt_1)
output_array['time'] = input_array['time']
output_array['value'] = np.sum(input_array['value'], axis=1)
return output_array
e_np_agent = list_map_agent(func=f_datatype, in_stream=i_stream_array,
out_stream=j_stream_array, name='e_np_agent')
c_array = np.array([(1, [2.0, 3.0, 4.0])], dtype=dt_0)
assert j_stream_array.stop == 0
i_stream_array.extend(c_array)
assert np.array_equal(j_stream_array.recent[:j_stream_array.stop],
f_datatype(c_array))
d_array = np.array([(10, [6.0, 7.0, 8.0]), (20, [10.0, 11.0, 12.0])], dtype=dt_0)
i_stream_array.extend(d_array)
assert np.array_equal(j_stream_array.recent[:j_stream_array.stop],
f_datatype(np.hstack((c_array, d_array))))
# Test list map with a datatype and positive integer dimension.
k_stream_array = StreamArray(name='k_stream_array', dtype=dt_0, dimension=2)
l_stream_array = StreamArray(name='l_stream_array', dtype=dt_1)
def f_datatype_int_dimension(input_array):
m = len(input_array)
output_array = np.zeros(m, dtype=dt_1)
for i in range(m):
output_array[i]['time'] = np.max(input_array[i]['time'])
output_array[i]['value'] = np.sum(input_array[i]['value'])
return output_array
f_np_agent = list_map_agent(func=f_datatype_int_dimension, in_stream=k_stream_array,
out_stream=l_stream_array, name='f_np_agent')
e_array = np.array([[(1, [2.0, 3.0, 4.0]), (2, [5.0, 6.0, 7.0])]], dtype=dt_0)
assert l_stream_array.stop == 0
k_stream_array.extend(e_array)
assert np.array_equal(l_stream_array.recent[:l_stream_array.stop],
f_datatype_int_dimension(e_array))
f_array = np.array([[(3, [8.0, 9.0, 10.0]), (4, [11.0, 12.0, 13.0])],
[(5, [-1.0, 0.0, 1.0]), (6, [-2.0, 2.0, -2.0])]],
dtype=dt_0)
k_stream_array.extend(f_array)
assert np.array_equal(l_stream_array.recent[:l_stream_array.stop],
f_datatype_int_dimension(np.vstack((e_array, f_array))))
# Test list map with a datatype and a dimension which is a tuple
m_stream_array = StreamArray(name='m_stream_array', dtype=dt_0, dimension=(2,2))
n_stream_array = StreamArray(name='n_stream_array', dtype=dt_1)
g_np_agent = list_map_agent(func=f_datatype_int_dimension, in_stream=m_stream_array,
out_stream=n_stream_array, name='g_np_agent')
assert n_stream_array.stop == 0
g_array = np.array([
# zeroth 2x2 array
[[(1, [2.0, 3.0, 4.0]), (2, [5.0, 6.0, 7.0])],
[(3, [8.0, 9.0, 10.0]), (4, [11.0, 12.0, 13.0])]],
# first 2x2 array
[[(5, [12.0, 13.0, 14.0]), (6, [15.0, 16.0, 17.0])],
[(7, [18.0, 19.0, 20.0]), (8, [21.0, 22.0, 23.0])]]
], dtype=dt_0)
m_stream_array.extend(g_array)
assert np.array_equal(n_stream_array.recent[:n_stream_array.stop],
f_datatype_int_dimension(g_array))
h_array = np.array([
[[(9, [0.0, 1.0, -1.0]), (10, [2.0, 2.0, -4.0])],
[(11, [80.0, -71.0, -9.0]), (15, [0.0, 0.0, 0.0])]]
], dtype=dt_0)
m_stream_array.extend(h_array)
assert np.array_equal(n_stream_array.recent[:n_stream_array.stop],
f_datatype_int_dimension(np.vstack((g_array, h_array))))
# Test list merge with StreamArray and no dimension and no data type
a_in_0 = StreamArray(name='a_in_0')
a_in_1 = StreamArray(name='a_in_1')
a_out = StreamArray(name='out')
def a_merge(list_of_lists):
array_0, array_1 = list_of_lists
return array_0 + array_1
a_s_agent = list_merge_agent(func=a_merge, in_streams=[a_in_0, a_in_1],
out_stream=a_out, name='a_s_agent')
assert a_out.stop == 0
a_in_0.extend(np.array([1.0, 2.0, 3.0]))
assert a_out.stop == 0
a_in_0.extend(np.array([4.0, 5.0, 6.0]))
assert a_out.stop == 0
a_in_1.extend(np.array([10.0, 20.0]))
assert np.array_equal(a_out.recent[:a_out.stop],
np.array([11.0, 22.0]))
a_in_1.extend(np.array([30.0, 40.0]))
assert np.array_equal(a_out.recent[:a_out.stop],
np.array([11.0, 22.0, 33.0, 44.0]))
# Test list merge with StreamArray and no dimension and data type
a_in_dt_0 = StreamArray(name='a_in_dt_0', dtype=dt_0)
a_in_dt_1 = StreamArray(name='a_in_dt_1', dtype=dt_0)
a_out_dt = StreamArray(name='out', dtype=dt_0)
def a_merge_dtype(list_of_arrays):
input_array_0, input_array_1 = list_of_arrays
output_array = np.zeros(len(input_array_0), dtype=dt_0)
output_array['time'] = \
np.max((input_array_0['time'], input_array_1['time']), axis=0)
output_array['value'] = input_array_0['value'] + input_array_1['value']
return output_array
a_s_dt_agent = list_merge_agent(func=a_merge_dtype, in_streams=[a_in_dt_0, a_in_dt_1],
out_stream=a_out_dt, name='a_s_dt_agent')
a_in_dt_0.extend(np.array([(1, [1.0, 2.0, 3.0])], dtype=dt_0))
assert a_out_dt.stop == 0
a_in_dt_1.extend(np.array([(2, [10.0, 20.0, 30.0])], dtype=dt_0))
assert np.array_equal(a_out_dt.recent[:a_out_dt.stop],
np.array([(2, [11.0, 22.0, 33.0])], dtype=dt_0))
a_in_dt_0.extend(np.array([(5, [21.0, 23.0, 32.0]),
(9, [27.0, 29.0, 31.0])], dtype=dt_0))
assert np.array_equal(a_out_dt.recent[:a_out_dt.stop],
np.array([(2, [11.0, 22.0, 33.0])], dtype=dt_0))
a_in_dt_1.extend(np.array([(6, [19.0, 17.0, 8.0]),
(8, [13.0, 11.0, 9.0]),
(10, [3.0, 1.0, 5.0])], dtype=dt_0))
assert np.array_equal(a_out_dt.recent[:a_out_dt.stop],
np.array([(2, [11.0, 22.0, 33.0]),
(6, [40.0, 40.0, 40.0]),
(9, [40.0, 40.0, 40.0])], dtype=dt_0))
# Test list split with StreamArray and positive integer dimension and no data type
dim = 2
b_in = StreamArray(name='b_in', dimension=dim)
b_out_0 = StreamArray(name='b_out_0', dimension=dim)
b_out_1 = StreamArray(name='b_out_1')
def b_split(array_of_arrays):
length = len(array_of_arrays)
output_array_0 = np.zeros((length, dim,))
output_array_1 = np.zeros(length)
for i in range(length):
input_array = array_of_arrays[i]
output_array_0[i] = np.array([[np.max(input_array), np.min(input_array)]])
output_array_1[i] = np.array([np.sum(input_array)])
return output_array_0, output_array_1
b_split_agent = list_split_agent(func=b_split, in_stream=b_in,
out_streams=[b_out_0, b_out_1], name='b_split_agent')
b_array_0 = np.array([[1.0, 9.0]])
b_in.extend(b_array_0)
assert np.array_equal(b_out_0.recent[:b_out_0.stop],
np.array([[9.0, 1.0]]))
assert np.array_equal(b_out_1.recent[:b_out_1.stop],
np.array([10.0]))
b_array_1 = np.array([[98.0, 2.0]])
b_in.extend(b_array_1)
assert np.array_equal(b_out_0.recent[:b_out_0.stop],
np.array([[9.0, 1.0], [98.0, 2.0]]))
assert np.array_equal(b_out_1.recent[:b_out_1.stop],
np.array([10.0, 100.0]))
b_array_3 = np.array([[10.0, 20.0], [3.0, 37.0], [55.0, 5.0]])
b_in.extend(b_array_3)
assert np.array_equal(b_out_0.recent[:b_out_0.stop],
np.array([[9.0, 1.0], [98.0, 2.0],
[20.0, 10.0], [37.0, 3.0],
[55.0, 5.0]]))
assert np.array_equal(b_out_1.recent[:b_out_1.stop],
np.array([10.0, 100.0, 30.0, 40.0, 60.0]))
# Test list many with StreamArray and no dimension and no data type
c_in_0 = StreamArray(name='c_in_0')
c_in_1 = StreamArray(name='c_in_1')
c_out_0 = StreamArray(name='c_out_0')
c_out_1 = StreamArray(name='c_out_1')
def c_many(list_of_arrays):
length = len(list_of_arrays)
input_array_0, input_array_1 = list_of_arrays
output_array_0 = np.zeros(length)
output_array_1 = np.zeros(length)
output_array_0 = input_array_0 + input_array_1
output_array_1 = input_array_0 - input_array_1
return [output_array_0, output_array_1]
c_many_agent = list_many_agent(func=c_many, in_streams=[c_in_0, c_in_1],
out_streams=[c_out_0, c_out_1],
name='c_many_agent')
c_array_0_0 = np.arange(3.0)*3
c_array_1_0 = np.arange(3.0)
c_in_0.extend(c_array_0_0)
c_in_1.extend(c_array_1_0)
assert np.array_equal(c_out_0.recent[:c_out_0.stop],
np.array([0.0, 4.0, 8.0]))
assert np.array_equal(c_out_1.recent[:c_out_1.stop],
np.array([0.0, 2.0, 4.0]))
c_array_0_1 = np.array([100.0])
c_array_1_1 = np.array([4.0, 5.0, 6.0])
c_in_0.extend(c_array_0_1)
c_in_1.extend(c_array_1_1)
assert np.array_equal(c_out_0.recent[:c_out_0.stop],
np.array([0.0, 4.0, 8.0, 104.0]))
assert np.array_equal(c_out_1.recent[:c_out_1.stop],
np.array([0.0, 2.0, 4.0, 96.0]))
## # Test list many with StreamArray and no dimension and no data type
## z_in_0 = StreamArray(name='z_in_0')
## z_in_1 = StreamArray(name='z_in_1')
## z_out_0 = StreamArray(name='z_out_0')
## z_out_1 = StreamArray(name='z_out_1')
## def execute_list_of_np_func(v, list_of_np_func):
## length = len(list_of_arrays)
## input_array_0, input_array_1 = list_of_arrays
## output_array_0 = np.zeros(length)
## output_array_1 = np.zeros(length)
## output_array_0 = input_array_0 + input_array_1
## output_array_1 = input_array_0 - input_array_1
## return [output_array_0, output_array_1]
# Test list many with StreamArray and positive integer dimension and no data type
dim = 2
d_in_0 = StreamArray(name='d_in_0', dimension=dim)
d_in_1 = StreamArray(name='d_in_1', dimension=dim)
d_out_0 = StreamArray(name='d_out_0', dimension=dim)
d_out_1 = StreamArray(name='d_out_1')
def d_many(list_of_arrays):
length = len(list_of_arrays)
input_array_0, input_array_1 = list_of_arrays
output_array_0 = input_array_0 + input_array_1
output_array_1 = np.array([np.sum(input_array_0+input_array_1)])
return output_array_0, output_array_1
d_many_agent = list_many_agent(func=d_many, in_streams=[d_in_0, d_in_1],
out_streams=[d_out_0, d_out_1],
name='d_many_agent')
d_array_0_0 = np.array([[1.0, 2.0]])
d_array_1_0 = np.array([[0.0, 10.0]])
d_in_0.extend(d_array_0_0)
d_in_1.extend(d_array_1_0)
assert np.array_equal(d_out_0.recent[:d_out_0.stop],
np.array([[1.0, 12.0]]))
assert np.array_equal(d_out_1.recent[:d_out_1.stop],
np.array([13.0]))
d_array_0_1 = np.array([[4.0, 8.0]])
d_array_1_1 = np.array([[2.0, 4.0]])
d_in_0.extend(d_array_0_1)
d_in_1.extend(d_array_1_1)
assert np.array_equal(d_out_0.recent[:d_out_0.stop],
np.array([[1.0, 12.0], [6.0, 12.0]]))
assert np.array_equal(d_out_1.recent[:d_out_1.stop],
np.array([13.0, 18.0]))
d_array_0_2 = np.array([[20.0, 30.0], [40.0, 50.0]])
d_array_1_2 = np.array([[-10.0, -20.0]])
d_in_0.extend(d_array_0_2)
d_in_1.extend(d_array_1_2)
assert np.array_equal(d_out_0.recent[:d_out_0.stop],
np.array([[1.0, 12.0], [6.0, 12.0], [10.0, 10.0]]))
assert np.array_equal(d_out_1.recent[:d_out_1.stop],
np.array([13.0, 18.0, 20.0]))
# Test list many with StreamArray and tuple dimension and no data type
dim = (2,2)
e_in_0 = StreamArray(name='e_in_0', dimension=dim)
e_in_1 = StreamArray(name='e_in_1', dimension=dim)
e_out_0 = StreamArray(name='e_out_0', dimension=dim)
e_out_1 = StreamArray(name='e_out_1')
def e_many(list_of_arrays):
input_array_0, input_array_1 = list_of_arrays
output_array_0 = input_array_0 + input_array_1
output_array_1 = \
np.array([np.sum(input_array_0[i]+ input_array_1[i])
for i in range(len(input_array_0))])
return output_array_0, output_array_1
e_many_agent = list_many_agent(func=e_many, in_streams=[e_in_0, e_in_1],
out_streams=[e_out_0, e_out_1],
name='e_many_agent')
e_array_0_0 = np.array([[[10.0, 20.0], [30.0, 40.0]]])
e_in_0.extend(e_array_0_0)
e_array_1_0 = np.array([[[1.0, 2.0], [3.0, 4.0]]])
e_in_1.extend(e_array_1_0)
assert np.array_equal(e_out_0.recent[:e_out_0.stop],
np.array([[[11.0, 22.0], [33.0, 44.0]]]))
assert np.array_equal(e_out_1.recent[:e_out_1.stop],
np.array([110.0]))
e_array_0_1 = np.array([[[11.0, 13.0], [17.0, 19.0]],
[[2.0, 4.0], [6.0, 8.0]]])
e_in_0.extend(e_array_0_1)
assert np.array_equal(e_out_0.recent[:e_out_0.stop],
np.array([[[11.0, 22.0], [33.0, 44.0]]]))
assert np.array_equal(e_out_1.recent[:e_out_1.stop],
np.array([110.0]))
e_array_1_1 = np.array([[[1.0, 2.0], [3.0, 4.0]],
[[5.0, 6.0], [7.0, 8.0]]])
e_in_1.extend(e_array_1_1)
assert np.array_equal(e_out_0.recent[:e_out_0.stop],
np.array([[[11.0, 22.0], [33.0, 44.0]],
[[12.0, 15.0], [20.0, 23.0]],
[[7.0, 10.0], [13.0, 16.0]]]))
assert np.array_equal(e_out_1.recent[:e_out_1.stop],
np.array([110.0, 70.0, 46.0]))
e_array_1_2 = np.array([[[11.0, 12.0], [13.0, 14.0]],
[[15.0, 16.0], [17.0, 18.0]]])
e_in_1.extend(e_array_1_2)
e_array_0_2 = np.array([[[-10.0, -11.0], [12.0, 16.0]],
[[-14.0, -15.0], [-16.0, -17.0]]])
e_in_0.extend(e_array_0_2)
assert np.array_equal(e_out_0.recent[:e_out_0.stop],
np.array([[[11.0, 22.0], [33.0, 44.0]],
[[12.0, 15.0], [20.0, 23.0]],
[[7.0, 10.0], [13.0, 16.0]],
[[1.0, 1.0], [25.0, 30.0]],
[[1.0, 1.0], [1.0, 1.0]]]))
assert np.array_equal(e_out_1.recent[:e_out_1.stop],
np.array([110.0, 70.0, 46.0, 57.0, 4.0]))
#------------------------------------------------------------------
#------------------------------------------------------------------
# Test args and kwargs
#------------------------------------------------------------------
#------------------------------------------------------------------
# Test map
def map_args(lst, multiplicand):
return [multiplicand*element for element in lst]
in_stream_map_args_stream = Stream('in_stream_map_args_stream')
out_stream_map_args_stream = Stream('out_stream_map_args_stream')
out_stream_map_kwargs_stream = Stream('out_stream_map_kwargs_stream')
map_args_agent = list_map_agent(func=map_args,
in_stream=in_stream_map_args_stream,
out_stream=out_stream_map_args_stream,
name='map_args_agent',
args=[2])
map_kwargs_agent = list_map_agent(func=map_args,
in_stream=in_stream_map_args_stream,
out_stream=out_stream_map_kwargs_stream,
name='map_args_agent',
kwargs={'multiplicand':2})
assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \
[]
assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \
[]
in_stream_map_args_stream.extend(range(5))
assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \
[0, 2, 4, 6, 8]
assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \
[0, 2, 4, 6, 8]
in_stream_map_args_stream.append(5)
assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \
[0, 2, 4, 6, 8, 10]
assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \
[0, 2, 4, 6, 8, 10]
# Test list map on StreamArray (dimension is 0).
a_stream_array_args = StreamArray(name='a_stream_array_args')
b_stream_array_args = StreamArray(name='b_stream_array_args')
c_stream_array_args_kwargs = StreamArray(name='c_stream_array_args_kwargs')
def f_np_args(input_array_args, addend):
return input_array_args+addend
def f_np_args_kwargs(input_array_args_kwargs, multiplicand, addend):
return input_array_args_kwargs*multiplicand + addend
a_np_agent_args = list_map_agent(
func=f_np_args, in_stream=a_stream_array_args,
out_stream=b_stream_array_args, name='a_np_agent_args',
args=[1])
a_np_agent_args_kwargs = list_map_agent(
func=f_np_args_kwargs, in_stream=a_stream_array_args,
out_stream=c_stream_array_args_kwargs, name='a_np_agent_args_kwargs',
args=[2], kwargs={'addend':10})
assert np.array_equal(
b_stream_array_args.recent[:b_stream_array_args.stop],
np.array([]))
assert np.array_equal(
c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop],
np.array([]))
a_stream_array_args.extend(np.arange(5.0))
assert np.array_equal(b_stream_array_args.recent[:b_stream_array_args.stop],
np.arange(5.0)+1)
assert np.array_equal(c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop],
np.arange(5.0)*2 + 10)
a_stream_array_args.extend(np.arange(5.0, 10.0, 1.0))
assert np.array_equal(b_stream_array_args.recent[:b_stream_array_args.stop],
np.arange(10.0)+1)
assert np.array_equal(c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop],
np.arange(10.0)*2 + 10)
return
if __name__ == '__main__':
test_list_agents()