# Source code for IoTPy.code.agents.timed_agent

""" This module has timed_zip and timed_window which are described
in the manual documentation.

"""
import sys
import os
# scriptpath = "../"
# sys.path.append(os.path.abspath(scriptpath))

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *

####################################################
#                     TIMED ZIP
####################################################

def timed_zip_agent(in_streams, out_stream, call_streams=None, name=None):
    """Create an agent that merges timestamped input streams by time.

    Parameters
    ----------
    in_streams: list of Stream
        The list of input streams of the agent.
    out_stream: Stream
        The single output stream of the agent.
    call_streams: list of Stream
        The list of call_streams. A new value in any stream in this
        list causes a state transition of this agent.
    name: Str
        Name of the agent created by this function.

    Returns
    -------
    Agent.
        The agent created by this function.

    Notes
    -----
    Each stream in in_streams must be a stream of tuples or lists or
    NumPy arrays where element[0] is a time and where time is a total
    order. Each stream in in_streams must be strictly monotonically
    increasing in time.

    out_stream merges the in_streams in order of time. An element of
    out_stream is a two-element list [T, values] where T is a time and
    values has one entry per input stream: element[1] of that stream's
    item whose time is T, or None if that stream has no item with
    time T.

    A time is emitted only while every input stream still has an
    unread element, so the agent never outputs a time that a slower
    stream might still contribute to.

    """
    # Validate the arguments before building the agent.
    check_list_of_streams_type(list_of_streams=in_streams,
                               agent_name=name,
                               parameter_name='in_streams')
    check_stream_type(name, 'out_stream', out_stream)
    check_list_of_streams_type(list_of_streams=call_streams,
                               agent_name=name,
                               parameter_name='call_streams')

    stream_count = len(in_streams)
    positions = range(stream_count)

    def transition(in_lists, state):
        """Merge the unread portions of the input lists by time."""
        check_in_lists_type(name, in_lists, stream_count)

        # pending[k] is the unread part of the k-th input stream.
        pending = [entry.list[entry.start:entry.stop] for entry in in_lists]
        # cursors[k] indexes into pending[k]; it must stay below limits[k].
        cursors = [0] * stream_count
        limits = [len(pending[k]) for k in positions]

        merged = []
        while all(cursors[k] < limits[k] for k in positions):
            # heads[k] is the next unread element of the k-th stream;
            # heads[k][0] is its time field.
            heads = [pending[k][cursors[k]] for k in positions]
            earliest_time = min(head[0] for head in heads)

            # For each stream whose head carries the earliest time, take
            # its value field and consume it; other streams contribute
            # None and are left untouched.
            values = []
            for k, head in enumerate(heads):
                if head[0] == earliest_time:
                    values.append(head[1])
                    cursors[k] += 1
                else:
                    values.append(None)

            merged.append([earliest_time, values])

        # The agent has one output stream, so the list of output lists
        # is [merged]. The state is irrelevant for a pure merge and is
        # passed through unchanged. Each input's start pointer moves
        # forward by the number of elements consumed from it.
        next_starts = [in_lists[k].start + cursors[k] for k in positions]
        return [merged], state, next_starts

    # Agent arguments: input streams, output streams, transition
    # function, initial state (unused, so None), call streams, name.
    state = None
    return Agent(in_streams, [out_stream], transition, state,
                 call_streams, name)
def timed_zip(list_of_streams):
    """Return a new stream that is the timed zip of list_of_streams.

    Creates the output stream, attaches a timed_zip_agent that reads
    list_of_streams and writes to it, and returns the output stream.

    """
    merged_stream = Stream('output of timed zip')
    timed_zip_agent(in_streams=list_of_streams, out_stream=merged_stream)
    return merged_stream
