# Source code for IoTPy.code.agents.split

"""
This module consists of functions that split a single input stream
into multiple output streams.


Functions in the module:
   1. element_split_agent
   2. split_stream
   3. separate_agent
   4. separate
   5. unzip_agent
   6. unzip_stream
   7. timed_unzip

Split functions:
   1. split_stream: applies a function to each element of the input
      stream. The function returns a list, and the i-th element of
      that list is placed on the i-th output stream.
   2. separate: separate is the inverse of mix (see merge.py). The
      elements of the input stream are pairs (i, v) and the value v is
      placed on the i-th output stream.
   3. unzip_stream: unzip_stream is the inverse of zip_stream (see
      merge.py). The elements of the input stream are lists and the
      i-th element of the list is placed on the i-th output stream.

Agents:
   1. element_split_agent: agent used by split_stream
   2. separate_agent: agent used by separate
   3. unzip_agent: not used by any function in this module. It is
      retained only for backward compatibility.
   
"""
import sys
import os
# sys.path.append(os.path.abspath("../"))

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values

#-----------------------------------------------------------------------
# SPLIT: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def element_split_agent(func, in_stream, out_streams, state=None,
                        call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent that applies func to each element of in_stream and
    distributes the results across out_streams.

    Parameters
    ----------
        func: function
           Called once per input element. Without state it is called
           as func(element, *args, **kwargs) and must return a
           sequence with one entry per output stream. With state it
           is called as func(element, state, *args, **kwargs) and
           must return a pair (sequence, next_state).
        in_stream: Stream
           The single input stream of the agent.
        out_streams: list of Stream
           The list of output streams of the agent.
        state: object
           The initial state of the agent. None means func is
           called without a state argument.
        call_streams: list of Stream
           The list of call_streams. A new value in any stream
           in this list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.
        *args, **kwargs:
           Extra positional and keyword arguments forwarded to func.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    check_split_agent_arguments(func, in_stream, out_streams, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)
    num_out_streams = len(out_streams)
    num_in_streams = 1

    # The transition function for this agent.
    def transition(in_lists, state):
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]

        # If there is no new input then return one empty list per
        # output stream and leave the state and the starting point of
        # the input stream unchanged.
        # len() is used rather than truthiness: if the slice is a
        # NumPy array (presumably the case for a StreamArray input --
        # TODO confirm), `not input_list` raises for arrays with more
        # than one element and is True for a single zero element.
        if len(input_list) == 0:
            # A distinct list per output stream, so that nothing
            # downstream can alias one output with another.
            return ([[] for _ in range(num_out_streams)],
                    state, [in_list.start])

        if state is None:
            output_snapshots = [
                func(element, *args, **kwargs) for element in input_list]
        else:
            output_snapshots = []
            for element in input_list:
                # The state is threaded through successive calls.
                snapshot, state = func(element, state, *args, **kwargs)
                output_snapshots.append(snapshot)

        check_func_output_for_multiple_streams(
            func, name, num_out_streams, output_snapshots)

        # Transpose: one snapshot per input element becomes one list
        # per output stream.
        add_to_output_streams = [
            list(snapshot) for snapshot in zip(*output_snapshots)]
        return (add_to_output_streams, state,
                [in_list.start + len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], out_streams, transition, state,
                 call_streams, name)
def split_stream(function, in_stream, num_out_streams, state=None,
                 *args, **kwargs):
    """
    split_stream returns out_streams, a list of num_out_streams streams.
    The function, with the specified state, args and kwargs, is
    applied to the elements of the input stream. The return value of
    the function must be a list of length num_out_streams. The i-th
    value of the returned list is placed in the i-th output stream.

    Parameters
    ----------
        function: function
           The function applied to each element of in_stream.
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.
        state: object
           function operates on a state, args, and kwargs
        *args, **kwargs:
           Extra arguments forwarded to function. The keywords
           'call_streams' and 'name', if given, are passed to
           element_split_agent as its parameters of the same names
           rather than to function.

    Returns
    -------
        out_streams: List of Stream
           The output streams generated by split_stream

    Uses
    -------
        * element_split_agent

    """
    out_streams = [Stream() for _ in range(num_out_streams)]
    # element_split_agent takes call_streams and name positionally
    # between state and *args. They are supplied explicitly here so
    # that the caller's extra positional arguments reach function
    # instead of being bound to call_streams and name.
    call_streams = kwargs.pop('call_streams', None)
    name = kwargs.pop('name', None)
    element_split_agent(
        function, in_stream, out_streams, state, call_streams, name,
        *args, **kwargs)
    return out_streams
