Using Iterable Streams for Efficient Data Processing via Python Generators

Posted on 2017-05-27 in Python, Data Processing

We often iterate and operate over large datasets using common data structures such as lists or lists of dictionaries loaded entirely into memory. This approach stops being feasible, let alone scalable, once the data grows too large or arrives as a continuous stream of items, making the memory requirements impossible to predict. In such situations the dataset outgrows the available memory and the program grinds to a halt or crashes.

Rather than keeping such a large amount of data in memory, wouldn't it be great if we could evaluate it lazily, i.e. process one item at a time and only ask for the next item when needed, so that memory usage no longer depends on the size of the data?

By the end of this post you will...

  1. Appreciate the lazy evaluation approach towards the data.

  2. Understand and implement generator functions to produce/consume data as iterable streams.

  3. Build a custom processing pipeline using multiple generator functions.

Let's try to understand what lazy evaluation means.


Lazy Evaluation of Expressions

Under lazy evaluation, assigning an expression to a variable does not evaluate it on the spot; evaluation is postponed until the value is actually needed or consumed. Consider the range function in Python 2.x and 3.x:

Python 2.x

>>>
>>> r = range(10)
>>> r
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>

In the above snippet, we can clearly see that the range function builds a complete list object in memory as soon as it is called and binds the result to the variable.


Python 3.x

>>>
>>> r = range(10)
>>> r
range(0, 10)
>>>
>>> for x in r:
...     print(x, end=' ')
...
0 1 2 3 4 5 6 7 8 9
>>>

In the above snippet, we can clearly see that the range function returns an object which, when iterated over, produces values one after the other. Such an evaluation is termed lazy evaluation, in contrast to the eager evaluation in the previous snippet.

Note: The object returned by the range function in Python 3 is a lazy iterable (a range object) rather than a list. Several other built-ins are lazy in the same way, returning iterators instead of fully built lists, for example map, filter and zip.
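For instance, map and filter behave the same way in Python 3: calling them returns a lazy iterator and no work is done until you iterate over it. A minimal illustration (the object addresses will differ on your machine):

>>> squares = map(lambda x: x * x, range(5))
>>> squares
<map object at 0x7f5debbc90b8>
>>> list(squares)        # iteration is what forces the evaluation
[0, 1, 4, 9, 16]
>>>
>>> odds = filter(lambda x: x % 2 == 1, range(5))
>>> list(odds)
[1, 3]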


Generator functions also return an iterator object, more formally called a generator object, and are a particularly convenient way to implement lazy evaluation.


Generator Functions

Unlike normal functions, which return a single value, generator functions produce a sequence of values one after the other using the yield statement. On execution, the yield statement suspends the function, remembering its execution state so that it can resume from the same point on the next iteration.
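Here is a tiny, hypothetical countdown generator purely to make that suspend-and-resume behaviour visible; notice that execution picks up right after the yield on every subsequent call to next():

>>> def countdown(n):
...     print('starting')
...     while n > 0:
...         yield n
...         print('resumed right after the yield')
...         n -= 1
...
>>> c = countdown(2)
>>> next(c)
starting
2
>>> next(c)
resumed right after the yield
1
>>> next(c)
resumed right after the yield
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration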

Let's consider a couple of snippets to contrast a non-generator solution with a generator solution for producing the non-negative even integers within a specified range.

Snippet I - A Non-Generator way to find all non-negative even integers within 10

>>> import sys
>>>
>>> result = []
>>> sys.getsizeof(result)
64    # in bytes
>>>
>>> for i in range(0, 11):
...     if i % 2 == 0:
...          result.append(i)
...
>>> print(result)
[0, 2, 4, 6, 8, 10]
>>>
>>> sys.getsizeof(result)
128   # in bytes

In the above snippet, we get a list object of all the non-negative even integers up to 10, which obviously consumes some memory. When the limit grows to around 10k or 100k even integers, the memory consumption will certainly become a concern.

Generator functions are the perfect tool for such situations. But why is that so?


Snippet II - A Generator way to find all non-negative even integers within 10

>>> import sys
>>>
>>> def gen_evens(num):
...     print('Starting to generate evens ...')
...     while num >= 0:
...             if num % 2 == 0:
...                     yield num
...             num -= 1
...
>>>
>>> evens = gen_evens(10)
>>> sys.getsizeof(evens)
88    # in bytes
>>>
>>> evens
<generator object gen_evens at 0x7f5debbd1200>
>>>

In the above snippet, the generator function didn't start executing when called, as we don't see its print output; instead, a generator object is returned, which can only be iterated over to get the next values. Beyond a few coroutine-oriented methods (send, throw and close), there is nothing else you can do with a generator object; it's good for just one thing: iteration.
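For example, a generator object has no length and cannot be indexed (continuing with the evens object from the snippet above):

>>> len(evens)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: object of type 'generator' has no len()
>>>
>>> evens[0]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'generator' object is not subscriptable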

Usually a generator object is hooked up to a for loop (or any other looping construct) to produce its values:


>>> evens
<generator object gen_evens at 0x7f5b8e6407d8>
>>>
>>> for even in evens:
...     print(even)
...
Starting to generate evens ...
10
8
6
4
2
0
>>>
>>> for even in evens:
...     print(even)
...
>>>
>>>

In the above snippet, we simply iterated over the evens generator object and got all the values one after the other.


But why didn't it do the same on the second attempt?

Unlike a list, a generator object can't produce its values again once it has been consumed. To generate the values again, we need to call the generator function once more. This shows that the generated values are never all held in memory at once, which lets us treat the data as a stream of items and process them one after the other.
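To iterate again, simply call the generator function once more to obtain a fresh generator object, for example:

>>> evens = gen_evens(10)    # a brand new generator object
>>> list(evens)              # consuming it produces all the values again
Starting to generate evens ...
[10, 8, 6, 4, 2, 0]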

Under the Hood

A generator function starts executing on the first call to next(). Any looping construct implicitly calls next() on the iterator object (its __next__() method in Python 3, next() in Python 2) to get each value.


>>> evens = gen_evens(10)   # a fresh generator object, since the previous one is exhausted
>>> next(evens)
Starting to generate evens ...
10
>>> next(evens)
8
>>> next(evens)
6
>>> next(evens)
4
>>> next(evens)
2
>>> next(evens)
0
>>> next(evens)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>>
>>> next(evens)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>>

Note: If you are wondering why there are two different ways of calling next, read PEP 3114 for a better understanding.
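In short, Python 2 spelled the method evens.next(), Python 3 renamed it to evens.__next__(), and the next() built-in (available since Python 2.6) works with either:

>>> evens = gen_evens(10)
>>> evens.__next__()    # the method name in Python 3 (evens.next() in Python 2)
Starting to generate evens ...
10
>>> next(evens)         # the next() built-in works in Python 2.6+ and Python 3
8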

Every time next is called, the function's code continues to execute until it hits the yield statement again. When there are no more values to produce, a StopIteration exception is raised, indicating the end of the stream. The for loop catches this exception and gracefully ends the iteration.
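In other words, the for loop we used earlier is roughly equivalent to the following hand-rolled loop (a sketch of the iterator protocol, not the exact CPython machinery):

evens = gen_evens(10)
iterator = iter(evens)          # a generator object is its own iterator
while True:
    try:
        even = next(iterator)   # resumes the function until the next yield
    except StopIteration:       # raised once the function returns
        break
    print(even)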


When the data is larger than the available memory, generator functions win hands down on memory consumption compared to building temporary lists in our code and iterating over them, and they let us start producing results immediately instead of waiting for the whole dataset to be materialised.
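A quick way to see the difference is to compare a list comprehension with the equivalent generator expression; the generator object stays at a small fixed size no matter how large the range is (a small sketch, exact sizes vary across Python versions):

import sys

as_list = [n for n in range(1000000) if n % 2 == 0]   # 500,000 integers held in memory at once
as_gen = (n for n in range(1000000) if n % 2 == 0)    # nothing computed yet

print(sys.getsizeof(as_list))   # several megabytes on CPython
print(sys.getsizeof(as_gen))    # a small constant, comparable to the 88 bytes seen above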


Building a processing pipeline using Generator functions

Until now we used generator functions to produce a stream of items, which is generally consumed by a for loop. In this section, we will glue together multiple generator functions to build our very own custom processing pipeline, where each generator function acts as an individual component of the pipeline.

Use Case

We need to filter our access logs in real time to find all incoming GET requests that receive a 404 HTTP status code, in order to determine whether or not our application server is being spammed by unwanted requests.

Our Approach

  1. Read the incoming nginx access log indefinitely (similar to the tail -f command).

    • Component name: read_access_logs
  2. Now, keep only the 404 responses.

    • Component name: filter_404
  3. Then, keep only the GET requests.

    • Component name: filter_get_requests
  4. Finally, print out the requested route and IP address of that particular request.

    • Component name: get_request_info

Now we will create a different component for each step above in our processing pipeline.

# parse.py

import time

def read_access_logs():
    # follow the nginx access log and yield each new line (similar to `tail -f`)
    with open('/var/log/nginx/access.log') as f:
        f.seek(0, 2)  # seek to the end of the file and start listening for the next incoming request
        while True:
            line = f.readline()
            if line:
                yield line
            else:
                time.sleep(0.1)  # nothing new yet; avoid a busy-wait that pegs the CPU

def filter_404(logs):
    # keep only the log lines carrying a 404 status
    for log in logs:
        if '404' in log:
            yield log

def filter_get_requests(logs_404):
    # keep only the 404 lines that are GET requests
    for log_404 in logs_404:
        if 'GET' in log_404:
            yield log_404

def get_request_info(get_404_requests):
    # a rough split on '-' that is good enough for this demo log format
    for request in get_404_requests:
        r = request.split('-')
        yield "Remote Address :: {} :: Requested Route :: {} :: HTTP User Agent :: {}".format(r[0], r[2], r[3])


# glue up all the components and create a processing pipeline
logs = read_access_logs()
logs_404 = filter_404(logs)
get_404_logs = filter_get_requests(logs_404)
request_info_logs = get_request_info(get_404_logs)
for request_info_log in request_info_logs:
    print(request_info_log)

In the above snippet, we stacked the different components together and formed our processing pipeline. The for loop at the end drives the whole chain: each item is pulled lazily through every component in turn and handed down to the next one for further processing, so only one log line is being processed at any given moment.

Such an approach allows for a much better way to iterate over huge amounts of data by decoupling the iteration process from the operation to be performed on the data at each stage.
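If the number of stages grows, the gluing step itself can be made generic. The sketch below adds a hypothetical build_pipeline helper (not part of the original script), assuming it lives alongside the components defined above:

def build_pipeline(source, *stages):
    # wrap the source generator with each stage in turn; everything stays lazy
    stream = source
    for stage in stages:
        stream = stage(stream)
    return stream

# equivalent to the manual gluing shown above
request_info_logs = build_pipeline(
    read_access_logs(),
    filter_404,
    filter_get_requests,
    get_request_info,
)
for request_info_log in request_info_logs:
    print(request_info_log)

Adding or removing a stage is then just a matter of editing the argument list.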

Sample Output

Remote Address :: 124.88.64.206  :: Requested Route ::  [25/May/2017:04:56:25 +0000] "GET / HTTP/1.0" 404 233 " :: HTTP User Agent :: " "
Remote Address :: 139.162.114.70  :: Requested Route ::  [25/May/2017:06:01:21 +0000] "GET / HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" "
Remote Address :: 60.191.38.77  :: Requested Route ::  [25/May/2017:22:34:24 +0000] "GET /current_config/Account1 HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/5.0" "
Remote Address :: 59.56.110.208  :: Requested Route ::  [26/May/2017:01:02:54 +0000] "GET /phpMyAdmin HTTP/1.1" 404 233 "http://139.59.13.232/phpMyAdmin" "Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1)" " :: HTTP User Agent :: "

Remote Address :: 59.56.110.208  :: Requested Route ::  [26/May/2017:01:05:47 +0000] "GET /phpMyAdmin HTTP/1.1" 404 233 "http://139.59.13.232/phpMyAdmin" "Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1)" " :: HTTP User Agent :: "

Remote Address :: 139.162.124.167  :: Requested Route ::  [26/May/2017:07:41:21 +0000] "GET / HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/5.0" "
Remote Address :: 218.93.201.199  :: Requested Route ::  [26/May/2017:11:17:46 +0000] "GET /manager/html HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/3.0 (compatible; Indy Library)" "
Remote Address :: 183.129.160.229  :: Requested Route ::  [26/May/2017:12:48:31 +0000] "GET / HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:47.0) Gecko/20100101 Firefox/47.0" "
Remote Address :: 183.129.160.229  :: Requested Route ::  [26/May/2017:21:51:38 +0000] "GET / HTTP/1.1" 404 233 " :: HTTP User Agent :: " "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:47.0) Gecko/20100101 Firefox/47.0" "
Remote Address :: 49.73.191.7  :: Requested Route ::  [27/May/2017:09:05:39 +0000] "GET / HTTP/1.1" 404 233 " :: HTTP User Agent :: " "

Note: The program keeps running indefinitely until we exit it, listening for the next incoming request so that it can push it through our pipeline and filter out the desired results. For the purposes of this post, all of the filtered data is printed to STDOUT.

Check out the video demonstrating the above use case.


Conclusion

Iteration is very basic and yet one of the most powerful programming tools. Approaching huge datasets lazily makes it a lot easier to process the data efficiently, by caring only about the next incoming item. Nowadays, many data processing tools such as Storm, Spark and Luigi (built on core Python) are aligned with the same approach of streaming data through a pipeline, with a lot more robustness of course.

I hope this post encourages you to break down your laborious processes into small, focused components and form a pipeline out of them, making them easier to debug and more scalable through adding or removing components within the pipeline.

Would love to hear suggestions and feedback.

Cheers!