###############################################################
#                     TIMED_WINDOW
###############################################################
def timed_window_agent(
        func, in_stream, out_stream,
        window_duration, step_time, window_start_time=0,
        state=None, call_streams=None, name=None,
        args=None, kwargs=None):
    """Create an agent that applies func to sliding windows of time.

    Parameters
    ----------
    func: function
        Called once per completed window. If the underlying state is
        None it is called as func(window, *args, **kwargs) and returns
        the output for the window; otherwise it is called as
        func(window, state, *args, **kwargs) and returns the pair
        (output, new_state).
    in_stream: Stream
        The single input stream. Each element is indexable and
        element[0] is its timestamp; timestamps are expected to be
        non-decreasing.
    out_stream: Stream
        The single output stream. Each output element is the tuple
        (window_end_time, output_of_func).
    window_duration: number
        Length of each window. A window covers timestamps t with
        window_start_time <= t < window_start_time + window_duration.
    step_time: number
        Amount by which the window start moves between windows.
    window_start_time: number
        Start time of the first window. All windows with earlier start
        times are treated as already processed.
    state: object
        Initial state of the underlying computation, or None if func
        is stateless.
    call_streams: list of Stream
        Passed through to Agent.
    name: Str
        Name of the agent created by this function.
    args: list
        Extra positional arguments forwarded to func.
    kwargs: dict
        Extra keyword arguments forwarded to func.

    Returns
    -------
    Agent.
        The agent created by this function.

    Notes
    -----
    A window is emitted only once an element with timestamp greater
    than or equal to the window's end time has arrived, because only
    then is the window known to be complete. Windows that would
    contain no elements are skipped.

    """
    # Avoid shared mutable default arguments.
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    # The agent's state is the pair (start time of the next window,
    # state of the underlying computation).
    state = (window_start_time, state)

    def transition(in_lists, state):
        # This agent has a single input stream and a single output
        # stream, so it reads one in_list and produces one output list.
        in_list = in_lists[0]
        output_list = []
        input_list = in_list.list[in_list.start:in_list.stop]
        # len() rather than truthiness: input_list may be a NumPy array.
        if len(input_list) == 0:
            # Nothing to read: the combined state is returned as is.
            return ([output_list], state, [in_list.start])

        # Split the combined state into the window start time and the
        # state of the underlying computation.
        window_start_time, state = state
        window_end_time = window_start_time + window_duration
        last_element_time = input_list[-1][0]
        timestamp_list = [timestamp_and_value[0]
                          for timestamp_and_value in input_list]
        num_elements = len(input_list)
        window_start_index = 0

        # Main loop: runs while the current window is known to be
        # complete, i.e. some element has a timestamp at or past its end.
        while window_end_time <= last_element_time:
            # Advance window_start_index to the earliest element whose
            # timestamp is greater than or equal to window_start_time.
            while (window_start_index < num_elements and
                   timestamp_list[window_start_index] < window_start_time):
                window_start_index += 1
            if window_start_index >= num_elements:
                # No element has a timestamp at or after the window start.
                break

            first_timestamp = timestamp_list[window_start_index]
            if window_end_time <= first_timestamp:
                # The current window is empty. Rather than stepping
                # through many empty windows, jump forward by the
                # smallest number of steps that puts the window end
                # past first_timestamp. Floor division is required:
                # true division would give a fractional step count and
                # misalign the windows.
                num_steps = 1 + int(
                    (first_timestamp - window_end_time) // step_time)
                window_start_time += num_steps * step_time
                window_end_time = window_start_time + window_duration
                # Re-check from the top. The new window may not be
                # complete yet, and when step_time > window_duration its
                # start may have moved past first_timestamp, so the
                # start index must be recomputed before slicing.
                continue

            # Find the first element whose timestamp is greater than or
            # equal to window_end_time. The loop terminates because
            # window_end_time <= last_element_time.
            window_end_index = window_start_index
            while timestamp_list[window_end_index] < window_end_time:
                window_end_index += 1
            next_window = input_list[window_start_index:window_end_index]

            # Compute the output for this window.
            if state is None:
                output_increment = func(next_window, *args, **kwargs)
            else:
                output_increment, state = func(
                    next_window, state, *args, **kwargs)

            # The timestamp of the output is the end time of the window.
            output_list.append((window_end_time, output_increment))

            # Move the window forward by one step.
            window_start_time += step_time
            window_end_time = window_start_time + window_duration

        # Elements earlier than the next window's start time are no
        # longer needed, so move the start pointer past them.
        while (window_start_index < num_elements and
               timestamp_list[window_start_index] < window_start_time):
            window_start_index += 1

        # Recombine the window start time with the underlying state.
        state = (window_start_time, state)
        return ([output_list], state, [window_start_index + in_list.start])

    # Create agent
    return Agent([in_stream], [out_stream], transition, state,
                 call_streams, name)
def timed_window(function, in_stream, window_duration, step_time,
                 state=None, args=None, kwargs=None):
    """Return a new stream of outputs of function over timed windows.

    Creates an output stream named function.__name__ + in_stream.name,
    attaches a timed_window_agent that reads in_stream and writes to
    the new stream, and returns the new stream.

    Parameters
    ----------
    function: function
        The function applied to each window.
    in_stream: Stream
        The timestamped input stream.
    window_duration: number
        Length of each window.
    step_time: number
        Amount by which the window moves between windows.
    state: object
        Initial state for function, or None if it is stateless.
    args: list
        Extra positional arguments, handed to timed_window_agent.
    kwargs: dict
        Extra keyword arguments, handed to timed_window_agent.

    Returns
    -------
    Stream.
        The output stream written by the agent.

    """
    # None replaces the mutable defaults; fresh containers are made
    # per call so nothing is shared between calls.
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    out_stream = Stream(function.__name__ + in_stream.name)
    timed_window_agent(function, in_stream, out_stream,
                       window_duration, step_time,
                       state=state, args=args, kwargs=kwargs)
    return out_stream
###############################################################
#                     TESTS
###############################################################
def test_timed_zip_agents():
    """Exercise timed_zip and timed_window on two timestamped streams.

    Data is fed to the streams in four rounds. After each round the
    full contents of the zipped stream z and of the two windowed
    streams r (duration 5, step 5) and s (duration 20, step 10) are
    compared with the expected cumulative output.

    """
    x = Stream('x')
    y = Stream('y')
    z = Stream('z')
    # timed_zip_agent(in_streams=[x,y], out_stream=z, name='a')
    z = timed_zip([x, y])

    def concat_operator(timed_list):
        # Concatenate the value fields of all elements in the window.
        return ''.join(timestamp_value[1] for timestamp_value in timed_list)

    r = timed_window(concat_operator, x, 5, 5)
    s = timed_window(concat_operator, x, 20, 10)

    # Expected cumulative contents of each output stream.
    expected_z = []
    expected_r = []
    expected_s = []

    def check_outputs():
        assert z.recent[:z.stop] == expected_z
        assert r.recent[:r.stop] == expected_r
        assert s.recent[:s.stop] == expected_s

    # Round 1
    x.extend([[1, 'a'], [3, 'b'], [10, 'd'], [15, 'e'], [17, 'f']])
    y.extend([[2, 'A'], [3, 'B'], [9, 'D'], [20, 'E']])
    expected_z += [
        [1, ['a', None]], [2, [None, 'A']], [3, ['b', 'B']],
        [9, [None, 'D']], [10, ['d', None]], [15, ['e', None]],
        [17, ['f', None]],
    ]
    expected_r += [(5, 'ab'), (15, 'd')]
    check_outputs()

    # Round 2
    x.extend([[21, 'g'], [23, 'h'], [40, 'i'], [55, 'j'], [97, 'k']])
    y.extend([[21, 'F'], [23, 'G'], [29, 'H'], [55, 'I']])
    expected_z += [
        [20, [None, 'E']], [21, ['g', 'F']], [23, ['h', 'G']],
        [29, [None, 'H']], [40, ['i', None]], [55, ['j', 'I']],
    ]
    expected_r += [(20, 'ef'), (25, 'gh'), (45, 'i'), (60, 'j')]
    expected_s += [
        (20, 'abdef'), (30, 'defgh'), (40, 'gh'),
        (50, 'i'), (60, 'ij'), (70, 'j'),
    ]
    check_outputs()

    # Round 3
    x.extend([[100, 'l'], [105, 'm']])
    y.extend([[100, 'J'], [104, 'K'], [105, 'L'], [107, 'M']])
    expected_z += [
        [97, ['k', None]], [100, ['l', 'J']],
        [104, [None, 'K']], [105, ['m', 'L']],
    ]
    expected_r += [(100, 'k'), (105, 'l')]
    expected_s += [(100, 'k')]
    check_outputs()

    # Round 4: only x receives new data.
    x.extend([[106, 'n'], [110, 'o']])
    expected_z += [[106, ['n', None]], [107, [None, 'M']]]
    expected_r += [(110, 'mn')]
    expected_s += [(110, 'klmn')]
    check_outputs()

    return
# Run the module's self-test when this file is executed as a script.
if __name__ == '__main__':
    test_timed_zip_agents()