#-----------------------------------------------------------------------
# SEPARATE: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def separate_agent(in_stream, out_streams, name=None):
    """
    Create an agent that routes each pair (i, v) arriving on in_stream
    to the i-th stream of out_streams.

    Parameters
    ----------
        in_stream: Stream
           The single input stream of the agent. Its elements are
           pairs (i, v).
        out_streams: list of Stream
           The list of output streams of the agent.
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    def f(x):
        # x is a pair (index, value): the value goes to the indexed
        # output and every other output gets the _no_value marker.
        index, value = x
        row = [_no_value for _ in range(len(out_streams))]
        row[index] = value
        return row

    agent = element_split_agent(
        func=f, in_stream=in_stream, out_streams=out_streams, name=name)
    return agent
def separate(in_stream, num_out_streams):
    """
    separate returns out_streams, a list of num_out_streams streams.
    separate is the inverse of mix (see merge.py). The elements of the
    input stream are pairs (i, v) and the value v is placed on the
    i-th output stream.

    Parameters
    ----------
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.

    Returns
    -------
        out_streams: List of Stream
           The newly created output streams.

    Uses
    -------
        * separate_agent

    """
    out_streams = []
    for _ in range(num_out_streams):
        out_streams.append(Stream())
    # The agent is created for its effect on out_streams; the agent
    # object itself is not returned.
    separate_agent(in_stream, out_streams)
    return out_streams
#-----------------------------------------------------------------------
# UNZIP: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
def unzip_agent(in_stream, out_streams, name=None):
    """
    Create an agent that places the i-th entry of each element of
    in_stream on the i-th stream of out_streams. Elements shorter
    than len(out_streams) are padded with None; longer elements are
    truncated.

    Parameters
    ----------
        in_stream: Stream
           The single input stream of the agent. Its elements are
           sequences (for example lists or tuples).
        out_streams: list of Stream
           The list of output streams of the agent.
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.

    Note
    ----
        Not used by any function in this module. Used by external
        modules.

    """
    def f(lst):
        width = len(out_streams)
        # Build a new list instead of extending lst in place: the
        # element belongs to the input stream, and in-place extension
        # would also fail for tuples.
        fitted = list(lst[:width])
        fitted.extend([None] * (width - len(fitted)))
        return fitted

    # Return the agent, as documented, and pass the name through.
    return element_split_agent(
        func=f, in_stream=in_stream, out_streams=out_streams, name=name)
def unzip_stream(input_stream, num_out_streams):
    """
    unzip_stream returns out_streams, a list of num_out_streams streams.
    unzip_stream is the inverse of zip_stream (see merge.py). The
    elements of the input stream are lists of length num_out_streams
    and the i-th element of the list is placed on the i-th output
    stream. Shorter elements are padded with None and longer elements
    are truncated.

    Parameters
    ----------
        input_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.

    Returns
    -------
        out_streams: List of Stream
           The output streams generated by unzip_stream

    Uses
    -------
        * element_split_agent

    """
    def f(lst):
        width = len(out_streams)
        # Build a new list instead of extending lst in place: the
        # element belongs to the input stream, and in-place extension
        # would also fail for tuples.
        fitted = list(lst[:width])
        fitted.extend([None] * (width - len(fitted)))
        return fitted

    out_streams = [Stream() for _ in range(num_out_streams)]
    element_split_agent(f, input_stream, out_streams)
    return out_streams
def timed_unzip(in_stream, num_out_streams):
    """
    timed_unzip returns out_streams which is a list of num_out_streams
    streams. timed_unzip is the inverse of timed_zip (see merge.py).
    The elements of the input stream are pairs (t, v) where v is a
    list of length num_out_streams. The pair (t, v[i]) is placed on
    the i-th output stream if and only if v[i] is not None. A list v
    shorter than num_out_streams is treated as if padded with None;
    a longer one is truncated.

    Parameters
    ----------
        in_stream: Stream
           The stream that will be split
        num_out_streams: int
           The number of output streams.

    Returns
    -------
        out_streams: List of Stream
           The output streams generated by timed_unzip

    Uses
    -------
        * split_stream

    """
    def f_time_unzip(timed_element):
        timestamp, messages = timed_element
        # Work on a copy: extending messages in place would modify the
        # element held by the input stream and fails for tuples.
        fitted = list(messages[:num_out_streams])
        fitted.extend([None] * (num_out_streams - len(fitted)))
        # None means "no message for this stream at this timestamp".
        return [
            _no_value if message is None else (timestamp, message)
            for message in fitted]

    out_streams = split_stream(f_time_unzip, in_stream, num_out_streams)
    return out_streams
#------------------------------------------------------------------------------------------------ # SPLIT TESTS #------------------------------------------------------------------------------------------------
def test_element_agents():
    """
    Exercise the split, unzip, separate and timed_unzip functions of
    this module and assert on the contents of the output streams.
    """
    # These streams are created but not used by the assertions below.
    s, u, v, w, y, z = [
        Stream(label) for label in ('s', 'u', 'v', 'w', 'y', 'z')]

    def visible(stream):
        # The part of the stream's recent buffer filled so far.
        return stream.recent[:stream.stop]

    # ------------------------------------
    # Test split
    # func operates on a single element of the single input stream and
    # returns a list of elements, one for each output stream.
    def h(element):
        return [element+1, element*2]

    def h_args(element, addend, multiplier):
        return [element+addend, element*multiplier]

    in_stream_split = Stream('in_stream_split')
    r = Stream('r')
    t = Stream('t')
    e = element_split_agent(
        func=h, in_stream=in_stream_split, out_streams=[r, t], name='e')
    r_split, t_split = split_stream(
        function=h, in_stream=in_stream_split, num_out_streams=2)
    r_args, t_args = split_stream(
        h_args, in_stream_split, 2, addend=1, multiplier=2)

    # Nothing has been put on the input stream yet.
    assert recent_values(r) == []
    assert recent_values(t) == []
    assert recent_values(r_split) == recent_values(r)
    assert recent_values(t_split) == recent_values(t)
    assert recent_values(r_args) == recent_values(r)
    assert recent_values(t_args) == recent_values(t)

    in_stream_split.extend(range(5))
    assert recent_values(r) == [1, 2, 3, 4, 5]
    assert recent_values(t) == [0, 2, 4, 6, 8]
    assert recent_values(r_split) == recent_values(r)
    assert recent_values(t_split) == recent_values(t)
    assert recent_values(r_args) == recent_values(r)
    assert recent_values(t_args) == recent_values(t)

    in_stream_split.append(10)
    assert recent_values(r) == [1, 2, 3, 4, 5, 11]
    assert recent_values(t) == [0, 2, 4, 6, 8, 20]

    in_stream_split.extend([20, 100])
    assert recent_values(r) == [1, 2, 3, 4, 5, 11, 21, 101]
    assert recent_values(t) == [0, 2, 4, 6, 8, 20, 40, 200]
    assert recent_values(r_split) == recent_values(r)
    assert recent_values(t_split) == recent_values(t)
    assert recent_values(r_args) == recent_values(r)
    assert recent_values(t_args) == recent_values(t)

    # ------------------------------------
    # Test split with kwargs
    def f_list(element, list_of_functions):
        return [f(element) for f in list_of_functions]

    def f_0(element):
        return element*2

    def f_1(element):
        return element+10

    x = Stream('x')
    rr = Stream('rr')
    tt = Stream('tt')
    ee = element_split_agent(
        func=f_list, in_stream=x, out_streams=[rr, tt], name='ee',
        list_of_functions=[f_0, f_1])
    x.extend(range(5))
    assert recent_values(rr) == [0, 2, 4, 6, 8]
    assert recent_values(tt) == [10, 11, 12, 13, 14]

    # ------------------------------------
    # Test split with state
    # func operates on an element of the single input stream and state.
    # func returns a list with one element for each output stream.
    def h_state(element, state):
        return ([element+state, element*state], state+1)

    r_state = Stream(name='r_state')
    t_state = Stream(name='t_state')
    in_stream_split_state = Stream('in_stream_split_state')
    e_state = element_split_agent(
        func=h_state, in_stream=in_stream_split_state,
        out_streams=[r_state, t_state], name='e', state=0)

    assert visible(r_state) == []
    assert visible(t_state) == []

    in_stream_split_state.extend(range(5))
    assert visible(r_state) == [0, 2, 4, 6, 8]
    assert visible(t_state) == [0, 1, 4, 9, 16]

    in_stream_split_state.append(20)
    assert visible(r_state) == [0, 2, 4, 6, 8, 25]
    assert visible(t_state) == [0, 1, 4, 9, 16, 100]

    in_stream_split_state.extend([44, 93])
    assert visible(r_state) == [0, 2, 4, 6, 8, 25, 50, 100]
    assert visible(t_state) == [0, 1, 4, 9, 16, 100, 264, 651]

    # ------------------------------------
    # Test split with state and args
    def hh_state(element, state, increment):
        return ([element+state, element*state], state+increment)

    rr_state = Stream(name='rr_state')
    tt_state = Stream(name='tt_state')
    in_stream_split_state_funcargs = Stream('in_stream_split_state_funcargs')
    ee_state_agent = element_split_agent(
        func=hh_state,
        in_stream=in_stream_split_state_funcargs,
        out_streams=[rr_state, tt_state],
        name='ee_state_agent', state=0, increment=10)

    assert rr_state.stop == 0
    assert tt_state.stop == 0

    in_stream_split_state_funcargs.extend(range(5))
    assert visible(rr_state) == [0, 11, 22, 33, 44]
    assert visible(tt_state) == [0, 10, 40, 90, 160]

    #-------------------------------------------------------------------
    # UNZIP AGENT TESTS
    #-------------------------------------------------------------------
    s_unzip = Stream('s_unzip')
    u_unzip = Stream('u_unzip')
    x_unzip = Stream('x_unzip')

    # ------------------------------------
    # Test unzip
    unzip_agent(in_stream=s_unzip, out_streams=[x_unzip, u_unzip])
    d_unzip_fn = unzip_stream(s_unzip, 2)

    s_unzip.extend([(1,10), (2,15), (3,18)])
    assert x_unzip.stop == 3
    assert u_unzip.stop == 3
    assert x_unzip.recent[:3] == [1, 2, 3]
    assert u_unzip.recent[:3] == [10, 15, 18]
    assert d_unzip_fn[0].recent[:3] == x_unzip.recent[:3]
    assert d_unzip_fn[1].recent[:3] == u_unzip.recent[:3]

    s_unzip.extend([(37,96)])
    assert x_unzip.stop == 4
    assert u_unzip.stop == 4
    assert x_unzip.recent[:4] == [1, 2, 3, 37]
    assert u_unzip.recent[:4] == [10, 15, 18, 96]
    assert d_unzip_fn[0].recent[:4] == x_unzip.recent[:4]
    assert d_unzip_fn[1].recent[:4] == u_unzip.recent[:4]

    #-------------------------------------------------------------------
    # SEPARATE AGENT TESTS
    #-------------------------------------------------------------------
    s_separate = Stream('s separate')
    u_separate = Stream('u separate')
    x_separate = Stream('x separate')

    d_separate = separate_agent(
        in_stream=s_separate, out_streams=[x_separate,u_separate],
        name='d separate')
    x_sep_func, u_sep_func = separate(s_separate, 2)

    s_separate.extend([(0,10), (1,15), (0,20)])
    assert x_separate.stop == 2
    assert u_separate.stop == 1
    assert x_separate.recent[:2] == [10, 20]
    assert u_separate.recent[:1] == [15]
    assert x_sep_func.recent == x_separate.recent
    assert u_sep_func.recent == u_separate.recent

    s_separate.extend([(1,96)])
    assert x_separate.stop == 2
    assert u_separate.stop == 2
    assert x_separate.recent[:2] == [10, 20]
    assert u_separate.recent[:2] == [15, 96]
    assert x_sep_func.recent == x_separate.recent
    assert u_sep_func.recent == u_separate.recent

    #-------------------------------------------------------------------
    # timed_unzip tests
    #-------------------------------------------------------------------
    t_unzip = Stream()
    t_unzip.extend(
        [(1, ["A", None]), (5, ["B", "a"]), (7, [None, "b"]),
         (9, ["C", "c"]), (10, [None, "d"])])
    t_unzip_0, t_unzip_1 = timed_unzip(t_unzip, 2)
    assert recent_values(t_unzip_0) == [(1, 'A'), (5, 'B'), (9, 'C')]
    assert recent_values(t_unzip_1) == [(5, 'a'), (7, 'b'), (9, 'c'), (10, 'd')]
# Run the module's self-test when the file is executed as a script.
if __name__ == '__main__':
    test_element_agents()