Source code for IoTPy.code.agents.merge

"""
This module consists of functions that merge multiple input streams
into a single output stream.

Functions in the module:
   1. element_merge_agent
   2. zip_stream
   3. zip_map
   4. asynch_merge
   5. mix
   6. blend_agent
   7. blend

Merge functions:
   1. zip_stream is similar to zip in Python except that it operates on
      streams rather than lists.
   2. zip_map is map_stream(zip_stream()), i.e., first zip then map the
      result.
   3. mix is an asynchronous merge of the input streams. The elements of the
      output stream identify the input streams that generated the
      elements.
   4. blend is a merge followed by a map.

Agents:
   1. element_merge_agent is the agent used by zip_stream and zip_map.
   2. asynch_merge creates the agent used by mix.
   3. blend_agent is the agent used by blend.
   
"""

import sys
import os
# sys.path.append(os.path.abspath("../"))

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values


def element_merge_agent(func, in_streams, out_stream, state=None,
                        call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent that merges its input streams element by element.

    The unread portions of the input streams are zipped together; func
    is applied to each resulting snapshot (one value per input stream)
    and the results are placed on out_stream.

    Parameters
    ----------
        func: function
           Called as func(snapshot, *args, **kwargs) when state is None
           and as func(snapshot, state, *args, **kwargs) otherwise, in
           which case it must return the pair (output, next_state).
           A result of None is replaced by an empty list.
        in_streams: list of Stream
           The list of input streams of the agent
        out_stream: Stream
           The single output stream of the agent
        state: object
           The state of the agent
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    check_merge_agent_arguments(func, in_streams, out_stream, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)
    num_in_streams = len(in_streams)

    # The transition function for this agent.
    def transition(in_lists, state):
        check_in_lists_type(name, in_lists, num_in_streams)
        # input_snapshots is a list of snapshots. Each snapshot is a
        # tuple containing one element for each input stream.
        # zip must be materialized: on Python 3 it is a lazy iterator,
        # which is always truthy and has no len().
        input_snapshots = list(
            zip(*[v.list[v.start:v.stop] for v in in_lists]))
        # If the new input data is empty then return an empty list for
        # the output stream, and leave the state and the starting point
        # for each input stream unchanged.
        if not input_snapshots:
            return ([[]], state, [v.start for v in in_lists])

        # output_list[i] is the output value corresponding to the i-th
        # unread value in each of the input streams.
        output_list = []
        for snapshot in input_snapshots:
            if state is None:
                output = func(snapshot, *args, **kwargs)
            else:
                output, state = func(snapshot, state, *args, **kwargs)
            output_list.append([] if output is None else output)

        # Every input stream advances by the number of snapshots consumed.
        num_consumed = len(input_snapshots)
        return ([output_list], state,
                [v.start + num_consumed for v in in_lists])
    # Finished transition

    # Create agent
    return Agent(in_streams, [out_stream], transition, state,
                 call_streams, name)
def zip_agent(in_streams, out_stream):
    """
    Zip the input streams and place the zipped values on out_stream.

    Parameters
    ----------
        in_streams: list of Stream
           The list of input streams that are zipped
        out_stream: Stream
           The stream that receives the zipped values

    Uses
    -------
        * element_merge_agent

    """
    # The merge function is the identity: each snapshot of the input
    # streams is passed through unchanged.
    def identity(snapshot):
        return snapshot

    element_merge_agent(
        func=identity, in_streams=in_streams, out_stream=out_stream)
def zip_stream(in_streams):
    """
    Return a new stream obtained by zipping the input streams.
    zip_stream is similar to zip, but operates on streams.

    Parameters
    ----------
        in_streams: list of Stream
           The list of input streams that are zipped

    Returns
    -------
        out_stream: Stream
           The output stream generated by zip_stream

    Uses
    -------
        * zip_agent

    """
    zipped = Stream('output of zip')
    zip_agent(in_streams=in_streams, out_stream=zipped)
    return zipped
def zip_map(function, in_streams, state=None, *args, **kwargs):
    """
    zip_map returns out_stream, a stream obtained by applying function,
    with the specified state, args and kwargs, to the elements
    obtained by zipping the input streams.

    Parameters
    ----------
        function: function
           Applied to each zipped snapshot of the input streams; see
           element_merge_agent for the calling convention.
        in_streams: list of Stream
           The list of input streams that are zipped
        state: object
           function operates on a state, args, and kwargs

    Returns
    -------
        out_stream: Stream
           The output stream generated by zip_map

    Uses
    -------
        * element_merge_agent

    """
    out_stream = Stream('output of zip')
    # call_streams and name are passed explicitly as None so that
    # positional *args reach function instead of being swallowed by
    # element_merge_agent's call_streams/name parameters.
    call_streams = None
    name = None
    element_merge_agent(function, in_streams, out_stream, state,
                        call_streams, name, *args, **kwargs)
    return out_stream
####################################################
# OPERATIONS ON ASYNCHRONOUS INPUT STREAMS
####################################################
def asynch_merge(in_streams, out_stream):
    """
    Create an agent that merges the input streams asynchronously.

    Each unread element of an input stream is placed on out_stream as
    the pair (stream_number, element), where stream_number is the index
    of the input stream in in_streams.

    Parameters
    ----------
        in_streams: list of Stream
           The input streams that are merged
        out_stream: Stream
           The single output stream

    """
    def transition(in_lists, state):
        # Nothing unread on any input stream: emit nothing and leave
        # the state and every starting point unchanged.
        if not any(v.stop > v.start for v in in_lists):
            return ([[]], state, [v.start for v in in_lists])

        # At least one input stream has unprocessed data. Tag each
        # unread element with the index of the stream it arrived on;
        # streams with no new messages are skipped.
        tagged = [
            (stream_number, element)
            for stream_number, v in enumerate(in_lists)
            if v.stop > v.start
            for element in v.list[v.start:v.stop]
        ]
        # All unread input has been consumed, so every stream's new
        # starting point is its stop index.
        return ([tagged], state, [v.stop for v in in_lists])

    # Create agent
    Agent(in_streams, [out_stream], transition, name="asynch_merge")
def mix(in_streams):
    """
    Return a new stream that is the asynchronous merge of in_streams.

    Parameters
    ----------
        in_streams: list of Stream
           The input streams that are merged

    Returns
    -------
        out_stream: Stream
           The stream, named 'mix', that is fed by asynch_merge.

    Uses
    -------
        * asynch_merge

    """
    merged = Stream('mix')
    asynch_merge(in_streams=in_streams, out_stream=merged)
    return merged
def blend_agent(func, in_streams, out_stream, state=None,
                call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent that merges the input streams asynchronously and
    applies func to every merged element.

    Parameters
    ----------
        func: function
           Called as func(element, *args, **kwargs) for each unread
           element of each input stream; the result is appended to the
           output. func is not passed the state.
        in_streams: list of Stream
           The list of input streams of the agent
        out_stream: Stream
           The single output stream of the agent
        state: object
           The state of the agent. It is handed to the agent and
           returned unchanged by every transition.
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function. When None, the
           historical name "asynch_merge" is used.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    def transition(in_lists, state):
        # If the input data is empty, i.e., if v.stop <= v.start for all
        # v in in_lists, then return an empty list for the output
        # stream, and leave the state and the starting point for each
        # input stream unchanged.
        if all(v.stop <= v.start for v in in_lists):
            return ([[]], state, [v.start for v in in_lists])

        # At least one input stream has unprocessed data.
        output_list = []
        for v in in_lists:
            # Skip input streams on which no new messages have arrived.
            if v.stop > v.start:
                # Apply func to each unread element of this stream.
                for element in v.list[v.start:v.stop]:
                    output_list.append(func(element, *args, **kwargs))
        return ([output_list], state, [v.stop for v in in_lists])

    # Create agent. Earlier versions dropped state, call_streams and
    # name and always used the name "asynch_merge"; that name is kept
    # as the fallback so unnamed agents are labelled as before.
    agent_name = name if name is not None else "asynch_merge"
    return Agent(in_streams, [out_stream], transition, state,
                 call_streams, agent_name)
def blend(function, in_streams, state=None, *args, **kwargs):
    """
    blend returns out_stream, a stream obtained by merging the input
    streams asynchronously and applying function to each element.

    Parameters
    ----------
        function: function
           Applied to each element of the merged input streams, together
           with args and kwargs.
        in_streams: list of Stream
           The list of input streams that are blended
        state: object
           Passed through to blend_agent.

    Returns
    -------
        out_stream: Stream
           The output stream generated by blend

    Uses
    -------
        * blend_agent

    """
    out_stream = Stream('output of blend')
    # call_streams and name are passed explicitly as None so that
    # positional *args reach function instead of being swallowed by
    # blend_agent's call_streams/name parameters.
    call_streams = None
    name = None
    blend_agent(function, in_streams, out_stream, state,
                call_streams, name, *args, **kwargs)
    return out_stream
#----------------------------------------------------------------------- # MANY: LIST OF INPUT STREAMS, LIST OF OUTPUT STREAMS #-----------------------------------------------------------------------
def element_many_agent(func, in_streams, out_streams, state=None,
                       call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent with many input streams and many output streams
    that operates element by element.

    The unread portions of the input streams are zipped together; func
    maps each input snapshot (one value per input stream) to an output
    snapshot (one value per output stream).

    Parameters
    ----------
        func: function
           Called as func(snapshot, *args, **kwargs) when state is None
           and as func(snapshot, state, *args, **kwargs) otherwise, in
           which case it must return the pair
           (output_snapshot, next_state).
        in_streams: list of Stream
           The input streams of the agent
        out_streams: list of Stream
           The output streams of the agent
        state: object
           The state of the agent
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.

    Returns
    -------
        Agent.
         The agent created by this function.

    """
    check_many_agent_arguments(func, in_streams, out_streams, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)
    num_out_streams = len(out_streams)
    num_in_streams = len(in_streams)

    # The transition function for this agent.
    def transition(in_lists, state):
        check_in_lists_type(name, in_lists, num_in_streams)
        # input_snapshots is a list of snapshots of the collection of
        # input streams. The j-th snapshot holds the j-th unread value
        # of every input stream.
        # zip must be materialized: on Python 3 it is a lazy iterator,
        # which is always truthy and has no len().
        input_snapshots = list(
            zip(*[v.list[v.start:v.stop] for v in in_lists]))
        # If the new input data is empty then return empty lists for
        # each output stream, and leave the state and the starting point
        # for each input stream unchanged. A separate list is built per
        # output stream so the outputs never alias one another.
        if not input_snapshots:
            return ([[] for _ in range(num_out_streams)], state,
                    [v.start for v in in_lists])

        # output_snapshots[j] is the output snapshot (one value per
        # output stream) computed from input_snapshots[j].
        if state is None:
            output_snapshots = [
                func(input_snapshot, *args, **kwargs)
                for input_snapshot in input_snapshots]
        else:
            # The state is threaded through the calls in order.
            output_snapshots = []
            for input_snapshot in input_snapshots:
                output_snapshot, state = func(
                    input_snapshot, state, *args, **kwargs)
                output_snapshots.append(output_snapshot)

        check_func_output_for_multiple_streams(
            func, name, num_out_streams, output_snapshots)

        # Transpose the output snapshots: add_to_output_streams[j] is
        # the sequence of values to be added to output stream j.
        add_to_output_streams = [
            list(values) for values in zip(*output_snapshots)]
        num_consumed = len(input_snapshots)
        return (add_to_output_streams, state,
                [v.start + num_consumed for v in in_lists])
    # Finished transition

    # Create agent
    return Agent(in_streams, out_streams, transition, state,
                 call_streams, name)
def merge_split(function, in_streams, num_out_streams, state=None,
                *args, **kwargs):
    """
    Return a list of new output streams fed by an element_many_agent.

    Parameters
    ----------
        function: function
           Passed to element_many_agent as its func, together with
           state, args and kwargs.
        in_streams: list of Stream
           The input streams of the agent
        num_out_streams: int
           The number of output streams to create
        state: object
           The state of the agent

    Returns
    -------
        out_streams: list of Stream
           The num_out_streams newly created output streams.

    Uses
    -------
        * element_many_agent

    """
    out_streams = []
    for _ in range(num_out_streams):
        out_streams.append(Stream())
    # The two None arguments are call_streams and name.
    element_many_agent(function, in_streams, out_streams, state,
                       None, None, *args, **kwargs)
    return out_streams
#--------------------------------------------------------
def tests():
    """
    Self-tests for the merge functions and agents in this module.

    Exercises element_merge_agent, zip_stream, zip_map, asynch_merge,
    mix, blend_agent, blend, element_many_agent and merge_split by
    extending input streams and asserting on the recent values of the
    output streams. Raises AssertionError on the first failed check.

    """
    # np is used below (np.mean). It is imported here because this
    # module does not import numpy explicitly.
    import numpy as np

    s = Stream('s')
    u = Stream('u')
    x = Stream('x')

    # Test merge
    # func operates on a list of elements, one element for each input stream.
    # func returns a single element of the output stream.
    def g(lst):
        return sum(lst)

    d = element_merge_agent(func=g, in_streams=[x,u], out_stream=s, name='d')
    zipxu = zip_stream([x,u])
    zip_map_xu = zip_map(sum, [x,u])

    def g_args(lst, multiplier):
        return sum(lst)*multiplier

    zip_map_g_args = zip_map(g_args, [x,u], multiplier=2)

    ss = Stream('ss')

    def genral(lst, f):
        return f(lst)

    dd = element_merge_agent(
        func=genral, in_streams=[x,u], out_stream=ss, name='dd',
        f=np.mean)
    zip_map_ss = zip_map(np.mean, [x,u])

    def fff(lst, f, addend):
        return f(lst, addend)

    def hhh(lst, addend):
        return sum(lst)+addend

    sss = Stream('sss')
    dddd = element_merge_agent(
        func=fff, in_streams=[x,u], out_stream=sss, name='ddd',
        f=hhh, addend=10)

    x.extend(range(3))
    u.extend([10, 15, 18])
    assert recent_values(s) == [10, 16, 20]
    assert recent_values(zip_map_g_args) == [2*v for v in recent_values(s)]
    assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18)]
    assert recent_values(ss) == [5, 8, 10]
    assert recent_values(zip_map_ss) == [5.0, 8.0, 10.0]
    assert recent_values(sss) == [20, 26, 30]
    assert recent_values(zip_map_xu) == s.recent[:s.stop]

    u.append(37)
    x.extend(range(3, 5, 1))
    assert recent_values(s) == [10, 16, 20, 40]
    assert recent_values(zip_map_g_args) == [2*v for v in recent_values(s)]
    assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18), (3, 37)]
    assert recent_values(ss) == [5, 8, 10, 20]
    assert recent_values(sss) == [20, 26, 30, 50]

    u.extend([96, 95])
    assert recent_values(s) == [10, 16, 20, 40, 100]
    assert recent_values(zipxu) == [(0, 10), (1, 15), (2, 18), (3, 37), (4, 96)]
    assert recent_values(ss) == [5, 8, 10, 20, 50]
    assert recent_values(sss) == [20, 26, 30, 50, 110]

    # TEST ASYNCH_MERGE AND MIX
    x = Stream('x')
    y = Stream('y')
    z = Stream('z')
    in_streams = [x, y]
    out_stream = z
    asynch_merge(in_streams, out_stream)
    mix_z = mix(in_streams)

    assert z.stop == 0
    assert mix_z.stop == z.stop
    assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]

    x.append(10)
    assert z.stop == 1
    assert z.recent[:z.stop] == [(0, 10)]
    assert mix_z.stop == z.stop
    assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]

    y.append('A')
    assert z.stop == 2
    assert z.recent[:z.stop] == [(0, 10), (1, 'A')]
    assert mix_z.stop == z.stop
    assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]

    y.append('B')
    assert z.stop == 3
    assert z.recent[:z.stop] == [(0, 10), (1, 'A'), (1, 'B')]

    x.append(20)
    assert z.stop == 4
    assert z.recent[:z.stop] == [(0, 10), (1, 'A'), (1, 'B'), (0, 20)]
    assert mix_z.stop == z.stop
    assert mix_z.recent[:mix_z.stop] == z.recent[:z.stop]

    # Test blend_agent
    x = Stream('x')
    y = Stream('y')
    z = Stream('z')

    def double(v):
        return 2*v

    def double_add(v, addend):
        return 2*v+addend

    blend_agent(func=double, in_streams=[x, y], out_stream=z)
    blend_z = blend(double, [x, y])
    blend_add_z = blend(double_add, [x,y], addend=10)

    x.append(1)
    assert recent_values(z) == [2]
    assert recent_values(blend_z) == recent_values(z)
    assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]

    x.extend(range(2,4))
    assert recent_values(z) == [2, 4, 6]
    assert recent_values(blend_z) == recent_values(z)
    assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]

    y.extend(range(100, 102))
    assert recent_values(z) == [2, 4, 6, 200, 202]
    assert recent_values(blend_z) == recent_values(z)
    assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]

    x.extend([10, 20])
    assert recent_values(z) == [2, 4, 6, 200, 202, 20, 40]
    assert recent_values(blend_z) == recent_values(z)
    assert recent_values(blend_add_z) == [v+10 for v in recent_values(z)]

    # ------------------------------------
    # Test many
    # func operates on a list with one element for each input stream.
    # func returns a list with one element for each output stream.
    def f_many(lst):
        return [sum(lst), sum(lst)+1]

    u_stream = Stream(name='u_stream')
    v_stream = Stream(name='v_stream')
    w_stream = Stream(name='w_stream')
    x_stream = Stream(name='x_stream')

    many_agent = element_many_agent(
        func=f_many,
        in_streams=[u_stream, v_stream],
        out_streams=[w_stream, x_stream],
        name='many_agent')

    u_stream.extend(range(5))
    v_stream.extend(range(0, 40, 4))
    assert recent_values(w_stream) == [0, 5, 10, 15, 20]
    assert recent_values(x_stream) == [1, 6, 11, 16, 21]

    ww_stream, xx_stream = merge_split(
        function=f_many,
        in_streams=[u_stream, v_stream],
        num_out_streams=2)
    assert recent_values(ww_stream) == recent_values(w_stream)
    assert recent_values(xx_stream) == recent_values(x_stream)

    # ------------------------------------
    # Test many with args and kwargs
    # func operates on a list with one element for each input stream.
    # func returns a list with one element for each output stream.
    def f_many_args_kwargs(lst, multiplicand, addend):
        return [sum(lst)*multiplicand, sum(lst)+addend]

    u_args_kwargs_stream = Stream(name='u_args_kwargs_stream')
    v_args_kwargs_stream = Stream(name='v_args_kwargs_stream')
    w_args_kwargs_stream = Stream(name='w_args_kwargs_stream')
    x_args_kwargs_stream = Stream(name='x_args_kwargs_stream')

    many_args_kwargs_agent = element_many_agent(
        func=f_many_args_kwargs,
        in_streams=[u_args_kwargs_stream, v_args_kwargs_stream],
        out_streams=[w_args_kwargs_stream, x_args_kwargs_stream],
        name='many_args_kwargs_agent',
        multiplicand=2, addend=10)

    ww_args_kwargs_stream, xx_args_kwargs_stream = merge_split(
        function=f_many_args_kwargs,
        in_streams=[u_args_kwargs_stream, v_args_kwargs_stream],
        num_out_streams=2,
        multiplicand=2, addend=10)
    assert (recent_values(ww_args_kwargs_stream) ==
            recent_values(w_args_kwargs_stream))
    assert (recent_values(xx_args_kwargs_stream) ==
            recent_values(x_args_kwargs_stream))

    u_args_kwargs_stream.extend(range(5))
    v_args_kwargs_stream.extend(range(0, 40, 4))
    assert recent_values(w_args_kwargs_stream) == [0, 10, 20, 30, 40]
    assert recent_values(x_args_kwargs_stream) == [10, 15, 20, 25, 30]
    assert (recent_values(ww_args_kwargs_stream) ==
            recent_values(w_args_kwargs_stream))
    assert (recent_values(xx_args_kwargs_stream) ==
            recent_values(x_args_kwargs_stream))

    u_args_kwargs_stream.append(100)
    v_args_kwargs_stream.extend(range(40, 80, 4))
    assert recent_values(w_args_kwargs_stream) == \
      [0, 10, 20, 30, 40, 240]
    assert recent_values(x_args_kwargs_stream) == \
      [10, 15, 20, 25, 30, 130]
    assert (recent_values(ww_args_kwargs_stream) ==
            recent_values(w_args_kwargs_stream))
    assert (recent_values(xx_args_kwargs_stream) ==
            recent_values(x_args_kwargs_stream))

    u_args_kwargs_stream.extend([200, 300])
    v_args_kwargs_stream.append(100)
    assert recent_values(w_args_kwargs_stream) == \
      [0, 10, 20, 30, 40, 240, 448, 656]
    assert recent_values(x_args_kwargs_stream) == \
      [10, 15, 20, 25, 30, 130, 234, 338]
    assert (recent_values(ww_args_kwargs_stream) ==
            recent_values(w_args_kwargs_stream))
    assert (recent_values(xx_args_kwargs_stream) ==
            recent_values(x_args_kwargs_stream))

    return
# Run the module's self-tests when it is executed as a script.
if __name__ == '__main__':
    tests()