IoTPy.code package

Submodules

IoTPy.code.agent module

This module contains the Agent class. The Agent and Stream classes are the building blocks of PythonStreams.

class IoTPy.code.agent.Agent(in_streams, out_streams, transition, state=None, call_streams=None, name=None, stream_manager=None)[source]

Bases: IoTPy.code.agent.Blackbox

An agent is an automaton: a state-transition machine. An agent has only one important method: the method next() that makes the agent execute a state transition.

An agent has lists of: (1) input streams, (2) output streams and (3) call streams. Streams are described in Stream.py.

During a state transition an agent may:

  1. Read values from its input streams. (Note that reading values in a stream does not change the stream.)
  2. Append values to the tails of its output streams.
  3. Change the agent’s own state.

When a call stream is modified, the agent’s next() method is called, which causes the agent to execute a state transition.

The default is that every input stream is also a call stream, i.e., the agent executes a state transition when any of its input streams is modified. For performance reasons, we may not want the agent to execute state transitions each time any of its input streams is modified; we may want the agent to execute state transitions periodically — for example, every second. In this case, the call streams will be different from the input streams. A call stream that has a value appended to it every second will cause the agent to execute a state transition every second.
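The difference between input streams and call streams can be illustrated with a minimal, self-contained sketch. ToyStream and ToyAgent are hypothetical stand-ins written for this example; they are not the IoTPy classes and only mimic the behaviour described above.

```python
class ToyStream:
    """Append-only sequence that notifies its subscribers (toy model)."""

    def __init__(self, name):
        self.name = name
        self.values = []
        self.subscribers = []

    def call(self, agent):
        # Register an agent to be notified when this stream is modified.
        self.subscribers.append(agent)

    def append(self, value):
        self.values.append(value)
        for agent in self.subscribers:
            agent.next(self.name)


class ToyAgent:
    """Sums the unread input values on every state transition."""

    def __init__(self, in_stream, out_stream, call_streams=None):
        self.in_stream = in_stream
        self.out_stream = out_stream
        self.start = 0  # index of the first unread input value
        # Default: the input stream is also the call stream.
        if call_streams is None:
            call_streams = [in_stream]
        for stream in call_streams:
            stream.call(self)

    def next(self, stream_name=None):
        unread = self.in_stream.values[self.start:]
        self.start = len(self.in_stream.values)
        self.out_stream.append(sum(unread))


data = ToyStream('data')
clock = ToyStream('clock')
totals = ToyStream('totals')
agent = ToyAgent(data, totals, call_streams=[clock])

for v in [1, 2, 3]:
    data.append(v)       # no transition: data is not a call stream
clock.append('tick')     # transition: the agent reads [1, 2, 3]
data.append(4)
clock.append('tick')     # transition: the agent reads [4]

print(totals.values)     # [6, 4]
```

Because the agent subscribes only to the clock stream, it runs once per tick rather than once per data value.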

Parameters:

in_streams : list of streams

The list of the agent’s input streams. This list may be empty.

out_streams : list of streams

The list of the agent’s output streams. This list may be empty.

call_streams : list of streams

When a new value is added to a stream in this list, a state transition is invoked. This is the usual way (but not the only way) in which state transitions occur. A state transition for an agent ag can also be executed by calling ag.next().

state: object

The state of the agent. The state is updated after a transition.

transition: function

This function is called by next() which is the state-transition operation for this agent. An agent’s state transition is specified by its transition function.

stream_manager : function

Each stream has management variables, such as whether the stream is open or closed. After a state-transition the agent executes the stream_manager function to modify the management variables of the agent’s output and call streams.

name : str, optional

The name of this agent.

Attributes

_in_lists : list of InList

InList defines the slice of a list. The j-th element of _in_lists is an InList that defines the slice of the j-th input stream that can be read by this agent in a state transition. For example, if

listj = _in_lists[j].list
startj = _in_lists[j].start
stopj = _in_lists[j].stop

then this agent can read the slice listj[startj:stopj] of the j-th input stream. This slice is a slice of the most recent values of the stream.

_out_lists : list

The j-th element of _out_lists is the list of values to be appended to the j-th output stream after the state transition.
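As a small illustration of how an InList describes a readable slice, the sketch below rebuilds a tuple type with the three fields documented for InList (list, start, stop). The buffer contents are made up for the example, and the definition is an assumption, not a copy of the library's code.

```python
from collections import namedtuple

# Assumed to mirror the documented fields: list, start, stop.
InList = namedtuple('InList', ['list', 'start', 'stop'])

recent = [10, 20, 30, 0]  # hypothetical buffer with one padding value
in_list = InList(list=recent, start=1, stop=3)

# The agent may read only the slice list[start:stop].
readable = in_list.list[in_list.start:in_list.stop]
print(readable)  # [20, 30]
```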

Methods

next() Execute a state transition. The method has 3 parts: (i) set up the data structures to execute a state transition, (ii) call the transition function to: (a) get the values to be appended to output streams, (b) get the next state, and (c) update ‘start’ indices for each input stream. The agent no longer accesses elements of its input streams with indices earlier (i.e. smaller) than ‘start’. (iii) update data structures after the transition.
next(stream_name=None)[source]

Execute the next state transition.

This function does the following:

Part 1: set up data structures for the state transition.
Part 2: execute the state transition by calling self.transition.
Part 3: update data structures after the transition.

This method can be called by any agent and is called whenever a value is appended to any stream in call_streams.

Parameters:

stream_name : str, optional

A new value was appended to the stream with name stream_name as a result of which this agent executes a state transition.
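The three parts of next() can be sketched with plain lists. The transition signature used here (a list of InList tuples and a state in; output lists, next state and new start indices out) is an assumption based on the description of next() above; next_step and running_sum are hypothetical names.

```python
from collections import namedtuple

InList = namedtuple('InList', ['list', 'start', 'stop'])


def running_sum(in_lists, state):
    """Hypothetical transition: emit cumulative sums of one input."""
    in_list = in_lists[0]
    outputs = []
    for v in in_list.list[in_list.start:in_list.stop]:
        state += v
        outputs.append(state)
    # (a) output values, (b) next state, (c) new start index per input
    return [outputs], state, [in_list.stop]


def next_step(in_values, start, out_values, state, transition):
    # Part 1: set up the slice that the agent may read.
    in_lists = [InList(in_values, start, len(in_values))]
    # Part 2: execute the state transition.
    out_lists, state, new_starts = transition(in_lists, state)
    # Part 3: update data structures after the transition.
    out_values.extend(out_lists[0])
    return new_starts[0], state


in_values, out_values = [1, 2, 3], []
start, state = next_step(in_values, 0, out_values, 0, running_sum)
in_values.extend([4, 5])
start, state = next_step(in_values, start, out_values, state, running_sum)
print(out_values, start, state)  # [1, 3, 6, 10, 15] 5 15
```

Advancing start to stop after each transition means the agent never re-reads values it has already processed.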

class IoTPy.code.agent.Blackbox(in_streams, out_streams, name)[source]

Bases: object

IoTPy.code.agent.EPSILON = 1e-12
class IoTPy.code.agent.InList

Bases: tuple

Attributes

list Alias for field number 0
start Alias for field number 1
stop Alias for field number 2

Methods

count(...)
index(value, [start, [stop]]) Raises ValueError if the value is not present.

IoTPy.code.agent.main()[source]

IoTPy.code.stream module

This module contains the Stream class. The Stream and Agent classes are the building blocks of PythonStreams. (Version 1.0 June 16, 2016. Created by: Mani Chandy)

class IoTPy.code.stream.Stream(name='NoName', proc_name='UnknownProcess', initial_value=[], num_in_memory=16, min_history=4)[source]

Bases: object

A stream is a sequence of values. Agents can: (1) Append values to the tail of a stream and close a stream. (2) Read a stream. (3) Subscribe to be notified when a value is added to a stream. (See Agent.py for details of agents.)

The ONLY way in which a stream can be modified is that values can be appended to its tail. The length of a stream (number of elements in its sequence) can stay the same or increase, but never decrease. If at some point, the length of a stream is k, then from that point onwards, the first k elements of the stream remain unchanged.

A stream is written by only one agent. Any number of agents can read a stream, and any number of agents can subscribe to a stream. An agent can be a reader and a subscriber and a writer of the same stream. An agent may subscribe to a stream without reading the stream’s values; for example, the agent may subscribe to a clock stream and execute a state transition when the clock stream has a new value, regardless of the value.

Parameters:

name : str, optional

The name of the stream. Though the name is optional, a named stream helps with debugging. default : ‘NoName’

proc_name : str, optional

The name of the process in which this agent executes. default: ‘UnknownProcess’

initial_value : list or array, optional

The list (or array) of initial values in the stream. If a stream starts in a known state, i.e., with a known sequence of messages, then set the initial_value to this sequence. default : []

num_in_memory: positive int, optional

The initial value of the number of elements in the stream that are stored in main memory. If the stream has 9000 elements and num_in_memory is 100 then the most recent 100 elements of the stream are stored in main memory and the earlier 8900 elements are stored in a file or discarded. num_in_memory may change. It increases if a reader is reading the i-th value of the stream and if j is the index of the most recent value in the stream and |j - i| exceeds num_in_memory. It may decrease if the gap between the indices of the most recent value in the stream and the earliest value being read by any agent is less than num_in_memory. default : DEFAULT_NUM_IN_MEMORY, specified in SystemParameters.py.

min_history: non-negative int, optional

The minimum number of elements of the stream that are stored in main memory. If min_history is 3 and the stream has 9000 elements then the elements 8997, 8998, 8999 will be stored in main memory even if no agent is reading the stream. min_history is used primarily for debugging. A debugger may need to read early values of the stream, and reading from main memory is faster than reading from a file. default : DEFAULT_MIN_HISTORY, specified in SystemParameters.py.

Notes

  1. AGENTS SUBSCRIBING TO A STREAM

An agent is a state-transition automaton and the only action that an agent executes is a state transition. If agent x is a subscriber to a stream s then x.next() — a state transition of x — is invoked whenever messages are appended to s.

The only point at which an agent executes a state transition is when a stream to which the agent subscribes is modified.

An agent x subscribes to a stream s by executing:

s.call(x)

An agent x unsubscribes from a stream s by executing:

s.delete_caller(x)

  2. AGENTS READING A STREAM

2.1 Agent registers for reading

An agent can read a stream only after it registers with the stream as a reader. An agent r registers with a stream s by executing:

s.reader(r)

An agent r deletes its registration for reading s by executing:

s.delete_reader(r)

An agent that reads a stream is also a subscriber to that stream unless the agent has a call-stream. If an agent has no call-stream and stream s is an input stream of that agent, then whenever s is modified, the agent is told to execute a state transition.

2.2 Slice of a stream that can be read by an agent

At any given point, an agent r that has registered to read a stream s can only read some of the most recent values in the stream. The number of values that an agent can read may vary from agent to agent. A reader r can only read a slice:

s[s.start[r]+s.offset: s.stop+s.offset]

of stream s where start[r], stop and offset are defined later.

  3. WRITING A STREAM

3.1 Extending a stream

When an agent is created it is passed a list of streams that it can write.

An agent adds a single element v to a stream s by executing:

s.append(v)

An agent adds the sequence of values in a list l to a stream s by executing:

s.extend(l)

The operations append and extend of streams are analogous to operations with the same names on lists.

3.2 Closing a Stream

A stream is either closed or open. Initially a stream is open. An agent that writes a stream s can close s by executing:

s.close()

A closed stream cannot be modified.
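The write operations of this section can be mimicked with a toy class. ClosableStream is a hypothetical stand-in, not the IoTPy Stream, and the exception type raised on a closed stream is an assumption of this sketch.

```python
class ClosableStream:
    """Toy append-only stream with a closed flag (not the IoTPy Stream)."""

    def __init__(self):
        self.values = []
        self.closed = False

    def append(self, value):
        if self.closed:
            # The exception type is an assumption of this sketch.
            raise ValueError('cannot modify a closed stream')
        self.values.append(value)

    def extend(self, value_list):
        for value in value_list:
            self.append(value)

    def close(self):
        self.closed = True


s = ClosableStream()
s.append(1)
s.extend([2, 3])
s.close()
try:
    s.append(4)
    rejected = False
except ValueError:
    rejected = True

print(s.values, rejected)  # [1, 2, 3] True
```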

  4. MEMORY

4.1 The most recent values of a stream

The most recent elements of a stream are stored in main memory. In addition, the user can specify whether all or part of the stream is saved to a file.

Associated with each stream s is a list (or array) s.recent that includes the most recent elements of s. If the value of s is a sequence:

s[0], ..., s[n-1],

at a point in a computation then at that point, s.recent is a list

s[m], ..., s[n-1]

for some m, followed by some padding (usually a sequence of zeroes, as described later).

The system ensures that all readers of stream s only read elements of s that are in s.recent.

4.2 Slice of a stream that can be read

Associated with a reader r of stream s is an integer s.start[r]. Reader r can only read the slice:

s.recent[s.start[r] : ]

of s.recent.

For readers r1 and r2 of a stream s the values s.start[r1] and s.start[r2] may be different.

4.3 When a reader finishes reading part of a stream

Reader r informs stream s that it will only read values with indexes greater than or equal to j in the list recent by executing:

s.set_start(r, j)

which causes s.start[r] to be set to j.

  5. OPERATION

5.1 Memory structure

Associated with a stream s is:

  1. a list or NumPy ndarray, s.recent.
  2. a nonnegative integer s.stop where:
    (a) s.recent[ : s.stop] contains the most recent values of the stream,
    (b) the slice s.recent[s.stop : ] is padded with padding values (either 0 or 0.0 or the default value specified by the NumPy data type).
  3. a nonnegative integer s.offset where
    s.recent[i] = s[i + s.offset]
    for 0 <= i < s.stop

Example: if the sequence of values in a stream is:

0, 1, ..., 949

and s.offset is 900, then

s.recent[i] = s[900+i] for i in 0, 1, ..., 49

and s.recent[i] is the default value for i > 49.

Invariant:

len(s) = s.offset + s.stop

where len(s) is the number of values in stream s.
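The relationship between the full stream, recent, stop and offset in this example can be checked with plain lists. The buffer length of 64 (50 values plus 14 padding zeroes) is an assumption made only for this sketch.

```python
stream = list(range(950))             # the full stream 0, 1, ..., 949
offset, stop = 900, 50
recent = stream[offset:] + [0] * 14   # 14 padding values (assumed size 64)

# s.recent[i] == s[i + s.offset] for 0 <= i < s.stop
assert all(recent[i] == stream[i + offset] for i in range(stop))
# Everything from index stop onwards is padding.
assert all(v == 0 for v in recent[stop:])
# Invariant: len(s) == s.offset + s.stop
assert len(stream) == offset + stop
```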

The size of s.recent increases and decreases so that the length of the slice that any reader may read is less than the length of s.recent.

The entire stream, or the stream up to offset, can be saved in a file for later processing. You can also specify that no part of the stream is saved to a file.

In the current implementation old values of the stream are not saved.

5.2 Memory Management

We illustrate memory management with the following example with num_in_memory=4 and buffer_size=1

Assume that at a point in time, for a stream s, the list of values in the stream is [1, 2, 3, 10, 20, 30]; num_in_memory=4; s.offset=3; s.stop=3; and s.recent = [10, 20, 30, 0]. The length of s.recent is num_in_memory (i.e. 4). The s.stop (i.e. 3) most recent values in the stream are 10, 20, 30:

s[3] == 10 == s.recent[0]
s[4] == 20 == s.recent[1]
s[5] == 30 == s.recent[2]

The values in s.recent[s.stop:] are padded values (zeroes).

A reader r of stream s has access to the list:
s.recent[s.start[r] : s.stop]

Assume that s has two readers r and q where: s.start[r] = 1 and s.start[q] = 2. So agent r can read the slice [1:3] of recent which is the list [20, 30], and agent q can read the slice [2:3] of recent which is the list [30]. An invariant is:

0 <= s.start[r] <= s.stop

for any reader r.

When a value v is appended to stream s, v is inserted in s.recent[s.stop], replacing a default value, and s.stop is incremented. If s.stop >= len(s.recent) then a new s.recent is created and the values that may be read by any reader are copied into the new s.recent, and s.start, s.stop, and s._begin are modified.

Example: Start with the previous example. (Assume min_history is 0. This parameter is discussed below.) When a new value, 40, is appended to the stream, the list of values in s becomes [1, 2, 3, 10, 20, 30, 40], s.recent becomes [10, 20, 30, 40], and s.stop becomes 4. Since s.stop >= len(recent), a new copy of recent is made and the elements of s that are being read are copied into the new copy. So, recent becomes [20, 30, 40, 0] because no agent is reading s[3] = 10. Then s.stop becomes 3 and s.offset becomes 4. s.start is modified with s.start[r] becoming 0 and s.start[q] becoming 1, so that r continues to have access to values of the stream from 20 onwards; thus r can now read the list [20, 30, 40] and q can read the list [30, 40].

At a later point, agent r informs the stream that it no longer needs to access elements 20, 30, 40 and so s.start[r] becomes 3. Later agent q informs the stream that it no longer needs to access element 30 and s.start[q] becomes 2. At this point r has access to the list [] and q to the list [40].

Now suppose the agent writing stream s extends the stream by the list [50, 60, 70, 80]. At this point, agent q needs to access the list [40, 50, 60, 70, 80] which is longer than len(recent). In this case the size of recent is doubled, and the new recent becomes: [40, 50, 60, 70, 80, 0, 0, 0], with s.start[r] = 1 and s.start[q] = 0. s.stop becomes 5.

Example of min_history = 4. Now suppose the stream is extended by 90, 100 so that s.recent becomes [40, 50, 60, 70, 80, 90, 100, 0] with s.stop = 7. Suppose r and q inform the stream that they no longer need to access the elements currently in the stream, and so s.start[r] and s.start[q] become 7. (In this case the size of recent may be made smaller (halved); but, this is not done in the current implementation and will be done later.) Next suppose the stream is extended by [110]. Since r and q only need to read this value, all the earlier values could be deleted from recent; however, min_history elements must remain in recent and so recent becomes: [80, 90, 100, 110, 0, 0, 0, 0]
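The examples in this section can be replayed with a toy model. ToyRecent is a hypothetical class written for this sketch, not IoTPy code; its compaction policy (compact whenever stop reaches len(recent), double the buffer when the kept values leave no room for padding, never shrink) is an assumption chosen to reproduce the numbers above, and extend is modelled as repeated appends.

```python
class ToyRecent:
    """Toy model of s.recent, s.stop, s.offset and s.start."""

    def __init__(self, recent, stop, offset, start, min_history=0):
        self.recent = list(recent)
        self.stop = stop
        self.offset = offset
        self.start = dict(start)
        self.min_history = min_history

    def append(self, value):
        self.recent[self.stop] = value   # overwrite a padding value
        self.stop += 1
        if self.stop >= len(self.recent):
            self._compact()

    def extend(self, values):
        for value in values:
            self.append(value)

    def set_start(self, reader, index):
        self.start[reader] = index

    def readable(self, reader):
        return self.recent[self.start[reader]:self.stop]

    def _compact(self):
        # Keep every value that some reader may still read, and at
        # least min_history of the most recent values.
        candidates = list(self.start.values())
        candidates.append(self.stop - self.min_history)
        begin = max(min(candidates), 0)
        kept = self.recent[begin:self.stop]
        size = len(self.recent)
        while len(kept) >= size:         # double until padding fits
            size *= 2
        self.recent = kept + [0] * (size - len(kept))
        self.offset += begin
        self.stop -= begin
        for reader in self.start:
            self.start[reader] -= begin


s = ToyRecent([10, 20, 30, 0], stop=3, offset=3, start={'r': 1, 'q': 2})
s.append(40)
print(s.recent, s.stop, s.offset)        # [20, 30, 40, 0] 3 4
print(s.readable('r'), s.readable('q'))  # [20, 30, 40] [30, 40]

s.set_start('r', 3)
s.set_start('q', 2)
s.extend([50, 60, 70, 80])
print(s.recent, s.start)  # [40, 50, 60, 70, 80, 0, 0, 0] {'r': 1, 'q': 0}

s.min_history = 4
s.extend([90, 100])
s.set_start('r', 7)
s.set_start('q', 7)
s.append(110)
print(s.recent)           # [80, 90, 100, 110, 0, 0, 0, 0]
```

At every step the invariant len(s) = s.offset + s.stop holds; at the end the stream has 14 values, with offset 10 and stop 4.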

Attributes

recent : list or NumPy array

A list or array that includes the most recent values of the stream. This list or array is padded with default values (see stop).

stop : int

Index into the list recent. s.recent[:s.stop] contains the s.stop most recent values of stream s. s.recent[s.stop:] contains padded values.

offset : int

Index into the stream, used to map the location of an element in the entire stream to the location of the same element in s.recent, which only contains the most recent elements of the stream. For a stream s:

s.recent[i] = s[i + s.offset] for i in range(s.stop)

Note: In later versions, offset will be implemented as a list of ints.

start : dict of readers

key = reader, value = start index of the reader. Reader r can read the slice

s.recent[s.start[r] : s.stop]

in s.recent, which is equivalent to the following slice in the entire stream:

s[s.start[r]+s.offset : s.stop+s.offset]

Invariant: for all readers r: stop - start[r] <= len(recent). This invariant is maintained by increasing the size of recent when necessary.

subscribers_set : set

The set of subscribers for this stream. Subscribers are agents to be notified when an element is added to the stream.

closed : boolean

True if and only if the stream is closed. An exception is thrown if a value is appended to a closed stream.

close_message : _close or np.NaN

This message is appended to a stream to indicate that when this message is received the stream should be closed. If the stream is implemented as a list then close_message is _close, and for StreamArray the close_message is np.NaN (not a number).

_begin : int

Index into the list recent. recent[:_begin] is not being accessed by any reader; therefore recent[:_begin] can be deleted from main memory. Invariant: for all readers r: _begin <= min(start[r])

Methods

append(value) Append a single value to the end of the stream.
call(agent) Register a subscriber for this stream.
close() Close this stream.
delete_caller(agent) Delete a subscriber for this stream.
delete_reader(reader) Delete this reader from this stream.
extend(value_list) Extend the stream by value_list.
get_contents_after_column_value(...) Assumes that the stream consists of rows where the number of elements in each row exceeds column_number.
get_index_for_column_value(column_number, value) Similar to get_contents_after_column_value except that the value returned is an index into recent rather than the sequence of rows.
get_last_n(n) Returns the list of the last n elements of the stream.
get_latest() Returns the latest element in the stream.
get_latest_n(n) Same as get_last_n()
is_empty() Returns True if and only if this stream is empty.
print_recent()
reader(r[, start_index]) A newly registered reader starts reading recent from index start_index, i.e., it reads recent[start_index:s.stop]. If the reader has already been registered with this stream, its start value is updated to start_index.
set_name(name)
set_start(reader, starting_value) The reader tells the stream that it is only accessing elements of the list recent with index starting_value or higher.
append(value)[source]

Append a single value to the end of the stream.

call(agent)[source]

Register a subscriber for this stream.

close()[source]

Close this stream.

delete_caller(agent)[source]

Delete a subscriber for this stream.

delete_reader(reader)[source]

Delete this reader from this stream.

extend(value_list)[source]

Extend the stream by value_list.

Parameters:

value_list : list

get_contents_after_column_value(column_number, value)[source]

Assumes that the stream consists of rows where the number of elements in each row exceeds column_number. Also assumes that values in the column with index column_number are in increasing order.

Returns the rows in the stream for which:
row[column_number] >= value
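Because the column is assumed to be in increasing order, the first qualifying row can be located with a binary search. The function below is a hypothetical sketch over a plain list of rows, not the library's implementation.

```python
from bisect import bisect_left


def contents_after_column_value(rows, column_number, value):
    """Return the rows with row[column_number] >= value.

    Assumes the column is sorted in increasing order, so a binary
    search finds the first qualifying row.
    """
    column = [row[column_number] for row in rows]
    return rows[bisect_left(column, value):]


rows = [(1.0, 'a'), (2.5, 'b'), (4.0, 'c'), (7.5, 'd')]
print(contents_after_column_value(rows, 0, 2.5))
# [(2.5, 'b'), (4.0, 'c'), (7.5, 'd')]
print(contents_after_column_value(rows, 0, 5))
# [(7.5, 'd')]
```

The position returned by bisect_left is the kind of index that get_index_for_column_value is described as returning.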
get_index_for_column_value(column_number, value)[source]

Similar to get_contents_after_column_value except that the value returned is an index into recent rather than the sequence of rows.

get_last_n(n)[source]

Parameters:

n : positive integer

Returns:

The list of the last n elements of the stream. If the number of elements in the stream is less than n, then it returns all the elements in the stream.

Note

Requirement: n >= self.min_history
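A minimal sketch of the return value, assuming the result is taken from the in-memory buffer recent and the index stop described under Attributes. last_n is a hypothetical helper, not the method itself.

```python
def last_n(recent, stop, n):
    """Return the last n valid values of recent (fewer if stop < n)."""
    return recent[max(0, stop - n):stop]


recent, stop = [80, 90, 100, 110, 0, 0, 0, 0], 4
print(last_n(recent, stop, 2))   # [100, 110]
print(last_n(recent, stop, 10))  # [80, 90, 100, 110]
```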

get_latest()[source]

Returns the latest element in the stream. If the stream is empty then it returns the empty list.

get_latest_n(n)[source]

Same as get_last_n()

is_empty()[source]

Returns True if and only if this stream is empty.

print_recent()[source]

reader(r, start_index=0)[source]

A newly registered reader starts reading recent from index start_index, i.e., it reads recent[start_index:s.stop]. If the reader has already been registered with this stream, its start value is updated to start_index.

set_name(name)[source]
set_start(reader, starting_value)[source]

The reader tells the stream that it is only accessing elements of the list recent with index starting_value or higher.

class IoTPy.code.stream.StreamArray(name='NoName', proc_name='UnknownProcess', dimension=0, dtype=<type 'float'>, initial_value=None, num_in_memory=16, min_history=4)[source]

Bases: IoTPy.code.stream.Stream

Methods

append(value) Appends value, a 1-D NumPy array, to the StreamArray.
call(agent) Register a subscriber for this stream.
close() Close this stream.
delete_caller(agent) Delete a subscriber for this stream.
delete_reader(reader) Delete this reader from this stream.
extend(lst) See extend() for the class Stream.
get_contents_after_column_value(...) Assumes that the stream consists of rows where the number of elements in each row exceeds column_number.
get_contents_after_time(start_time)
get_index_for_column_value(column_number, value) Similar to get_contents_after_column_value except that the value returned is an index into recent rather than the sequence of rows.
get_last_n(n) Returns the list of the last n elements of the stream.
get_latest() Returns the latest element in the stream.
get_latest_n(n) Same as get_last_n()
is_empty() Returns True if and only if this stream is empty.
print_recent()
reader(r[, start_index]) A newly registered reader starts reading recent from index start_index, i.e., it reads recent[start_index:s.stop]. If the reader has already been registered with this stream, its start value is updated to start_index.
set_name(name)
set_start(reader, starting_value) The reader tells the stream that it is only accessing elements of the list recent with index starting_value or higher.
append(value)[source]
Parameters:

value: 1-D numpy array

The value appended to the StreamArray

Notes

See self._create_recent() for a description of the elements of the stream.

extend(lst)[source]

See extend() for the class Stream. Extend the stream by a NumPy ndarray.

Parameters:

lst : np.ndarray

Notes

See self._create_recent() for a description of the elements of the stream.

get_contents_after_time(start_time)[source]
class IoTPy.code.stream.StreamSeries(name=None)[source]

Bases: IoTPy.code.stream.Stream

Methods

append(value) Append a single value to the end of the stream.
call(agent) Register a subscriber for this stream.
close() Close this stream.
delete_caller(agent) Delete a subscriber for this stream.
delete_reader(reader) Delete this reader from this stream.
extend(value_list) Extend the stream by value_list.
get_contents_after_column_value(...) Assumes that the stream consists of rows where the number of elements in each row exceeds column_number.
get_index_for_column_value(column_number, value) Similar to get_contents_after_column_value except that the value returned is an index into recent rather than the sequence of rows.
get_last_n(n) Returns the list of the last n elements of the stream.
get_latest() Returns the latest element in the stream.
get_latest_n(n) Same as get_last_n()
is_empty() Returns True if and only if this stream is empty.
print_recent()
reader(r[, start_index]) A newly registered reader starts reading recent from index start_index, i.e., it reads recent[start_index:s.stop]. If the reader has already been registered with this stream, its start value is updated to start_index.
set_name(name)
set_start(reader, starting_value) The reader tells the stream that it is only accessing elements of the list recent with index starting_value or higher.
class IoTPy.code.stream.StreamTimed[source]

Bases: IoTPy.code.stream.StreamArray

Methods

append(value) Appends value, a 1-D NumPy array, to the StreamArray.
call(agent) Register a subscriber for this stream.
close() Close this stream.
delete_caller(agent) Delete a subscriber for this stream.
delete_reader(reader) Delete this reader from this stream.
extend(lst) See extend() for the class Stream.
get_contents_after_column_value(...) Assumes that the stream consists of rows where the number of elements in each row exceeds column_number.
get_contents_after_time(start_time)
get_index_for_column_value(column_number, value) Similar to get_contents_after_column_value except that the value returned is an index into recent rather than the sequence of rows.
get_last_n(n) Returns the list of the last n elements of the stream.
get_latest() Returns the latest element in the stream.
get_latest_n(n) Same as get_last_n()
is_empty() Returns True if and only if this stream is empty.
print_recent()
reader(r[, start_index]) A newly registered reader starts reading recent from index start_index, i.e., it reads recent[start_index:s.stop]. If the reader has already been registered with this stream, its start value is updated to start_index.
set_name(name)
set_start(reader, starting_value) The reader tells the stream that it is only accessing elements of the list recent with index starting_value or higher.
class IoTPy.code.stream.TimeAndValue

Bases: tuple

Attributes

time Alias for field number 0
value Alias for field number 1

Methods

count(...)
index(value, [start, [stop]]) Raises ValueError if the value is not present.

IoTPy.code.stream.main()[source]
IoTPy.code.stream.remove_novalue_and_open_multivalue(l)[source]

This function returns a list which is the same as the input parameter l except that (1) _no_value elements in l are deleted and (2) each _multivalue element in l is opened, i.e., for an object _multivalue(list_x) each element of list_x appears in the returned list.

IoTPy.code.system_parameters module

SYSTEM_PARAMETERS

Module contents