Source code for IoTPy.code.agents.list_agent

""" The list agent is one of a collection of agents, each of which has
a different type of transition function, func.

The list-agent func operates on a list input and returns a
list. func may also operate on a state which can be of any type.

This file has the following agents:
 list_map_agent: Has a single input stream and a single output stream.
 list_sink_agent: Has a single input stream and no outputs.
 list_merge_agent: Has a list of input streams and a single output
 stream.
 list_split_agent: Has a single input stream and a list of output
 streams.
 list_many_agent: Has a list of input streams and a list of output
 streams.

A single input stream is in_stream and a single output stream is
out_stream. A list of input streams is in_streams and a list of output
streams is out_streams. In general, plurals refer to lists and
singular to single elements.

"""
import sys
import os
# scriptpath = "../"
# sys.path.append(os.path.abspath(scriptpath))

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
import types
import inspect
import numpy as np

#-----------------------------------------------------------------------
# MAP: SINGLE INPUT STREAM, SINGLE OUTPUT STREAM
#-----------------------------------------------------------------------
[docs]def list_map_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None, args=[], kwargs={}): """ This agent maps the function func from its single input stream to its single output stream. Parameters ---------- func: function function from a list (a slice of the in_stream) to a list (a slice of the out_stream). in_stream: Stream The single input stream of this agent out_stream: Stream The single output streams of the agent state: object The state of the agent call_streams: list of Stream The list of call_streams. A new value in any stream in this list causes a state transition of this agent. name: str Name of the agent created by this function. Returns ------- Agent. The agent created by this function. """ # Check the arguments and output error messages if argument types # and numbers are incorrect. check_map_agent_arguments(func, in_stream, out_stream, call_streams, name) check_num_args_in_func(state, name, func, args, kwargs) # The transition function for this agent. def transition(in_lists, state): num_in_streams = 1 # Check the types of the input lists for the transition. check_in_lists_type(name, in_lists, num_in_streams) # This agent has only one input stream. Get its only in_list. in_list = in_lists[0] # input_list is the slice of the input stream that this agent # is working on. # in_list is an object of type InList, and it consists of an # arbitrarily long list (in_list.list), and pointers to where # this agent is starting to read the list (in_list.start), and # where the list ends (in_list.stop). input_list = in_list.list[in_list.start:in_list.stop] # If the new input data is empty then return an empty list for # the single output stream, and leave the state and the starting # point for the single input stream unchanged. This is the # null transition. Normally, the null transition occurs when # the agent is invoked by a call stream. if len(input_list) == 0: return ([[]], state, [in_list.start]) # Compute the output generated by this transition. if state is None: output_list = func(input_list, *args, **kwargs) else: output_list, state = func(input_list, state, *args, **kwargs) # Return: (1) the list of outputs, one per output stream, # (2) the new state and # (3) the new starting pointer into this stream for # this agent. Since this agent has read the entire # input_list, move its starting pointer forward by # the length of the input list. return ([output_list], state, [in_list.start+len(input_list)]) # Finished transition # Create agent with parameters: # 1. list of input streams. map has a single input # 2. list of output streams. map has a single output. # 3. transition function # 4. new state # 5. list of calling streams # 6. Agent name return Agent([in_stream], [out_stream], transition, state, call_streams, name)
#----------------------------------------------------------------------- # SINK: SINGLE INPUT STREAM, NO OUTPUT #-----------------------------------------------------------------------
[docs]def list_sink_agent(func, in_stream, state=None, call_streams=None, name=None, args=[], kwargs={}): """ This agent applies func to its single input stream. It has no output streams. Parameters ---------- func: function function from a list (a slice of the in_stream) to None (no output). in_stream: Stream The single input stream of this agent state: object The state of the agent call_streams: list of Stream The list of call_streams. A new value in any stream in this list causes a state transition of this agent. name: Str Name of the agent created by this function. Returns ------- Agent. The agent created by this function. """ # Check the arguments and output error messages if argument types # and numbers are incorrect. check_sink_agent_arguments(func, in_stream, call_streams, name) check_num_args_in_func(state, name, func, args, kwargs) # The transition function for this agent. def transition(in_lists, state): num_in_streams = 1 # Check the types of the input lists for the transition. check_in_lists_type(name, in_lists, num_in_streams) # This agent has only one input stream. Get its only in_list. in_list = in_lists[0] # input_list is the slice of the input stream that this agent # is working on. # in_list is an object of type InList, and it consists of an # arbitrarily long list (in_list.list), and pointers to where # this agent is starting to read the list (in_list.start), and # where the list ends (in_list.stop). input_list = in_list.list[in_list.start:in_list.stop] # If the new input data is empty then return an empty list for # the single output stream, and leave the state and the starting # point for the single input stream unchanged. Normally, the # null transition occurs when the agent is invoked by a call # stream. if not input_list or len(input_list) == 0: return ([[]], state, [in_list.start]) # Compute the output generated by this transition. if state is None: func(input_list, *args, **kwargs) else: state = func(input_list, state, *args, **kwargs) # Return: (1) the list of outputs, one per output stream. # sink agent has no output; its output list is []. # (2) the new state and # (3) the new starting pointer into this stream for # this agent. Since this agent has read the entire # input_list, move its starting pointer forward by # the length of the input list. return ([], state, [in_list.start+len(input_list)]) # Finished transition # Create agent with the following parameters: # 1. list of input streams. sink has a single input stream. # 2. list of output streams. sink has no output. # 3. transition function # 4. new state # 5. list of calling streams # 6. Agent name return Agent([in_stream], [], transition, state, call_streams, name)
#----------------------------------------------------------------------- # MERGE: LIST OF INPUT STREAMS, SINGLE OUTPUT STREAM #-----------------------------------------------------------------------
[docs]def list_merge_agent(func, in_streams, out_stream, state=None, call_streams=None, name=None, args=[], kwargs={}): """ Parameters ---------- func: function function from a list of lists (one list per input stream) to an output list in_streams: list of Stream The list of input streams of the agent out_stream: Stream The single output stream of the agent state: object The state of the agent call_streams: list of Stream The list of call_streams. A new value in any stream in this list causes a state transition of this agent. name: Str Name of the agent created by this function. Returns ------- Agent. The agent created by this function. """ # Check the arguments and output error messages if argument types # and numbers are incorrect. check_merge_agent_arguments(func, in_streams, out_stream, call_streams, name) check_num_args_in_func(state, name, func, args, kwargs) num_in_streams = len(in_streams) # The transition function for this agent. def transition(in_lists, state): # Check the types of the input lists for the transition. check_in_lists_type(name, in_lists, num_in_streams) # smallest_list_length is the smallest of all the inputs for # this agent. smallest_list_length = \ min(in_list.stop - in_list.start for in_list in in_lists) # Check for null transition. if smallest_list_length == 0: # output_list is [] # return ([output_list], state, # [in_list.start+smallest_list_length # for in_list in in_lists]) return ([[]], state, [in_list.start for in_list in in_lists]) # input_lists is the list of lists that this agent operates on # in this transition. input_lists = [in_list.list[in_list.start:in_list.start+smallest_list_length] for in_list in in_lists] # Compute the output generated by this transition. if state is None: output_list = func(input_lists, *args, **kwargs) else: ouput_list, state = func(input_lists, state, *args, **kwargs) # Return: (1) the list of outputs, one per output stream. The # merge_agent has a single output list. # (2) the new state and # (3) the new starting pointer into this stream for # this agent. Since this agent has read the entire # input_list, move its starting pointer forward by # the length of the input list. return ([output_list], state, [in_list.start+smallest_list_length for in_list in in_lists]) # Finished transition # Create agent with the following parameters: # 1. list of input streams. merge has a list, instreams, of input # streams. # 2. list of output streams. merge has a single output stream. # 3. transition function # 4. new state # 5. list of calling streams # 6. Agent name return Agent(in_streams, [out_stream], transition, state, call_streams, name)
#----------------------------------------------------------------------- # SPLIT: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS #-----------------------------------------------------------------------
[docs]def list_split_agent(func, in_stream, out_streams, state=None, call_streams=None, name=None, args=[], kwargs={}): """ Parameters ---------- func: function function from an input list to a list of lists (one per output stream). in_stream: Stream The single input stream of the agent out_streams: list of Stream The list of output streams of the agent state: object The state of the agent call_streams: list of Stream The list of call_streams. A new value in any stream in this list causes a state transition of this agent. name: Str Name of the agent created by this function. Returns ------- Agent. The agent created by this function. """ # Check the types of the input lists for the transition. check_split_agent_arguments(func, in_stream, out_streams, call_streams, name) # Check the number of arguments in func, the transition function. check_num_args_in_func(state, name, func, args, kwargs) num_out_streams = len(out_streams) num_in_streams = 1 # The transition function for this agent. def transition(in_lists, state): # Check the types of the input lists for the transition. check_in_lists_type(name, in_lists, num_in_streams) # This agent has only one input stream. Get its only in_list. in_list = in_lists[0] # input_list is the slice of the input stream that this agent # is working on. # in_list is an object of type InList, and it consists of an # arbitrarily long list (in_list.list), and pointers to where # this agent is starting to read the list (in_list.start), and # where the list ends (in_list.stop). input_list = in_list.list[in_list.start:in_list.stop] # Check for the null transition. # If the new input data is empty then return an empty list for # the single output stream, and leave the state and the starting # point for the single input stream unchanged. if len(input_list) == 0: # output_list is [] for each output stream. # So, output_lists = [[]]*num_out_streams # return (output_lists, state, # [in_list.start+len(input_list)] return ([[]]*num_out_streams, state, [in_list.start]) # Compute the output generated by this transition. if state is None: output_lists = func(input_list, *args, **kwargs) else: output_lists, state = func(input_list, state, *args, **kwargs) assert len(output_lists) == num_out_streams, \ 'Error in list-split transition function of agent {0}.'\ ' The number, {1}, of output lists does not equal the number, {2},'\ ' of output streams.'.format(name, len(output_lists), num_out_streams) # Return: (1) output_lists, the list of outputs, one per # output stream. # (2) the new state and # (3) the new starting pointer into this stream for # this agent. Since this agent has read the entire # input_list, move its starting pointer forward by # the length of the input list. return (output_lists, state, [in_list.start+len(input_list)]) # Finished transition # Create agent with the following parameters: # 1. list of input streams. split has a single input stream. # 2. list of output streams. # 3. transition function # 4. new state # 5. list of calling streams # 6. Agent name return Agent([in_stream], out_streams, transition, state, call_streams, name)
#----------------------------------------------------------------------- # MANY: LIST OF INPUT STREAMS, LIST OF OUTPUT STREAMS #-----------------------------------------------------------------------
[docs]def list_many_agent(func, in_streams, out_streams, state=None, call_streams=None, name=None, args=[], kwargs={}): """ Parameters ---------- func: function function from an input list to an output list in_streams: list of Stream The input streams of the agent out_streams: list of Stream The output streams of the agent state: object The state of the agent call_streams: list of Stream The list of call_stream. A new value in any stream in this list causes a state transition of this agent. name: Str Name of the agent created by this function. Returns ------- Agent. The agent created by this function. """ # Check the types of the input lists for the transition. check_many_agent_arguments(func, in_streams, out_streams, call_streams, name) check_num_args_in_func(state, name, func, args, kwargs) num_out_streams = len(out_streams) num_in_streams = len(in_streams) # The transition function for this agent. def transition(in_lists, state): # Check the types of the input lists for the transition. check_in_lists_type(name, in_lists, num_in_streams) # smallest_list_length is the smallest of all the inputs for # this agent. smallest_list_length = \ min(in_list.stop - in_list.start for in_list in in_lists) # Check for the null transition. if smallest_list_length == 0: # output_list is [] for each output stream. # So, output_lists = [[]]*num_out_streams # return (output_lists, state, # [in_list.start+smallest_list_length # for in_list in in_lists]) return ([[]]*num_out_streams, state, [in_list.start for in_list in in_lists]) # input_lists is the list of lists that this agent operates on # in this transition. input_lists = [in_list.list[in_list.start:in_list.start+smallest_list_length] for in_list in in_lists] # Compute the output generated by this transition. if state is None: output_lists = func(input_lists, *args, **kwargs) else: ouput_lists, state = func(input_lists, state, *args, **kwargs) # Return: (1) output_lists, the list of outputs, one per # output stream. # (2) the new state and # (3) the new starting pointer into this stream for # this agent. Since this agent has read # smallest_list_length number of elements, move # its starting pointer forward by # smallest_list_length. return (output_lists, state, [in_list.start+smallest_list_length for in_list in in_lists]) # Finished transition # Create agent with the following parameters: # 1. list of input streams. # 2. list of output streams. # 3. transition function # 4. new state # 5. list of calling streams # 6. Agent name return Agent(in_streams, out_streams, transition, state, call_streams, name)
#------------------------------------------------------------------------------------------------ #------------------------------------------------------------------------------------------------ # LIST AGENT TESTS #------------------------------------------------------------------------------------------------ #------------------------------------------------------------------------------------------------
[docs]def test_list_agents(): r = Stream('r') s = Stream('s') t = Stream('t') u = Stream('u') v = Stream('v') w = Stream('w') x = Stream('x') y = Stream('y') z = Stream('z') # Check simple map def simple(lst): return [2*v for v in lst] a = list_map_agent(func=simple, in_stream=x, out_stream=y, name='a') x.extend(range(5)) assert x.stop == 5 assert x.recent[:5] == range(5) assert y.stop == 5 assert y.recent[:5] == [0, 2, 4, 6, 8] # Check map with state # Function that operates on an element and state and returns an # element and state. def f(input_list, state): output_list = [[]]*len(input_list) for i in range(len(input_list)): output_list[i] = input_list[i]+state state += 2 return output_list, state b = list_map_agent(func=f, in_stream=x, out_stream=z, state=0, name='b') assert z.stop == 5 assert z.recent[:5] == [0, 3, 6, 9, 12] # Check map with call streams c = list_map_agent(func=f, in_stream=x, out_stream=v, state=10, call_streams=[w], name='c') assert v.stop == 0 w.append(0) assert v.stop == 5 assert v.recent[:5] == [10, 13, 16, 19, 22] # Check sink def p(input_list): for v in input_list: print v #sink_agent = list_sink_agent(func=p, in_stream=x, name='sink_agent') # Check sink with state def add(input_list, state): return sum(input_list)+state sink_with_state_agent = list_sink_agent( func=add, in_stream=x, state=0, name='sink_with_state_agent') assert sink_with_state_agent.state == 10 # Check merge # Function that operates on a list of lists def g(list_of_lists): return [sum(snapshot) for snapshot in zip(*list_of_lists)] d = list_merge_agent(func=g, in_streams=[x,u], out_stream=s, name='d') assert s.stop == 0 u.extend([10, 15, 18]) assert s.stop == 3 assert s.recent[:3] == [10, 16, 20] u.append(37) assert s.stop == 4 assert s.recent[:4] == [10, 16, 20, 40] u.extend([96, 95]) assert s.stop == 5 assert s.recent[:5] == [10, 16, 20, 40, 100] # Check split def h(input_list): return [[element+1 for element in input_list], [element*2 for element in input_list]] e = list_split_agent(func=h, in_stream=x, out_streams=[r, t], name='e') assert r.stop == 5 assert t.stop == 5 assert r.recent[:5] == [1, 2, 3, 4, 5] assert t.recent[:5] == [0, 2, 4, 6, 8] # Check split with state def h_state(input_list, state): length = len(input_list) output_list_0 = [[]]*length output_list_1 = [[]]*length for i in range(length): output_list_0[i] = input_list[i]+state output_list_1[i] = input_list[i]*state state += 1 return ([output_list_0, output_list_1], state) r_state = Stream(name='r_state') t_state = Stream(name='t_state') x_state = Stream(name='x_state') e_state = list_split_agent( func=h_state, in_stream=x_state, out_streams=[r_state, t_state], name='e_state', state=0) assert r_state.stop == 0 assert t_state.stop == 0 x_state.extend(range(5)) assert r_state.stop == 5 assert r_state.recent[:r_state.stop] == [0, 2, 4, 6, 8] assert t_state.stop == 5 assert t_state.recent[:5] == [0, 1, 4, 9, 16] x_state.extend(range(5,10,1)) assert r_state.stop == 10 assert r_state.recent[:r_state.stop] == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] assert t_state.stop == 10 assert t_state.recent[:t_state.stop] == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # Check many def f_many(list_of_lists): snapshots = zip(*list_of_lists) return [[max(snapshot) for snapshot in snapshots], [min(snapshot) for snapshot in snapshots]] u_stream = Stream(name='u_stream') v_stream = Stream(name='v_stream') w_stream = Stream(name='w_stream') x_stream = Stream(name='x_stream') many_agent = list_many_agent( func=f_many, in_streams=[u_stream, v_stream], out_streams=[w_stream, x_stream], name='many_agent') assert w_stream.stop == 0 assert x_stream.stop == 0 u_stream.extend([5, 9, 11, 8]) assert w_stream.stop == 0 assert x_stream.stop == 0 v_stream.extend([20, 1, -3, 17, 10]) assert w_stream.stop == 4 assert x_stream.stop == 4 assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17] assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8] v_stream.extend([20, -100]) assert w_stream.stop == 4 assert x_stream.stop == 4 assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17] assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8] u_stream.extend([14, 5, -200, 70]) assert w_stream.stop == 7 assert x_stream.stop == 7 assert w_stream.recent[:w_stream.stop] == [20, 9, 11, 17, 14, 20, -100] assert x_stream.recent[:x_stream.stop] == [5, 1, -3, 8, 10, 5, -200] #------------------------------------------------------------------ #------------------------------------------------------------------ # Test NumPy arrays: StreamArray #------------------------------------------------------------------ #------------------------------------------------------------------ # Test list map on StreamArray (dimension is 0). a_stream_array = StreamArray(name='a_stream_array') b_stream_array = StreamArray(name='b_stream_array') def f_np(input_array): return input_array+1 a_np_agent = list_map_agent(func=f_np, in_stream=a_stream_array, out_stream=b_stream_array, name='a_np_agent') assert b_stream_array.stop == 0 a_stream_array.extend(np.arange(5.0)) assert np.array_equal(b_stream_array.recent[:b_stream_array.stop], np.arange(5.0)+1) a_stream_array.extend(np.arange(5.0, 10.0, 1.0)) assert np.array_equal(b_stream_array.recent[:b_stream_array.stop], np.arange(10.0)+1) # Test list map with state on StreamArray (dimension is 0) c_stream_array = StreamArray(name='c_stream_array') d_stream_array = StreamArray(name='d_stream_array') def f_np_state(input_array, state): return np.cumsum(input_array)+state, np.sum(input_array) b_np_agent = list_map_agent(func=f_np_state, in_stream=c_stream_array, out_stream=d_stream_array, state = 0.0, name='b_np_agent') assert d_stream_array.stop == 0 c_stream_array.extend(np.arange(5.0)) assert np.array_equal( d_stream_array.recent[:d_stream_array.stop], np.cumsum(np.arange(5.0))) c_stream_array.extend(np.arange(5.0, 10.0, 1.0)) assert np.array_equal( d_stream_array.recent[:d_stream_array.stop], np.cumsum(np.arange(10.0))) # Test list map with positive integer dimension on StreamArray e_stream_array = StreamArray(name='e_stream_array', dimension=3) f_stream_array = StreamArray(name='f_stream_array', dimension=2) def f_np_dimension(input_array): output_array = np.zeros([len(input_array), 2]) output_array[:,0] = input_array[:,0]+input_array[:,1] output_array[:,1] = input_array[:,2] return output_array c_np_agent = list_map_agent(func=f_np_dimension, in_stream=e_stream_array, out_stream=f_stream_array, name='c_np_agent') e_stream_array.extend(np.array([[1.0, 2.0, 3.0]])) assert np.array_equal(f_stream_array.recent[:f_stream_array.stop], np.array([[3.0, 3.0]])) e_stream_array.extend(np.array([[4.0, 5.0, 6.0], [7.0, 8.0, 9.0]])) assert np.array_equal(f_stream_array.recent[:f_stream_array.stop], np.array([[3.0, 3.0], [9.0, 6.0], [15.0, 9.0]])) # Test list map with a dimension which is a tuple of integers. g_stream_array = StreamArray(name='g_stream_array', dimension=(2,2)) h_stream_array = StreamArray(name='h_stream_array', dimension=(2,2)) def f_np_tuple_dimension(input_array): return input_array*2 d_np_agent = list_map_agent(func=f_np_tuple_dimension, in_stream=g_stream_array, out_stream=h_stream_array, name='d_np_agent') a_array = np.array([[[1.0, 2.0],[3.0, 4.0]], [[5.0, 6.0],[7.0, 8.0]]]) g_stream_array.extend(a_array) assert np.array_equal(h_stream_array.recent[:h_stream_array.stop], a_array*2) b_array = np.array([[[9.0, 10.0], [11.0, 12.0]]]) g_stream_array.extend(b_array) assert np.array_equal(h_stream_array.recent[:h_stream_array.stop], np.vstack((a_array, b_array))*2) # Test list map with a datatype and dimension of 0. dt_0 = np.dtype([('time', int), ('value', (float, 3))]) dt_1 = np.dtype([('time', int), ('value', float)]) i_stream_array = StreamArray(name='i_stream_array', dtype=dt_0) j_stream_array = StreamArray(name='j_stream_array', dtype=dt_1) def f_datatype(input_array): output_array = np.zeros(len(input_array), dtype=dt_1) output_array['time'] = input_array['time'] output_array['value'] = np.sum(input_array['value'], axis=1) return output_array e_np_agent = list_map_agent(func=f_datatype, in_stream=i_stream_array, out_stream=j_stream_array, name='e_np_agent') c_array = np.array([(1, [2.0, 3.0, 4.0])], dtype=dt_0) assert j_stream_array.stop == 0 i_stream_array.extend(c_array) assert np.array_equal(j_stream_array.recent[:j_stream_array.stop], f_datatype(c_array)) d_array = np.array([(10, [6.0, 7.0, 8.0]), (20, [10.0, 11.0, 12.0])], dtype=dt_0) i_stream_array.extend(d_array) assert np.array_equal(j_stream_array.recent[:j_stream_array.stop], f_datatype(np.hstack((c_array, d_array)))) # Test list map with a datatype and positive integer dimension. k_stream_array = StreamArray(name='k_stream_array', dtype=dt_0, dimension=2) l_stream_array = StreamArray(name='l_stream_array', dtype=dt_1) def f_datatype_int_dimension(input_array): m = len(input_array) output_array = np.zeros(m, dtype=dt_1) for i in range(m): output_array[i]['time'] = np.max(input_array[i]['time']) output_array[i]['value'] = np.sum(input_array[i]['value']) return output_array f_np_agent = list_map_agent(func=f_datatype_int_dimension, in_stream=k_stream_array, out_stream=l_stream_array, name='f_np_agent') e_array = np.array([[(1, [2.0, 3.0, 4.0]), (2, [5.0, 6.0, 7.0])]], dtype=dt_0) assert l_stream_array.stop == 0 k_stream_array.extend(e_array) assert np.array_equal(l_stream_array.recent[:l_stream_array.stop], f_datatype_int_dimension(e_array)) f_array = np.array([[(3, [8.0, 9.0, 10.0]), (4, [11.0, 12.0, 13.0])], [(5, [-1.0, 0.0, 1.0]), (6, [-2.0, 2.0, -2.0])]], dtype=dt_0) k_stream_array.extend(f_array) assert np.array_equal(l_stream_array.recent[:l_stream_array.stop], f_datatype_int_dimension(np.vstack((e_array, f_array)))) # Test list map with a datatype and a dimension which is a tuple m_stream_array = StreamArray(name='m_stream_array', dtype=dt_0, dimension=(2,2)) n_stream_array = StreamArray(name='n_stream_array', dtype=dt_1) g_np_agent = list_map_agent(func=f_datatype_int_dimension, in_stream=m_stream_array, out_stream=n_stream_array, name='g_np_agent') assert n_stream_array.stop == 0 g_array = np.array([ # zeroth 2x2 array [[(1, [2.0, 3.0, 4.0]), (2, [5.0, 6.0, 7.0])], [(3, [8.0, 9.0, 10.0]), (4, [11.0, 12.0, 13.0])]], # first 2x2 array [[(5, [12.0, 13.0, 14.0]), (6, [15.0, 16.0, 17.0])], [(7, [18.0, 19.0, 20.0]), (8, [21.0, 22.0, 23.0])]] ], dtype=dt_0) m_stream_array.extend(g_array) assert np.array_equal(n_stream_array.recent[:n_stream_array.stop], f_datatype_int_dimension(g_array)) h_array = np.array([ [[(9, [0.0, 1.0, -1.0]), (10, [2.0, 2.0, -4.0])], [(11, [80.0, -71.0, -9.0]), (15, [0.0, 0.0, 0.0])]] ], dtype=dt_0) m_stream_array.extend(h_array) assert np.array_equal(n_stream_array.recent[:n_stream_array.stop], f_datatype_int_dimension(np.vstack((g_array, h_array)))) # Test list merge with StreamArray and no dimension and no data type a_in_0 = StreamArray(name='a_in_0') a_in_1 = StreamArray(name='a_in_1') a_out = StreamArray(name='out') def a_merge(list_of_lists): array_0, array_1 = list_of_lists return array_0 + array_1 a_s_agent = list_merge_agent(func=a_merge, in_streams=[a_in_0, a_in_1], out_stream=a_out, name='a_s_agent') assert a_out.stop == 0 a_in_0.extend(np.array([1.0, 2.0, 3.0])) assert a_out.stop == 0 a_in_0.extend(np.array([4.0, 5.0, 6.0])) assert a_out.stop == 0 a_in_1.extend(np.array([10.0, 20.0])) assert np.array_equal(a_out.recent[:a_out.stop], np.array([11.0, 22.0])) a_in_1.extend(np.array([30.0, 40.0])) assert np.array_equal(a_out.recent[:a_out.stop], np.array([11.0, 22.0, 33.0, 44.0])) # Test list merge with StreamArray and no dimension and data type a_in_dt_0 = StreamArray(name='a_in_dt_0', dtype=dt_0) a_in_dt_1 = StreamArray(name='a_in_dt_1', dtype=dt_0) a_out_dt = StreamArray(name='out', dtype=dt_0) def a_merge_dtype(list_of_arrays): input_array_0, input_array_1 = list_of_arrays output_array = np.zeros(len(input_array_0), dtype=dt_0) output_array['time'] = \ np.max((input_array_0['time'], input_array_1['time']), axis=0) output_array['value'] = input_array_0['value'] + input_array_1['value'] return output_array a_s_dt_agent = list_merge_agent(func=a_merge_dtype, in_streams=[a_in_dt_0, a_in_dt_1], out_stream=a_out_dt, name='a_s_dt_agent') a_in_dt_0.extend(np.array([(1, [1.0, 2.0, 3.0])], dtype=dt_0)) assert a_out_dt.stop == 0 a_in_dt_1.extend(np.array([(2, [10.0, 20.0, 30.0])], dtype=dt_0)) assert np.array_equal(a_out_dt.recent[:a_out_dt.stop], np.array([(2, [11.0, 22.0, 33.0])], dtype=dt_0)) a_in_dt_0.extend(np.array([(5, [21.0, 23.0, 32.0]), (9, [27.0, 29.0, 31.0])], dtype=dt_0)) assert np.array_equal(a_out_dt.recent[:a_out_dt.stop], np.array([(2, [11.0, 22.0, 33.0])], dtype=dt_0)) a_in_dt_1.extend(np.array([(6, [19.0, 17.0, 8.0]), (8, [13.0, 11.0, 9.0]), (10, [3.0, 1.0, 5.0])], dtype=dt_0)) assert np.array_equal(a_out_dt.recent[:a_out_dt.stop], np.array([(2, [11.0, 22.0, 33.0]), (6, [40.0, 40.0, 40.0]), (9, [40.0, 40.0, 40.0])], dtype=dt_0)) # Test list split with StreamArray and positive integer dimension and no data type dim = 2 b_in = StreamArray(name='b_in', dimension=dim) b_out_0 = StreamArray(name='b_out_0', dimension=dim) b_out_1 = StreamArray(name='b_out_1') def b_split(array_of_arrays): length = len(array_of_arrays) output_array_0 = np.zeros((length, dim,)) output_array_1 = np.zeros(length) for i in range(length): input_array = array_of_arrays[i] output_array_0[i] = np.array([[np.max(input_array), np.min(input_array)]]) output_array_1[i] = np.array([np.sum(input_array)]) return output_array_0, output_array_1 b_split_agent = list_split_agent(func=b_split, in_stream=b_in, out_streams=[b_out_0, b_out_1], name='b_split_agent') b_array_0 = np.array([[1.0, 9.0]]) b_in.extend(b_array_0) assert np.array_equal(b_out_0.recent[:b_out_0.stop], np.array([[9.0, 1.0]])) assert np.array_equal(b_out_1.recent[:b_out_1.stop], np.array([10.0])) b_array_1 = np.array([[98.0, 2.0]]) b_in.extend(b_array_1) assert np.array_equal(b_out_0.recent[:b_out_0.stop], np.array([[9.0, 1.0], [98.0, 2.0]])) assert np.array_equal(b_out_1.recent[:b_out_1.stop], np.array([10.0, 100.0])) b_array_3 = np.array([[10.0, 20.0], [3.0, 37.0], [55.0, 5.0]]) b_in.extend(b_array_3) assert np.array_equal(b_out_0.recent[:b_out_0.stop], np.array([[9.0, 1.0], [98.0, 2.0], [20.0, 10.0], [37.0, 3.0], [55.0, 5.0]])) assert np.array_equal(b_out_1.recent[:b_out_1.stop], np.array([10.0, 100.0, 30.0, 40.0, 60.0])) # Test list many with StreamArray and no dimension and no data type c_in_0 = StreamArray(name='c_in_0') c_in_1 = StreamArray(name='c_in_1') c_out_0 = StreamArray(name='c_out_0') c_out_1 = StreamArray(name='c_out_1') def c_many(list_of_arrays): length = len(list_of_arrays) input_array_0, input_array_1 = list_of_arrays output_array_0 = np.zeros(length) output_array_1 = np.zeros(length) output_array_0 = input_array_0 + input_array_1 output_array_1 = input_array_0 - input_array_1 return [output_array_0, output_array_1] c_many_agent = list_many_agent(func=c_many, in_streams=[c_in_0, c_in_1], out_streams=[c_out_0, c_out_1], name='c_many_agent') c_array_0_0 = np.arange(3.0)*3 c_array_1_0 = np.arange(3.0) c_in_0.extend(c_array_0_0) c_in_1.extend(c_array_1_0) assert np.array_equal(c_out_0.recent[:c_out_0.stop], np.array([0.0, 4.0, 8.0])) assert np.array_equal(c_out_1.recent[:c_out_1.stop], np.array([0.0, 2.0, 4.0])) c_array_0_1 = np.array([100.0]) c_array_1_1 = np.array([4.0, 5.0, 6.0]) c_in_0.extend(c_array_0_1) c_in_1.extend(c_array_1_1) assert np.array_equal(c_out_0.recent[:c_out_0.stop], np.array([0.0, 4.0, 8.0, 104.0])) assert np.array_equal(c_out_1.recent[:c_out_1.stop], np.array([0.0, 2.0, 4.0, 96.0])) ## # Test list many with StreamArray and no dimension and no data type ## z_in_0 = StreamArray(name='z_in_0') ## z_in_1 = StreamArray(name='z_in_1') ## z_out_0 = StreamArray(name='z_out_0') ## z_out_1 = StreamArray(name='z_out_1') ## def execute_list_of_np_func(v, list_of_np_func): ## length = len(list_of_arrays) ## input_array_0, input_array_1 = list_of_arrays ## output_array_0 = np.zeros(length) ## output_array_1 = np.zeros(length) ## output_array_0 = input_array_0 + input_array_1 ## output_array_1 = input_array_0 - input_array_1 ## return [output_array_0, output_array_1] # Test list many with StreamArray and positive integer dimension and no data type dim = 2 d_in_0 = StreamArray(name='d_in_0', dimension=dim) d_in_1 = StreamArray(name='d_in_1', dimension=dim) d_out_0 = StreamArray(name='d_out_0', dimension=dim) d_out_1 = StreamArray(name='d_out_1') def d_many(list_of_arrays): length = len(list_of_arrays) input_array_0, input_array_1 = list_of_arrays output_array_0 = input_array_0 + input_array_1 output_array_1 = np.array([np.sum(input_array_0+input_array_1)]) return output_array_0, output_array_1 d_many_agent = list_many_agent(func=d_many, in_streams=[d_in_0, d_in_1], out_streams=[d_out_0, d_out_1], name='d_many_agent') d_array_0_0 = np.array([[1.0, 2.0]]) d_array_1_0 = np.array([[0.0, 10.0]]) d_in_0.extend(d_array_0_0) d_in_1.extend(d_array_1_0) assert np.array_equal(d_out_0.recent[:d_out_0.stop], np.array([[1.0, 12.0]])) assert np.array_equal(d_out_1.recent[:d_out_1.stop], np.array([13.0])) d_array_0_1 = np.array([[4.0, 8.0]]) d_array_1_1 = np.array([[2.0, 4.0]]) d_in_0.extend(d_array_0_1) d_in_1.extend(d_array_1_1) assert np.array_equal(d_out_0.recent[:d_out_0.stop], np.array([[1.0, 12.0], [6.0, 12.0]])) assert np.array_equal(d_out_1.recent[:d_out_1.stop], np.array([13.0, 18.0])) d_array_0_2 = np.array([[20.0, 30.0], [40.0, 50.0]]) d_array_1_2 = np.array([[-10.0, -20.0]]) d_in_0.extend(d_array_0_2) d_in_1.extend(d_array_1_2) assert np.array_equal(d_out_0.recent[:d_out_0.stop], np.array([[1.0, 12.0], [6.0, 12.0], [10.0, 10.0]])) assert np.array_equal(d_out_1.recent[:d_out_1.stop], np.array([13.0, 18.0, 20.0])) # Test list many with StreamArray and tuple dimension and no data type dim = (2,2) e_in_0 = StreamArray(name='e_in_0', dimension=dim) e_in_1 = StreamArray(name='e_in_1', dimension=dim) e_out_0 = StreamArray(name='e_out_0', dimension=dim) e_out_1 = StreamArray(name='e_out_1') def e_many(list_of_arrays): input_array_0, input_array_1 = list_of_arrays output_array_0 = input_array_0 + input_array_1 output_array_1 = \ np.array([np.sum(input_array_0[i]+ input_array_1[i]) for i in range(len(input_array_0))]) return output_array_0, output_array_1 e_many_agent = list_many_agent(func=e_many, in_streams=[e_in_0, e_in_1], out_streams=[e_out_0, e_out_1], name='e_many_agent') e_array_0_0 = np.array([[[10.0, 20.0], [30.0, 40.0]]]) e_in_0.extend(e_array_0_0) e_array_1_0 = np.array([[[1.0, 2.0], [3.0, 4.0]]]) e_in_1.extend(e_array_1_0) assert np.array_equal(e_out_0.recent[:e_out_0.stop], np.array([[[11.0, 22.0], [33.0, 44.0]]])) assert np.array_equal(e_out_1.recent[:e_out_1.stop], np.array([110.0])) e_array_0_1 = np.array([[[11.0, 13.0], [17.0, 19.0]], [[2.0, 4.0], [6.0, 8.0]]]) e_in_0.extend(e_array_0_1) assert np.array_equal(e_out_0.recent[:e_out_0.stop], np.array([[[11.0, 22.0], [33.0, 44.0]]])) assert np.array_equal(e_out_1.recent[:e_out_1.stop], np.array([110.0])) e_array_1_1 = np.array([[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0], [7.0, 8.0]]]) e_in_1.extend(e_array_1_1) assert np.array_equal(e_out_0.recent[:e_out_0.stop], np.array([[[11.0, 22.0], [33.0, 44.0]], [[12.0, 15.0], [20.0, 23.0]], [[7.0, 10.0], [13.0, 16.0]]])) assert np.array_equal(e_out_1.recent[:e_out_1.stop], np.array([110.0, 70.0, 46.0])) e_array_1_2 = np.array([[[11.0, 12.0], [13.0, 14.0]], [[15.0, 16.0], [17.0, 18.0]]]) e_in_1.extend(e_array_1_2) e_array_0_2 = np.array([[[-10.0, -11.0], [12.0, 16.0]], [[-14.0, -15.0], [-16.0, -17.0]]]) e_in_0.extend(e_array_0_2) assert np.array_equal(e_out_0.recent[:e_out_0.stop], np.array([[[11.0, 22.0], [33.0, 44.0]], [[12.0, 15.0], [20.0, 23.0]], [[7.0, 10.0], [13.0, 16.0]], [[1.0, 1.0], [25.0, 30.0]], [[1.0, 1.0], [1.0, 1.0]]])) assert np.array_equal(e_out_1.recent[:e_out_1.stop], np.array([110.0, 70.0, 46.0, 57.0, 4.0])) #------------------------------------------------------------------ #------------------------------------------------------------------ # Test args and kwargs #------------------------------------------------------------------ #------------------------------------------------------------------ # Test map def map_args(lst, multiplicand): return [multiplicand*element for element in lst] in_stream_map_args_stream = Stream('in_stream_map_args_stream') out_stream_map_args_stream = Stream('out_stream_map_args_stream') out_stream_map_kwargs_stream = Stream('out_stream_map_kwargs_stream') map_args_agent = list_map_agent(func=map_args, in_stream=in_stream_map_args_stream, out_stream=out_stream_map_args_stream, name='map_args_agent', args=[2]) map_kwargs_agent = list_map_agent(func=map_args, in_stream=in_stream_map_args_stream, out_stream=out_stream_map_kwargs_stream, name='map_args_agent', kwargs={'multiplicand':2}) assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \ [] assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \ [] in_stream_map_args_stream.extend(range(5)) assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \ [0, 2, 4, 6, 8] assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \ [0, 2, 4, 6, 8] in_stream_map_args_stream.append(5) assert out_stream_map_args_stream.recent[:out_stream_map_args_stream.stop] == \ [0, 2, 4, 6, 8, 10] assert out_stream_map_kwargs_stream.recent[:out_stream_map_kwargs_stream.stop] == \ [0, 2, 4, 6, 8, 10] # Test list map on StreamArray (dimension is 0). a_stream_array_args = StreamArray(name='a_stream_array_args') b_stream_array_args = StreamArray(name='b_stream_array_args') c_stream_array_args_kwargs = StreamArray(name='c_stream_array_args_kwargs') def f_np_args(input_array_args, addend): return input_array_args+addend def f_np_args_kwargs(input_array_args_kwargs, multiplicand, addend): return input_array_args_kwargs*multiplicand + addend a_np_agent_args = list_map_agent( func=f_np_args, in_stream=a_stream_array_args, out_stream=b_stream_array_args, name='a_np_agent_args', args=[1]) a_np_agent_args_kwargs = list_map_agent( func=f_np_args_kwargs, in_stream=a_stream_array_args, out_stream=c_stream_array_args_kwargs, name='a_np_agent_args_kwargs', args=[2], kwargs={'addend':10}) assert np.array_equal( b_stream_array_args.recent[:b_stream_array_args.stop], np.array([])) assert np.array_equal( c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop], np.array([])) a_stream_array_args.extend(np.arange(5.0)) assert np.array_equal(b_stream_array_args.recent[:b_stream_array_args.stop], np.arange(5.0)+1) assert np.array_equal(c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop], np.arange(5.0)*2 + 10) a_stream_array_args.extend(np.arange(5.0, 10.0, 1.0)) assert np.array_equal(b_stream_array_args.recent[:b_stream_array_args.stop], np.arange(10.0)+1) assert np.array_equal(c_stream_array_args_kwargs.recent[:c_stream_array_args_kwargs.stop], np.arange(10.0)*2 + 10) return
if __name__ == '__main__': test_list_agents()