import sys
import os
# sys.path.append(os.path.abspath("../"))
#sys.path.append(os.path.abspath("/helper_functions"))
from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
import types
import inspect
import numpy as np
#-----------------------------------------------------------------------
# MAP: SINGLE INPUT STREAM, SINGLE OUTPUT STREAM
#-----------------------------------------------------------------------
[docs]def window_map_agent(func, in_stream, out_stream,
window_size, step_size,
state=None, call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from a single element of the input stream to a
single element of the output stream
in_stream: Stream
The single input stream of this agent
out_stream: Stream
The single output streams of the agent
window_size: int
Positive integer. The size of the moving window
step_size: int
Positive integer. The step size of the moving window
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
check_window_and_step_sizes(name, window_size, step_size)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = 1
num_out_streams = 1
# The transition function for the map agent.
def transition(in_lists, state):
check_in_lists_type(name, in_lists, num_in_streams)
# The map agent has a single input stream. So, the transition
# operates on a single in_list.
in_list = in_lists[0]
# The map agent has a single output stream. So, the transition
# outputs a single list.
output_list = []
list_length = in_list.stop - in_list.start
if window_size > list_length:
# No changes are made.
return ([output_list], state, [in_list.start])
# There is enough input data for at least one step.
num_steps = 1+(list_length - window_size)/step_size
output_list = [[]]*num_steps
for i in range(num_steps):
window = in_list.list[
in_list.start+i*step_size : in_list.start+i*step_size+window_size]
if state is None:
output_list[i] = func(window, *args, **kwargs)
else:
output_list[i], state = func(window, state, *args, **kwargs)
return ([output_list], state, [in_list.start+num_steps*step_size])
# Finished transition
# Create agent
return Agent([in_stream], [out_stream], transition, state, call_streams, name)
[docs]def window(function, in_stream, window_size, step_size, state=None, args=[], kwargs={}):
out_stream = Stream(function.__name__+in_stream.name)
window_map_agent(function, in_stream, out_stream, window_size, step_size,
state, args=args, kwargs=kwargs)
return out_stream
#-----------------------------------------------------------------------
# MERGE: LIST OF INPUT STREAMS, SINGLE OUTPUT STREAM
#-----------------------------------------------------------------------
[docs]def window_merge_agent(func, in_streams, out_stream,
window_size, step_size,
state=None, call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from a list of windows with one window per input stream
to a single element of the output stream.
in_streams: list of Stream
The list of input streams of the agent
out_stream: Stream
The single output streams of the agent
window_size: int
Positive integer. The size of the moving window
step_size: int
Positive integer. The step size of the moving window
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_merge_agent_arguments(func, in_streams, out_stream, call_streams, name)
check_window_and_step_sizes(name, window_size, step_size)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = len(in_streams)
num_out_streams = 1
# The transition function for this agent.
def transition(in_lists, state):
check_in_lists_type(name, in_lists, num_in_streams)
# The merge agent has a single output stream. So, the transition
# outputs a single list.
output_list = []
# The merge agent has a list of input streams. So, the transition
# operates on in_lists which is a list of elements, each of type InList.
smallest_list_length = \
min(in_list.stop - in_list.start for in_list in in_lists)
if window_size > smallest_list_length:
# No changes are made.
return ([output_list], state,
[in_list.start for in_list in in_lists])
# There is enough input for at least one step.
num_steps = 1+(smallest_list_length - window_size)/step_size
output_list = [[]]*num_steps
for i in range(num_steps):
# windows is a list with a window for each input stream.
windows = [in_list.list[
in_list.start+i*step_size : in_list.start+i*step_size+window_size]
for in_list in in_lists]
if state is None:
output_list[i] = func(windows, *args, **kwargs)
else:
output_list[i], state = func(windows, state, *args, **kwargs)
# Finished iteration: for i in range(num_steps)
return ([output_list], state,
[in_list.start+num_steps*step_size for in_list in in_lists])
# Finished transition
# Create agent
return Agent(in_streams, [out_stream], transition, state, call_streams, name)
#-----------------------------------------------------------------------
# SPLIT: SINGLE INPUT STREAM, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
[docs]def window_split_agent(func, in_stream, out_streams,
window_size, step_size,
state=None, call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from a window to a list containing a single
element for each output stream.
in_stream: Stream
The single input stream of the agent
out_streams: list of Stream
The list of output streams of the agent
window_size: int
Positive integer. The size of the moving window
step_size: int
Positive integer. The step size of the moving window
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_split_agent_arguments(func, in_stream, out_streams, call_streams, name)
check_window_and_step_sizes(name, window_size, step_size)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = 1
num_out_streams = len(out_streams)
# The transition function for this agent.
def transition(in_lists, state):
check_in_lists_type(name, in_lists, num_in_streams)
# The split agent has a single input stream. So, the transition
# operates on a single in_list.
in_list = in_lists[0]
# The split agent has a list of output streams. So, the transition
# outputs a list for each output stream.
output_lists = [ [] for _ in range(num_out_streams)]
list_length = in_list.stop - in_list.start
if window_size > list_length:
return (output_lists, state, [in_list.start])
# Assert: Each input stream has enough elements for a window operation.
# num_steps is the number of window operations that can be
# carried out with the given numbers of unprocessed elements
# in the input streams.
num_steps = 1+(list_length - window_size)/step_size
output_snapshots = [[]]*num_steps
for i in range(num_steps):
window = in_list.list[
in_list.start+i*step_size : in_list.start+i*step_size+window_size]
if state is None:
output_snapshots[i] = func(window, *args, **kwargs)
else:
output_snapshots[i], state = func(window, state, *args, **kwargs)
check_func_output_for_multiple_streams(
func, name, num_out_streams, output_snapshots)
# output_snapshots is a list of num_steps snapshots. Each snapshot is a
# list of num_outwith one output for each output stream.
# output_list is a list of num_out_streams lists. Each member of output_list
# is a list
output_list = [output_snapshot for output_snapshot in zip(*output_snapshots)]
return (output_list, state, [in_list.start+num_steps*step_size])
# Finished transition
# Create agent
return Agent([in_stream], out_streams, transition, state, call_streams, name)
#-----------------------------------------------------------------------
# MANY: LIST OF INPUT STREAMS, LIST OF OUTPUT STREAMS
#-----------------------------------------------------------------------
[docs]def window_many_agent(func, in_streams, out_streams,
window_size, step_size,
state=None, call_streams=None, name=None,
args=[], kwargs={}):
"""
Parameters
----------
func: function
function from a list of windows with one window for each input stream
to an output list containing a single element for each output stream.
in_streams: list of Stream
The list of input streams of the agent
out_streams: list of Stream
The list of output streams of the agent
window_size: int
Positive integer. The size of the moving window
step_size: int
Positive integer. The step size of the moving window
state: object
The state of the agent
call_streams: list of Stream
The list of call_streams. A new value in any stream in this
list causes a state transition of this agent.
name: Str
Name of the agent created by this function.
Returns
-------
Agent.
The agent created by this function.
"""
check_many_agent_arguments(func, in_streams, out_streams, call_streams, name)
check_window_and_step_sizes(name, window_size, step_size)
check_num_args_in_func(state, name, func, args, kwargs)
num_in_streams = len(in_streams)
num_out_streams = len(out_streams)
# The transition function for this agent.
def transition(in_lists, state=None):
check_in_lists_type(name, in_lists, num_in_streams)
output_lists = [ [] for _ in range(num_out_streams)]
smallest_list_length = min(in_list.stop - in_list.start for in_list in in_lists)
if window_size > smallest_list_length:
# No changes
return (output_lists, state, [in_list.start for in_list in in_lists])
num_steps = 1+(smallest_list_length - window_size)/step_size
output_snapshots = [[]]*num_steps
for i in range(num_steps):
windows = [in_list.list[
in_list.start + i*step_size : in_list.start+ i*step_size + window_size]
for in_list in in_lists]
if state is None:
output_snapshots[i] = func(windows, *args, **kwargs)
else:
output_snapshots[i], state = func(windows, state, *args, **kwargs)
# Finished iteration: for i in range(num_steps)
check_func_output_for_multiple_streams(
func, name, num_out_streams,output_snapshots)
in_lists_start_values = [
in_list.start + num_steps*step_size for in_list in in_lists]
# output_snapshots is a list of num_steps snapshots. Each snapshot is a
# list of num_outwith one output for each output stream.
# output_list is a list of num_out_streams lists. Each member of output_list
# is a list
output_lists = [output_snapshot for output_snapshot in zip(*output_snapshots)]
return (output_lists, state, in_lists_start_values)
# Finished transition
# Create agent
return Agent(in_streams, out_streams, transition, state, call_streams, name)
[docs]def dynamic_window_agent(func, in_stream, out_stream, state,
min_window_size, max_window_size, step_size,
args={}, kwargs={}):
# Note: The agent has a SINGLE input stream, input_stream.
# The agent has a SINGLE output stream, output_stream.
# state is a list where state[0] is the current_window_size
# state[1] is steady_state, a boolean which
# indicates whether the max window size has been reached.
# state[2] is reset, a boolean which is set to
# True when the window is to be reset to the min window size.
# state[3:] is defined by the user.
# min_window_size, max_window_size, step_size are constants.
# INVARIANT:
# max_window_size >= current_window_size >= min_window_size
# The system is in steady state if and only if
# the current window size is equal to its max value.
# When the function f resets the window size, by returning
# reset=True, the current window size is reset to its min value
# if the system is in steady state.
# If f returns reset=True while not in steady state, then
# when the system next enters steady state, the current window
# size is reset to the minimum window size.
# Note that if f returns reset=True while not in steady state,
# then reset only has an effect AFTER the system next reaches steady
# state.
# This function produces a single output stream.
num_outputs = 1
def transition(in_lists, state):
# Get parameters from the state.
current_window_size = state[0]
steady_state = state[1]
reset = state[2]
# In case current window size was set to below its min value:
current_window_size = max(current_window_size, min_window_size)
# output_list is the list of messages that will be
# sent on the output stream in this transition.
output_list = list()
# in_lists is a list of elements of type in_list, with
# one in_list for each input stream. In this case, the
# agent has only one input stream, and hence in_lists
# contains only one element. We call it: input_in_list.
# input is the list of messages in the input
# stream that are the input for this transition.
# start, stop are pointers to the input stream
# where input begins at start and ends at stop.
input_in_list = in_lists[0]
start = input_in_list.start
stop = input_in_list.stop
input_list = input_in_list.list[start:stop]
input_length = stop - start
# input_list and input_length remain unchanged hereafter.
# The current window is:
# input[start_increment:start_increment+current_window_size]
# start_increment is initially 0 and remains 0 until the
# current window size equals the max window size, and after
# that point the start_increment is increased by the step size.
# The start of the window remains unchanged while the window
# size increases from its min value to its max value. After
# the window size reaches its max value, the window size remains
# unchanged, and the window moves.
start_increment = 0
####################
# THE MAJOR LOOP #
####################
# Iterate while the end of the current window, i.e.,
# start_increment + current_window_size falls within
# the input list.
while start_increment + current_window_size <= input_length:
# At each iteration, either start_increment or
# current_window_size (possibly both) increase.
# CASE 1:
# If the system is not in steady state before and
# after the iteration, then during the iteration:
# (a) the start of the window doesn't change
# (b) the window size increases by the step size.
# CASE 2:
# If the system is in steady state before and
# after the iteration, then during the iteration:
# (a) the start of the window increases by the step size
# (b) the window size remains unchanged at its max value.
# CASE 3:
# If the system is in steady state before the iteration,
# and function f resets the window, then after the iteration
# (a) the start of the window increases and
# (b) window size is set to its min value. So the system is no
# longer in steady state.
# (c) reset is set to False.
# CASE 4:
# If the system is not in steady state before the iteration,
# and reaches steady state after the transition because the
# window size is increased to its max value, and if
# reset is False, then after the iteration:
# (a) the start of the window may increase, and
# (b) window size is its max value. So the system is now
# in steady state.
# CASE 5:
# If the system is not in steady state before the iteration,
# and reaches steady state after the transition because the
# window size is increased to its max value, and if
# reset is True, then after the iteration, Case 3 applies, i.e.,
# (a) the start of the window increases, and
# (b) window size is set to its min value, and
# (c) reset is set to False.
# The only cases in which the end of the window, i.e.,
# start_increment + current_window_size,
# does NOT increase, are cases 3 and 5, i.e., the cases in
# which reset changes from True to False.
# In these cases, the end of the window does not move, but its
# start increases. In the next iteration, the
# end of the window will move, and this ensures that the loop
# terminates.
# input_window is the next window in the input stream.
input_window = \
input_list[start_increment:start_increment+current_window_size]
#############################################
# COMPUTE INCREMENTS TO THE OUTPUT STREAM. #
#############################################
# Note: function f MUST return state (where state[0]
# is the current_window_size and state[1] indicates whether
# the steady state, i.e., current window size equals max value,
# has been reached, and state[2], the reset value).
# Update the state to reflect the new value of current_window_size
state[0] = current_window_size
state[1] = steady_state
state[2] = reset
# Compute the new output and the new state.
output_increment, state = func(input_window, state, *args, **kwargs)
# Get the new window parameters from the new state.
# state[0] and state[1] should not normally be changed
# by f().
#current_window_size = state[0]
#steady_state = state[1]
reset = state[2]
#################################
# PROCESS THE NEW OUTPUT. #
#################################
# Deal with special objects that should not be placed
# on the output stream.
output_increment = remove_novalue_and_open_multivalue(
[output_increment])
# Place the output increment on the output list.
# The messages in the output list will eventually
# be sent on the output stream
output_list.extend(output_increment)
################################################
# UPDATE WINDOW SIZE AND WINDOW STARTING POINT #
################################################
# CASES 1 or CASE 4: NOT STEADY STATE
# In this case, reset has no effect.
# This is because the window size is still increasing
# and hasn't yet reached max_window_size.
# The current window size is increased, but
# the start increment does not change because
# the starting point of the window remains
# unchanged until the current window size increases
# to the max window size. After that point,
# the starting point of the window moves forward
# by step size.
if not steady_state:
# CASE 1 or CASE 4
if current_window_size + step_size >= max_window_size:
# CASE 4 or CASE 5:
# Reaches steady state after this iteration.
steady_state = True
# Move the starting point of the window forwards.
# If current_window_size == max_window_size - step_size then
# the starting point of the window doesn't change.
# If current_window_size == max_window_size then
# the starting point of the window moves forward by step_size.
start_increment += current_window_size + step_size - max_window_size
current_window_size = max_window_size
# If reset is True then CASE 5 holds. The actions for
# CASE 5 appear later, see "if reset and steady_state:"
# If reset is False, then CASE 4 holds and no further action occurs
# in this iteration.
continue
else:
# CASE 1:
# Have not reached steady state yet.
# Increase current window size and leave the starting point
# of the window unchanged.
current_window_size += step_size
# CASE 3: IN STEADY STATE, AND RESET. or
# CASE 5: REACHED STEADY STATE, AND RESET.
# Since reset is True, the agent has determined that
# the window size should be reset to its minimum value.
if reset and steady_state:
steady_state = False
reset = False
# Assume the previous window was slice A:B. Then
# B-A = current_window_size
# The new window is A':B. The end, B, of the window doesn't
# move, but its start, A, moves forward to A', where A'
# is computed from new window size, B - A' = min_window_size.
# We have: B = start_increment + current_window_size
# Therefore:
# A' = start_increment + current_window_size - min_window_size
# A = start_increment
# Hence A' - A = current_window_size - min_window_size.
# Move the start of the window forward by this amount.
start_increment += current_window_size - min_window_size
current_window_size = min_window_size
# CASE 2: IN STEADY STATE, AND NO RESET.
if (not reset) and steady_state:
# Assert: not reset and steady_state
# Move the window forward by step_size without changing
# its size which remains max_window_size
start_increment += step_size
###############################
# END OF THE MAJOR LOOP #
###############################
# The start pointer for the input stream is moved forward
# to the starting point of the current window
start += start_increment
start_increment = 0
# Update state
state[0] = current_window_size
state[1] = steady_state
state[2] = reset
return ([output_list], state, [start])
# Create agent
Agent([in_stream], [out_stream], transition, state)
#------------------------------------------------------------------------------------------------
#------------------------------------------------------------------------------------------------
# WINDOW AGENT TESTS
#----------------------------r--------------------------------------------------------------------
#------------------------------------------------------------------------------------------------
[docs]def test_window_agents():
# Test simple window map agent with the same window size and step size
r = Stream('r')
s = Stream('s')
smap = window(function=sum, in_stream=r, window_size=4, step_size=4)
a = window_map_agent(
func=sum, in_stream=r, out_stream=s, window_size=4, step_size=4,
name='a')
r.extend(range(16))
assert s.stop == 4
assert s.recent[:4] == [0+1+2+3, 4+5+6+7, 8+9+10+11, 12+13+14+15]
r.extend([10, -10, 21, -20])
assert s.stop == 5
assert s.recent[:5] == [6, 22, 38, 54, 1]
assert smap.recent[:smap.stop] == s.recent[:s.stop]
# Test window map agent with different window and step sizes
t = Stream('t')
b = window_map_agent(
func=sum, in_stream=r, out_stream=t, window_size=3, step_size=2, name='b')
assert t.stop == (20 - 3 + 1)/2
assert t.recent[:t.stop] == [0+1+2, 2+3+4, 4+5+6, 6+7+8, 8+9+10, 10+11+12, 12+13+14,
14+15+10, 10+(-10)+21]
r.extend([-1, 1, 0])
assert t.stop == 11
assert t.recent[:t.stop] == [0+1+2, 2+3+4, 4+5+6, 6+7+8, 8+9+10, 10+11+12, 12+13+14,
14+15+10, 10+(-10)+21, 21-20-1, -1+1+0]
# Test window map agent with a NumPy function
tt = Stream('tt')
bb = window_map_agent(
func=np.mean, in_stream=r, out_stream=tt,
window_size=3, step_size=2, name='bb')
assert tt.stop == 11
assert tt.recent[:tt.stop] == [1, 3, 5, 7, 9, 11, 13, 13, 7, 0, 0]
r.extend(range(1,4,1))
assert tt.stop == 12
assert tt.recent[:tt.stop] == [1, 3, 5, 7, 9, 11, 13, 13, 7, 0, 0, 1]
# Test window map agent with user-defined function and no state
def f(lst):
return sum(lst)+1
u = Stream('u')
rr = Stream('rr')
c = window_map_agent(
func=f, in_stream=rr, out_stream=u, window_size=4, step_size=4, name='c')
assert u.stop == 0
rr.extend(range(16))
assert u.stop==4
assert u.recent[:4] == [0+1+2+3+1, 4+5+6+7+1, 8+9+10+11+1, 12+13+14+15+1]
rr.extend(range(16, 21, 1))
assert u.stop==5
assert u.recent[:5] == [0+1+2+3+1, 4+5+6+7+1, 8+9+10+11+1, 12+13+14+15+1,
16+17+18+19+1]
# Test window map agent with user-defined function and state
def g(lst, state):
return sum(lst)+state, sum(lst)+state
v = Stream('v')
v_in = Stream('v_in')
d = window_map_agent(
func=g, in_stream=v_in, out_stream=v, window_size=4, step_size=4,
state=0, name='d')
assert v.stop == 0
v_in.extend(range(18))
assert v.stop==4
assert v.recent[:4] == [0+1+2+3, 0+1+2+3+4+5+6+7, 0+1+2+3+4+5+6+7+8+9+10+11,
0+1+2+3+4+5+6+7+8+9+10+11+12+13+14+15]
v_in.extend(range(18, 22, 1))
#------------------------------
# Test window merge agent
#------------------------------
def h(list_of_windows):
return sum([sum(window) for window in list_of_windows])
w = Stream('w')
x = Stream('x')
w_prime = Stream('w_prime')
e = window_merge_agent(
func=h, in_streams=[r,w], out_stream=x, window_size=3, step_size=3,
name='e')
assert x.stop == 0
w_prime.extend(range(16))
assert x.stop == 0
w.extend(range(100, 140, 4))
assert x.stop == 3
assert x.recent[:x.stop] == [315, 360, 405]
def hh(list_of_windows):
return sum([max(window) for window in list_of_windows])
y = Stream('y')
ee = window_merge_agent(
func=hh, in_streams=[r,w], out_stream=y, window_size=3, step_size=3,
name='ee')
assert y.stop == 3
assert y.recent[:y.stop] == [110, 125, 140]
w.extend(range(140, 150, 4))
assert y.stop == 4
assert y.recent[:y.stop] == [110, 125, 140, 155]
#------------------------------
# test window split agent
#------------------------------
def splt(window):
return sum(window), max(window)
w = Stream('w')
y = Stream('y')
in_split = Stream('in_split')
ff = window_split_agent(
func=splt, in_stream=in_split, out_streams=[w,y], window_size=3, step_size=3,
name='ff')
assert w.stop == 0
assert y.stop == 0
in_split.extend(range(16))
assert w.stop == 5
assert y.stop == 5
assert w.recent[:w.stop] == [3, 12, 21, 30, 39]
assert y.recent[:y.stop] == [2, 5, 8, 11, 14]
in_split.extend(range(16, 20, 1))
assert w.recent[:w.stop] == [3, 12, 21, 30, 39, 48]
assert y.recent[:y.stop] == [2, 5, 8, 11, 14, 17]
def splt_np(window):
return sum(window), np.mean(window)
ww = Stream('ww')
yy = Stream('yy')
r_split_numpy = Stream('r split numpy')
gg = window_split_agent(
func=splt_np, in_stream=r_split_numpy, out_streams=[ww,yy], window_size=3, step_size=3,
name='gg')
assert ww.stop == 0
assert yy.stop == 0
r_split_numpy.extend(range(16))
assert ww.stop == 5
assert yy.stop == 5
assert ww.recent[:ww.stop] == [3, 12, 21, 30, 39]
assert yy.recent[:yy.stop] == [1, 4, 7, 10, 13]
r_split_numpy.extend(range(16, 24, 1))
assert ww.recent[:ww.stop] == [3, 12, 21, 30, 39, 48, 57, 66]
assert yy.recent[:yy.stop] == [1, 4, 7, 10, 13, 16, 19, 22]
def max_min_window(window):
return max(window), min(window)
zz = Stream('zz')
aa = Stream('aa')
r_max_min_window = Stream('max min window')
hh = window_split_agent(
func=max_min_window, in_stream=r_max_min_window, out_streams=[zz,aa],
window_size=2, step_size=3,name='hh')
assert zz.stop == 0
assert aa.stop == 0
r_max_min_window.extend(range(16))
assert zz.stop == 5
assert aa.stop == 5
assert zz.recent[:zz.stop] == [1, 4, 7, 10, 13]
assert aa.recent[:aa.stop] == [0, 3, 6, 9, 12]
r_max_min_window.extend(range(16, 20, 1))
assert zz.recent[:zz.stop] == [1, 4, 7, 10, 13, 16, 19]
assert aa.recent[:aa.stop] == [0, 3, 6, 9, 12, 15, 18]
bb = Stream('bb')
cc = Stream('cc')
in_bb_cc = Stream('in bb cc')
ii = window_split_agent(
func=max_min_window, in_stream=in_bb_cc, out_streams=[bb,cc], window_size=5, step_size=3,
name='ii')
assert bb.recent[:bb.stop] == []
assert cc.recent[:cc.stop] == []
in_bb_cc.extend(range(16))
assert bb.stop == 4
assert cc.stop == 4
assert bb.recent[:bb.stop] == [4, 7, 10, 13]
assert cc.recent[:cc.stop] == [0, 3, 6, 9]
in_bb_cc.extend(range(16, 20, 1))
assert bb.stop == 6
assert cc.stop == 6
assert bb.recent[:bb.stop] == [4, 7, 10, 13, 16, 19]
assert cc.recent[:cc.stop] == [0, 3, 6, 9, 12, 15]
# check window_many_agent
def max_min_of_sum_of_windows(list_of_two_windows):
window_0, window_1 = list_of_two_windows
sum_0 = sum(window_0)
sum_1 = sum(window_1)
return max(sum_0, sum_1), min(sum_0, sum_1)
def max_min_of_sum_of_windows_args(
list_of_two_windows, multiplicand, addend):
window_0, window_1 = list_of_two_windows
sum_0 = sum(window_0)
sum_1 = sum(window_1)
return max(sum_0, sum_1)*multiplicand, min(sum_0, sum_1)+addend
dd = Stream('dd')
ee = Stream('ee')
many_dd_ee_0 = Stream('input 0 for output dd ee')
many_dd_ee_1 = Stream('input 1 for output dd ee')
ddd = Stream('ddd')
eee = Stream('eee')
many_ddd_eee_0 = Stream('input 0 for output ddd eee')
many_ddd_eee_1 = Stream('input 1 for output ddd eee')
qqq = window_many_agent(
func=max_min_of_sum_of_windows, in_streams=[many_dd_ee_0, many_dd_ee_1],
out_streams=[dd, ee], window_size=7, step_size=5,
name='qqq')
qqq_args = window_many_agent(
func=max_min_of_sum_of_windows_args, in_streams=[many_dd_ee_0, many_dd_ee_1],
out_streams=[ddd, eee], window_size=7, step_size=5,
name='qqq_args', args=[2], kwargs={'addend':10})
assert dd.recent[:dd.stop] == []
assert ee.recent[:ee.stop] == []
assert ddd.recent[:ddd.stop] == []
assert eee.recent[:eee.stop] == []
many_dd_ee_0.extend(range(16))
assert dd.recent[:dd.stop] == []
assert ee.recent[:ee.stop] == []
assert ddd.recent[:ddd.stop] == []
assert eee.recent[:eee.stop] == []
many_dd_ee_1.extend(range(100, 220, 10))
assert dd.recent[:dd.stop] == [910, 1260]
assert ee.recent[:ee.stop] == [21, 56]
assert ddd.recent[:ddd.stop] == [1820, 2520]
assert eee.recent[:eee.stop] == [31, 66]
many_dd_ee_0.extend(range(16, 32, 1))
assert dd.recent[:dd.stop] == [910, 1260]
assert ee.recent[:ee.stop] == [21, 56]
assert ddd.recent[:ddd.stop] == [1820, 2520]
assert eee.recent[:eee.stop] == [31, 66]
many_dd_ee_1.extend(range(220, 300, 10))
assert dd.recent[:dd.stop] == [910, 1260, 1610]
assert ee.recent[:ee.stop] == [21, 56, 91]
assert ddd.recent[:ddd.stop] == [1820, 2520, 3220]
assert eee.recent[:eee.stop] == [31, 66, 101]
# Try a different window size
many_in_2 = Stream('many in 2')
many_in_3 = Stream('many in 3')
many_out_2 = Stream('many out 2')
many_out_3 = Stream('many out 3')
rrr = window_many_agent(
func=max_min_of_sum_of_windows, in_streams=[many_in_2, many_in_3],
out_streams=[many_out_2, many_out_3], window_size=2, step_size=5,
name='rrr')
assert many_out_2.stop == 0
assert many_out_3.stop == 0
many_in_2.extend(range(4))
many_in_3.extend(range(10, 100, 10))
assert many_out_2.stop == 1
assert many_out_3.stop == 1
assert many_out_2.recent[:many_out_2.stop] == [30]
assert many_out_3.recent[:many_out_3.stop] == [1]
many_in_3.extend(range(100,200,10))
assert many_out_2.stop == 1
assert many_out_3.stop == 1
assert many_out_2.recent[:many_out_2.stop] == [30]
assert many_out_3.recent[:many_out_3.stop] == [1]
many_in_2.extend(range(4, 10, 1))
assert many_out_2.stop == 2
assert many_out_3.stop == 2
assert many_out_2.recent[:many_out_2.stop] == [30, 130]
assert many_out_3.recent[:many_out_3.stop] == [1, 11]
#-------------------------------------------------------------
# TEST args and kwargs
#-------------------------------------------------------------
# Test simple window map agent with args
def f_args(window, multiplicand):
return sum(window)*multiplicand
rr = Stream('rr')
ss = Stream('ss')
sss = Stream('sss')
aa = window_map_agent(
func=sum, in_stream=rr, out_stream=ss, window_size=4, step_size=4,
name='aa')
aaa = window_map_agent(
func=f_args, in_stream=rr, out_stream=sss, window_size=4, step_size=4,
name='aaa', args=[10])
rr.extend(range(16))
assert ss.recent[:ss.stop] == [6, 22, 38, 54]
assert sss.recent[:sss.stop] == [60, 220, 380, 540]
rr.append(20)
assert ss.recent[:ss.stop] == [6, 22, 38, 54]
assert sss.recent[:sss.stop] == [60, 220, 380, 540]
rr.extend([21, 22, 23, 24])
assert ss.recent[:ss.stop] == [6, 22, 38, 54, 86]
assert sss.recent[:sss.stop] == [60, 220, 380, 540, 860]
# Test simple window map agent with kwargs
rr_kwargs = Stream('rr_kwargs')
sss_kwargs = Stream('sss_kwargs')
aaa_kwargs = window_map_agent(
func=f_args, in_stream=rr_kwargs, out_stream=sss_kwargs,
window_size=4, step_size=4,
name='aaa_kwargs', kwargs={'multiplicand':10})
rr_kwargs.extend(range(16))
assert sss_kwargs.recent[:sss_kwargs.stop] == [60, 220, 380, 540]
rr_kwargs.append(20)
assert sss_kwargs.recent[:sss_kwargs.stop] == [60, 220, 380, 540]
rr_kwargs.extend([21, 22, 23, 24])
assert sss_kwargs.recent[:sss_kwargs.stop] == [60, 220, 380, 540, 860]
# Test simple window map agent with args and kwargs
def f_args_kwargs(window, multiplicand, addend):
return sum(window)*multiplicand + addend
rr_args_kwargs = Stream('rr_args_kwargs')
ss_args_kwargs = Stream('ss_args_kwargs')
sss_args_kwargs = Stream('sss_args_kwargs')
aa_args_kwargs = window_map_agent(
func=sum, in_stream=rr_args_kwargs, out_stream=ss_args_kwargs,
window_size=4, step_size=4,
name='aa_args_kwargs')
aaa = window_map_agent(
func=f_args_kwargs, in_stream=rr_args_kwargs, out_stream=sss_args_kwargs,
window_size=4, step_size=4,
name='aaa_args_kwargs', args=[10], kwargs={'addend':5})
assert ss_args_kwargs.recent[:ss_args_kwargs.stop] == []
assert sss_args_kwargs.recent[:sss_args_kwargs.stop] == []
rr_args_kwargs.extend(range(16))
assert ss_args_kwargs.recent[:ss_args_kwargs.stop] == [6, 22, 38, 54]
assert sss_args_kwargs.recent[:sss_args_kwargs.stop] == [65, 225, 385, 545]
rr_args_kwargs.append(20)
assert ss_args_kwargs.recent[:ss_args_kwargs.stop] == [6, 22, 38, 54]
assert sss_args_kwargs.recent[:sss_args_kwargs.stop] == [65, 225, 385, 545]
rr_args_kwargs.extend([21, 22, 23, 24])
assert ss_args_kwargs.recent[:ss_args_kwargs.stop] == [6, 22, 38, 54, 86]
assert sss_args_kwargs.recent[:sss_args_kwargs.stop] == [65, 225, 385, 545, 865]
return
if __name__ == '__main__':
test_window_agents()