Python: Introducing ppipe: Parallel Pipe

This post is about my Pipe Python module, so if you don’t know it yet, you should first read the first article about Pipe.

The idea behind ppipe (parallel pipe) is to transparently make a pipe asynchronous and/or multithreaded. Multithreading isn’t an easy piece of code for everybody: it comes with loads of pollution like locks and queues, giving code far away from the simple task you actually wanted to do…

The idea is that one kind of multithreading can be nicely handled with a simple design pattern that is well implemented in Python: the queue. The queue handles all the locking, so you don’t have to worry about it; just make your workers work: enqueue, dequeue, work… but you still have to create the workers!
A pipe is not far from the concept of a queue, as every part of a pipe command works on a piece of data and then gives it to the next worker, so it’s not hard to imagine an asynchronous pipe in which every part of a pipe command works at the same time. From there, it’s not hard to imagine starting n threads for a single step of a pipe command, leading to a completely multithreaded application with ~0 lines of code bloat for thread creation and synchronization in your actual code.

So I tried to implement it, keeping the current contract, which is very simple: every pipe should take an iterable as input. (I was tempted to change it to ‘every pipe must take a Queue as input’… but by not changing the contract, normal pipes and parallel pipes can be mixed.) So I created a branch, which you’ll find on GitHub, with a single new file, ‘ppipe.py’, which is actually not importable: it’s only a proof of concept that can be run directly.

Here is the test I wrote using ppipe:

print "Normal execution :"
xrange(4) | where(fat_big_condition1) \
          | where(fat_big_condition2) \
          | add | lineout
 
print "Parallel with 1 worker"
xrange(4) | parallel_where(fat_big_condition1) \
          | where(fat_big_condition2) \
          | add | lineout
 
print "Parallel with 2 workers"
xrange(4) | parallel_where(fat_big_condition1, qte_of_workers=2) \
          | parallel_where(fat_big_condition2, qte_of_workers=2) | add | stdout
 
print "Parallel with 4 workers"
xrange(4) | parallel_where(fat_big_condition1, qte_of_workers=4) \
          | parallel_where(fat_big_condition2, qte_of_workers=4) | add | stdout

The idea is to compare a normal pipe (Normal execution) with an asynchronous pipe (Parallel with 1 worker, 1 worker being the default), and then with 2 and 4 workers, which can be given to a ppipe using ‘qte_of_workers=’.

fat_big_condition1 and fat_big_condition2 are just f*cking long-running pieces of code, like fetching something far, far away on the internet… but for our tests, let’s use time.sleep:

import time

def fat_big_condition1(x):
    log(1, "Working...")
    time.sleep(2)
    log(1, "Done !")
    return 1
 
def fat_big_condition2(x):
    log(2, "Working...")
    time.sleep(2)
    log(2, "Done !")
    return 1

They always return 1… and they log using a simple log function that makes fat_big_condition1 log in the left column and fat_big_condition2 log in the right column:

from datetime import datetime
from threading import Lock

stdoutlock = Lock()
def log(column, text):
    # Serialize prints so lines from different threads don't interleave.
    with stdoutlock:
        print ' ' * column * 10,
        print datetime.now().strftime("%S"),
        print text

And here is the output (the integers are the current second, so the times don’t start at 0…):

Normal execution :
           57 Working...
           59 Done !
                     59 Working...
                     01 Done !
           01 Working...
           03 Done !
                     03 Working...
                     05 Done !
           05 Working...
           07 Done !
                     07 Working...
                     09 Done !
           09 Working...
           11 Done !
                     11 Working...
                     13 Done !

// As you can see here, only one condition is executed at a time,
// which is the normal behavior for a non-threaded program.

Parallel with 1 worker
           13 Working...
           15 Done !
                     15 Working...
           15 Working...
                     17 Done !
           17 Done !
           17 Working...
                     17 Working...
                     19 Done !
           19 Done !
           19 Working...
                     19 Working...
                     21 Done !
           21 Done !
                     21 Working...
                     23 Done !

// By just adding parallel_ to the first where, you can see that the
// pipe is now asynchronous and that the two conditions can work at
// the same time, interleaving the output a bit.

Parallel with 2 workers
           23 Working...
           23 Working...
           25 Done !
           25 Working...
           25 Done !
           25 Working...
                     25 Working...
                     25 Working...
           27 Done !
           27 Done !
                     27 Done !
                     27 Working...
                     27 Done !
                     27 Working...
                     29 Done !
                     29 Done !


Parallel with 4 workers
           55 Working...
           55 Working...
           55 Working...
           55 Working...
           57 Done !
           57 Done !
           57 Done !
           57 Done ! 
                     57 Working...
                     57 Working...
                     57 Working...
                     57 Working...
                     59 Done !
                     59 Done !
                     59 Done !
                     59 Done !

// And now with 2 and 4 workers you can clearly see what
// happens: with 2 workers, the input is processed in pairs,
// and with 4 threads, all the input can be processed at once,
// but the 4 workers of the 2nd condition have to wait for the data
// before starting to work. So in the last test you have 8 threads:
// only the first 4 work during the first 2 seconds, then only the
// 4 others work.
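To check those timings, here is a small back-of-the-envelope model. It is my own simplification, not part of ppipe: every call of a stage takes a fixed duration, all input items are available at time 0, items are handled in arrival order, and each item goes to the worker of that stage that becomes free first.

```python
def pipeline_makespan(n_items, stage_durations, workers):
    """Estimate the total run time of a pipeline with `workers` threads per stage.

    Simplified model (assumption): fixed duration per call, all input ready
    at t=0, items processed in arrival order by the first idle worker.
    """
    ready = [0.0] * n_items  # time each item becomes available to the current stage
    for duration in stage_durations:
        free_at = [0.0] * workers  # time each worker of this stage becomes idle
        finished = []
        for available in sorted(ready):
            worker = min(range(workers), key=lambda k: free_at[k])
            start = max(available, free_at[worker])
            free_at[worker] = start + duration
            finished.append(start + duration)
        ready = finished  # the output of this stage feeds the next one
    return max(ready)


def sequential_time(n_items, stage_durations):
    # Normal (non-threaded) pipe: every call runs one after the other.
    return n_items * sum(stage_durations)


print("normal:", sequential_time(4, [2, 2]))
for w in (1, 2, 4):
    print(w, "worker(s):", pipeline_makespan(4, [2, 2], w))
```

For 4 items and two 2-second conditions, the model gives 16 s for the normal execution and 10, 6 and 4 s for 1, 2 and 4 workers, which matches the logs above (57→13, 13→23, 23→29 and 55→59).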

To keep the creation of ppipes simple, I moved all the ‘threading’ part into a function usable as a decorator, so writing a parallel_where gives:

@Pipe
@Threaded
def parallel_where(item, output, condition):
    if condition(item):
        output.put(item)

You can see the queue here! :-)
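Since ppipe.py itself isn’t reproduced here, here is a hypothetical sketch (Python 3) of what such a decorator could look like. The lowercase name threaded is my stand-in for Threaded, and only the qte_of_workers keyword comes from the examples above; the feeder thread, the sentinel and the queue layout are all assumptions. The decorated function receives (item, output, *args) like parallel_where; the wrapper feeds the input iterable into a queue, starts qte_of_workers threads, and yields whatever they put into the output queue, so the result is again an iterable that normal pipes can consume.

```python
import queue
import threading
import time

_DONE = object()  # sentinel marking the end of a stream


def threaded(func):
    """Hypothetical stand-in for Threaded: run func(item, output, *args) in worker threads."""
    def wrapper(iterable, *args, qte_of_workers=1):
        inbox = queue.Queue()   # items waiting to be processed
        output = queue.Queue()  # results put there by func

        def feeder():
            # Push every input item, then one sentinel per worker.
            for item in iterable:
                inbox.put(item)
            for _ in range(qte_of_workers):
                inbox.put(_DONE)

        def worker():
            try:
                while True:
                    item = inbox.get()
                    if item is _DONE:
                        return
                    func(item, output, *args)
            finally:
                # Always tell the consumer this worker has stopped, even on error.
                output.put(_DONE)

        threads = [threading.Thread(target=feeder, daemon=True)]
        threads += [threading.Thread(target=worker, daemon=True)
                    for _ in range(qte_of_workers)]
        for thread in threads:
            thread.start()

        # Yield results as they arrive, until every worker has finished.
        remaining = qte_of_workers
        while remaining:
            item = output.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    return wrapper


@threaded
def parallel_where(item, output, condition):
    if condition(item):
        output.put(item)


def slow_is_even(x):
    time.sleep(0.5)  # stands in for a long-running check
    return x % 2 == 0


start = time.time()
print(sorted(parallel_where(range(8), slow_is_even, qte_of_workers=4)))  # [0, 2, 4, 6]
print("took %.1f s" % (time.time() - start))  # two rounds of 0.5 s with 4 workers
```

Two design consequences of this sketch: results come out in completion order, not input order (hence the sorted()), and since the wrapper is a generator, the threads only start once someone iterates over the result. Stacking a Pipe-style decorator on top, as in the original, would make it usable with |, assuming that decorator forwards keyword arguments such as qte_of_workers.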

Enjoy!

Posted in Python

Python: Consulting PEPs from the command line while offline

One day I wished I could read PEPs on my laptop in the tube… so I searched for a convenient way to do so, and didn’t find one…

So I wrote a very simple shell script, which you can find here: https://github.com/julienpalard/pep

I’m currently packaging it for Debian and trying to put it on PyPI (help is welcome :-) I don’t have much time), but you can already use it by just downloading the script here: https://github.com/JulienPalard/pep/raw/master/pep

The script is very simple to use. First, you may want to tweak its configuration a bit, typically changing LOCAL_PEP_PATH to something writable by your user, so you don’t have to be root to update your PEPs (running an unknown script as root is a bit scary).

Then run `./pep upgrade with progress` to download the PEPs to your LOCAL_PEP_PATH, and then just run ‘./pep 8’ to read PEP 8!

Other features include searching with regexes; you can read about them in the README, in the -h output, or in the manpage.

Hope you’ll enjoy it!

Posted in Python

echo and backslash-escaped characters / newlines: how to write portable scripts?

When writing shell scripts you use ‘echo’ a lot, but have you thought about the portability of this simple statement?

Without testing, can you say what your shell will display for the following commands?

echo \n \c '\n' '\c' "\n" "\c"
echo -e \n \c '\n' '\c' "\n" "\c"

I can’t, because I know that echo’s behavior is very implementation-dependent: for example, in dash, echo -e foo actually prints ‘-e foo’, because dash’s echo doesn’t accept -e (the only option it understands is -n)…

Here is the bug I found in one of my shell scripts, reduced to this tiny script whose body is a single 9-byte line:

$ cat /tmp/test.sh
#!/bin/sh
echo "$*"
$ /tmp/test.sh '1\n2'
1
2

I’m running Debian Squeeze, so my sh is dash, and the ‘\n’ is interpreted by dash’s echo… but I don’t want that!
The only portable workaround I found is:

$ cat /tmp/test.sh
#!/bin/sh
printf "%s\n" "$*"
$ /tmp/test.sh '1\n2'
1\n2

Conclusion: keep an eye on your input. If you don’t want backslash-escaped characters to be interpreted and you want to be portable, use printf!
You can keep using echo when you have full control over the input, so the sh Hello World will forever stay:

echo "Hello world"

Posted in Shell Script

[BASHISM] How to generate a random number without $RANDOM?

Using $RANDOM is a common bashism. Typically, you’ll have to remove it when rewriting your scripts for dash or sh.

There are a lot of ways to generate random numbers, but I’m not here to pollute the internet demonstrating 42 different ways to do it; I’ll only show you the one I think is the best, and let you comment if you can do better :-)

My solution is to replace $RANDOM with:

$(($(dd if=/dev/urandom count=1 2> /dev/null | cksum | cut -d' ' -f1) % 32768))

Pros :

  • Not so long
  • Yields a different number on each call (not all replacements for $RANDOM do)
  • Exactly like in bash, it yields a number between 0 and 32767

Cons :

  • Longer than $RANDOM :p
  • /dev/urandom is not present on every system…

You should simplify it if the [0, 32767] range doesn’t matter to you: typically, if you want a number between 0 and 10, don’t do … % 32768)) % 11)) …

Posted in Shell Script

Pipe: Infix syntax for Python

Pipe is a Python module enabling infix syntax in Python.
For those asking “Why?”, let’s take an example:

Compare the readability of the classical prefix syntax:

sum(select(where(take_while(fib(), lambda x: x < 1000000), lambda x: x % 2), lambda x: x * x))

And the infix syntax :

fib() | take_while(lambda x: x < 1000000) \
      | where(lambda x: x % 2) \
      | select(lambda x: x * x) \
      | add

Isn’t the infix syntax more readable?

The base class of Pipe is kept simple (7 lines of Python) and is usable as a decorator, letting you easily create new ‘pipeable’ functions. The module provides ~30 ready-made pipe functions like ‘where’, ‘group_by’, ‘sort’, ‘take_while’… A pipeable function takes an iterable (tuple, list, generator) and typically yields its results, making it an iterable itself, so pipeable functions can be piped together.
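To show how the | can work, here is a minimal Python 3 sketch of such a decorator class, written from the description above; it may differ from the module’s actual code. The trick is __ror__: lists, tuples and generators don’t define | with a Pipe, so Python falls back to the right operand’s __ror__, which calls the wrapped function on the left operand. Calling a Pipe with arguments returns a new Pipe with those arguments bound. The where, select and add below are my own minimal versions, not the module’s.

```python
class Pipe:
    """Wrap a function so that `iterable | func(*args)` calls func(iterable, *args)."""

    def __init__(self, function):
        self.function = function

    def __ror__(self, other):
        # `other | self`: the left operand has no suitable __or__, so Python calls this.
        return self.function(other)

    def __call__(self, *args, **kwargs):
        # `where(pred)` returns a new Pipe with the extra arguments bound.
        return Pipe(lambda iterable: self.function(iterable, *args, **kwargs))


@Pipe
def where(iterable, predicate):
    return (x for x in iterable if predicate(x))


@Pipe
def select(iterable, selector):
    return (selector(x) for x in iterable)


@Pipe
def add(iterable):
    return sum(iterable)


print([1, 2, 3, 4, 5] | where(lambda x: x % 2) | select(lambda x: x * x) | add)  # 35
```

Since | is left-associative, each step receives the generator produced by the previous one, so nothing is computed until add finally consumes the chain.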

Let me introduce the basic usage of the Pipe module, then I’ll write a bit about how to build new pipes:

To start, get it from PyPI (http://pypi.python.org/pypi/pipe/1.3) and install it, then open a REPL, import pipe, and play:

Python 2.6.6 (r266:84292, Dec 26 2010, 22:31:48) 
[GCC 4.4.5] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pipe import *
>>> [1, 2, 3, 4, 5] | add
15
>>> [5, 4, 3, 2, 1] | sort
[1, 2, 3, 4, 5]

So far it’s easy. To learn more about the available pipes, just read help(pipe) in the REPL: each one is explained with an example as a doctest.

Now that we know pipeable functions work on iterables, we can try to pipe together two or more of them:

>>> [1, 2, 3, 4, 5] | where(lambda x: x % 2) | concat
'1, 3, 5'
>>> [1, 2, 3, 4, 5] | where(lambda x: x % 2) | tail(2) | concat
'3, 5'
>>> [1, 2, 3, 4, 5] | where(lambda x: x % 2) | tail(2) | select(lambda x: x * x) | concat
'9, 25'
>>> [1, 2, 3, 4, 5] | where(lambda x: x % 2) | tail(2) | select(lambda x: x * x) | add
34

Now, a bit about laziness: as Pipe uses iterables, the evaluation of a whole pipe is lazy, so we can play with infinite generators like this one:

>>> def fib():
...    a, b = 0, 1
...    while True:
...        yield a
...        a, b = b, a + b

Now we can do all kinds of stuff with the Fibonacci sequence, like solving the 2nd problem of http://projecteuler.net in a readable one-liner:

Find the sum of all the even-valued terms in the Fibonacci sequence which do not exceed four million.

>>> euler2 = fib() | where(lambda x: x % 2 == 0) | take_while(lambda x: x < 4000000) | add
>>> assert euler2 == 4613732
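As a cross-check that doesn’t need the module, here is the same computation in plain Python 3 with itertools.takewhile. Like the pipe version it is lazy, so the infinite fib() generator is only consumed up to the first term that reaches four million.

```python
from itertools import takewhile


def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b


# Stop at the first term >= 4,000,000, then keep only the even terms.
euler2 = sum(x for x in takewhile(lambda x: x < 4000000, fib()) if x % 2 == 0)
print(euler2)  # 4613732
```

Here the filter runs after takewhile, while the pipe version filters first; both terminate, because the first even term above the limit (14930352) stops take_while.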

Isn’t it pretty?

Now let’s see how to create new pipeable functions using the @Pipe decorator.
Say you want to create a function that yields the first x elements of its input,
and you want to use it as (1, 2, 3, 4, 5) | take(2) to take the first 2 elements.
I bet you’re thinking of a basic implementation like:

def take(iterable, qte):
    for item in iterable:
        if qte > 0:
            qte -= 1
            yield item
        else:
            return

Right? You take an iterable and a quantity, and while the quantity isn’t reached, you just yield.
OK, just add @Pipe to your take function and it’s pipeable :-)
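Put together, it can look like the sketch below (Python 3). The Pipe class here is a minimal re-sketch so the example runs on its own, and it may differ from the module’s real one; take_islice is a hypothetical alternative using itertools.islice. One difference worth knowing: the loop version pulls one extra item from its input before noticing it is done, which matters when the input iterator is shared, while islice stops right after the last item it yields.

```python
from itertools import islice


class Pipe:
    # Minimal stand-in for the module's Pipe class (assumption).
    def __init__(self, function):
        self.function = function

    def __ror__(self, other):
        return self.function(other)

    def __call__(self, *args, **kwargs):
        return Pipe(lambda iterable: self.function(iterable, *args, **kwargs))


@Pipe
def take(iterable, qte):
    """Yield the first qte elements of iterable."""
    for item in iterable:
        if qte > 0:
            qte -= 1
            yield item
        else:
            return


@Pipe
def take_islice(iterable, qte):
    # Hypothetical variant: same result, without consuming an extra item.
    return islice(iterable, qte)


print(list((1, 2, 3, 4, 5) | take(2)))         # [1, 2]
print(list((1, 2, 3, 4, 5) | take_islice(2)))  # [1, 2]
```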

Posted in Code, Python