Pythonのデータソートについて検証してみた

PythonにC#/LINQのOrderBy,ThenByを求め、そして断念、ついでにまとめてみた
2020.04.15

こんにちは。DA事業本部の松村です。

Pythonはまだまだ初心者ですが、これってPythonでどうやるんだろう、と興味を持っていることのひとつに、データのソートがあります。
私の一番好きな言語であるC#だと、LINQを使ってソートキーを昇順/降順指定しながらメソッドチェーンで繋げて書けるのでとても簡単です。例えば年齢の高い順、同じ年齢なら名前順にソートするならOrdeyByDescending(x => x.Age).ThenBy(x => x.Name)でOKです。さてPythonでは?

というわけで調べてみました。標準でLINQのような手触りのものがなければ、自分で作ってみたいと思います。

基本的なソート

Pythonのlist.sort()メソッドやsorted()関数では、引数keyに複数ソートキーをタプルとして返す関数を指定することで、複数キーでソートできます。こんな感じですね。

initial_list = [{"x":x, "y":y, "z":z} for x in range(2) for y in range(2) for z in range(2)]
sorted_list = sorted(initial_list, key=lambda item: (item["z"], item["y"], item["x"]))

for item in sorted_list:
    print(item)

z,y,xの順でそれぞれ昇順にソートしています。結果はこうなります。

{'x': 0, 'y': 0, 'z': 0}
{'x': 1, 'y': 0, 'z': 0}
{'x': 0, 'y': 1, 'z': 0}
{'x': 1, 'y': 1, 'z': 0}
{'x': 0, 'y': 0, 'z': 1}
{'x': 1, 'y': 0, 'z': 1}
{'x': 0, 'y': 1, 'z': 1}
{'x': 1, 'y': 1, 'z': 1}

引数reverseTrueを指定すれば、降順でもソート可能です。なるほど簡単ですね。

initial_list = [{"x":x, "y":y, "z":z} for x in range(2) for y in range(2) for z in range(2)]
sorted_list = sorted(initial_list, key=lambda item: (item["z"], item["y"], item["x"]), reverse=True)

for item in sorted_list:
    print(item)

今度はz,y,xの順にそれぞれ降順で、結果はこうです。

{'x': 1, 'y': 1, 'z': 1}
{'x': 0, 'y': 1, 'z': 1}
{'x': 1, 'y': 0, 'z': 1}
{'x': 0, 'y': 0, 'z': 1}
{'x': 1, 'y': 1, 'z': 0}
{'x': 0, 'y': 1, 'z': 0}
{'x': 1, 'y': 0, 'z': 0}
{'x': 0, 'y': 0, 'z': 0}

ただし、これだと全てのソートキーに対して降順でソートされてしまいます。昇順と降順を混ぜたい場合はどうすれば良いのでしょう。
公式ドキュメントのソート HOW TO - ソートの安定性と複合的なソートに、これに対する回答がありました。

ソートは、 安定 (stable) であることが保証されています。これはレコードの中に同じキーがある場合、元々の順序が維持されるということを意味します。
(中略)
この素晴しい性質によって複数のソートを段階的に組み合わせることができます。
(後略)

つまりzの降順、yの昇順の順番でソートしたい場合はこうすれば良いのです。

initial_list = [{"x":x, "y":y, "z":z} for x in range(2) for y in range(2) for z in range(2)]
sorted_list_wip = sorted(initial_list, key=lambda item: item["y"])
sorted_list = sorted(sorted_list_wip, key=lambda item: item["z"], reverse=True)

for item in sorted_list:
    print(item)

ソートの優先順位とは逆順にソートキーを適用していくのがポイントですね。結果はこうです。

{'x': 0, 'y': 0, 'z': 1}
{'x': 1, 'y': 0, 'z': 1}
{'x': 0, 'y': 1, 'z': 1}
{'x': 1, 'y': 1, 'z': 1}
{'x': 0, 'y': 0, 'z': 0}
{'x': 1, 'y': 0, 'z': 0}
{'x': 0, 'y': 1, 'z': 0}
{'x': 1, 'y': 1, 'z': 0}

ソートできたは良いけれど、この逆順というのはイマイチ直感的じゃない、とお感じかもしれませんね。
であれば、先程リンクした公式ドキュメントの(後略)の部分に、もう少しお手軽に書くためのラッパー関数が提示されており、ソートの優先順位と同じ順番でソートキーを記述できるように配慮されていますので、それを用いれば良いでしょう。

昇順/降順が混ざっていても一発でソートしたい!

昇順/降順が混ざったソートも、少しの準備で簡単に書けそうです。これで一安心ですね。

でも待ってください。ソートといえば、高速とされるアルゴリズムでも平均時間計算量がO(n log n)という重たい処理。Pythonではティムソートというものが採用されているとのことですが、ちょっと調べた限りそれも同様です。そんな処理を複数回実行するなんて実に非効率です。1回で済ませたいですよね。

比較関数を使う

多くの言語では、ソートの関数に比較関数を渡せるようになっていて、昇順降順入り混じったような複雑なソートは比較関数の挙動で制御することができます。

Python2ではそういった仕組みがあったものの、Python3で使えなくなってしまったようです。でもPython3.2から比較関数をlist.sort()sorted()のkey引数に変換してくれるfunctools.cmp_to_key()関数が登場しましたので、それを使えば間接的に比較関数を指定できます。

もしPython3.0や3.1を使わざるを得ない場合は?
functools.cmp_to_key()相当のものを自分で用意してあげましょう。これで良いです。

def cmp_to_key(mycmp):
    'Convert a cmp= function into a key= function'
    class K:
        def __init__(self, obj, *args):
            self.obj = obj
        def __lt__(self, other):
            return mycmp(self.obj, other.obj) < 0
        def __gt__(self, other):
            return mycmp(self.obj, other.obj) > 0
        def __eq__(self, other):
            return mycmp(self.obj, other.obj) == 0
        def __le__(self, other):
            return mycmp(self.obj, other.obj) <= 0
        def __ge__(self, other):
            return mycmp(self.obj, other.obj) >= 0
        def __ne__(self, other):
            return mycmp(self.obj, other.obj) != 0
    return K

サラッと書いていますが、上記コード含め、比較関数に関するここまでの話は、やはり公式ドキュメントのソート HOW TO - cmp パラメータを利用した古い方法に書いてあることそのままです。
確認していませんが、公式に載っているだけに、Python3.2以降のfunctools.cmp_to_key()関数の実装も上記コードと同じものなのでしょうね。比較関数をソートキーに変換するってどういうカラクリだ?と不思議に思いましたが、内部的に比較関数を呼ぶように比較演算子をオーバーロードしたラッパークラスを返すわけですね。

あれ、list.sort()sorted()のkey引数は関数でなくてはならないのに、functools.cmp_to_key()は関数じゃなくてクラスを返すんですか?
調べてみたら、Pythonではクラスは呼び出し可能型というものに分類され、クラスに対して関数呼び出し操作が行われた場合はコンストラクタが実行されるみたいです。なるほど今回の用途でクラスを返すということは、クラスのコンストラクタを返すこととほぼ同等ということですね。これで納得です。
Python 言語リファレンス 3.2.標準型の階層の配下、「呼び出し可能型 (callable type)」と更にその配下「クラス」にこのあたりの説明がありました。

少し話が逸れましたね。では比較関数を使ってソートしてみましょう。最終的な目標は比較関数自体を動的に生成することですが、まずは静的な比較関数を使って期待通りにソートできることを確認します。

比較関数は、他の多くの言語でそうであるように、比較対象の2つの値を引数とし、第一引数が第二引数よりも小さければ負の値、大きければ正の値、等しければ0を返すように実装します。
当初から何度か実行しているコードを元に、zの降順、yの昇順でソートする場合はこのようになります。

from functools import cmp_to_key

def compare(arg1, arg2):
    if arg1["z"] < arg2["z"]:
        return 1
    elif arg1["z"] > arg2["z"]:
        return -1
    else:
        if arg1["y"] < arg2["y"]:
            return -1
        elif arg1["y"] > arg2["y"]:
            return 1
        else:
            return 0

initial_list = [{"x": x, "y": y, "z": z} for x in range(2) for y in range(2) for z in range(2)]
sorted_list = sorted(initial_list, key=cmp_to_key(compare))

for item in sorted_list:
    print(item)

compareが比較関数です。zの降順でソートするために、第一引数が第二引数よりも小さいときには正の値を返すようにしています。実行結果はこちら。

{'x': 0, 'y': 0, 'z': 1}
{'x': 1, 'y': 0, 'z': 1}
{'x': 0, 'y': 1, 'z': 1}
{'x': 1, 'y': 1, 'z': 1}
{'x': 0, 'y': 0, 'z': 0}
{'x': 1, 'y': 0, 'z': 0}
{'x': 0, 'y': 1, 'z': 0}
{'x': 1, 'y': 1, 'z': 0}

狙い通り、zの降順、yの昇順でソートされていますね。

比較関数の挙動が掴めましたので、今度はソート2回と比較関数使用でパフォーマンスの違いを見てみたいと思います。
実行環境はMacBook Pro (13-inch, 2019, Four Thunderbolt 3 ports)/第8世代クアッドコア Core i5 2.4GHz/Python3.8.2です。

データ件数を100万件に増やして、同じくzの降順、yの昇順でソートしてみましょう。
まずは愚直に2回ソートする版から。

import random
from time import perf_counter

initial_list = [{"x": x, "y": y, "z": z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

start_time =perf_counter()
sorted_list_wip = sorted(initial_list, key=lambda item: item["y"])
sorted_list = sorted(sorted_list_wip, key=lambda item: item["z"], reverse=True)
print("duration:",perf_counter() - start_time, "sec.")

最初に作ったリストはある程度ソートされてしまっているので、一旦シャッフルしています。
3回実行した結果がこちらです。

duration: 0.937465303 sec.
duration: 0.9072027410000001 sec.
duration: 0.9315092679999999 sec.

およそ0.9秒強といったところでしょうか。比較関数を使って2回のソートを1回に抑えれば、半分の約0.45秒程度で終わることが期待できますね。以下のコードで計測してみましょう。

import random
from functools import cmp_to_key
from time import perf_counter

def compare(arg1, arg2):
    if arg1["z"] < arg2["z"]:
        return 1
    elif arg1["z"] > arg2["z"]:
        return -1
    else:
        if arg1["y"] < arg2["y"]:
            return -1
        elif arg1["y"] > arg2["y"]:
            return 1
        else:
            return 0

initial_list = [{"x": x, "y": y, "z": z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

start_time = perf_counter()
sorted_list = sorted(initial_list, key=cmp_to_key(compare))
print("duration : ", perf_counter() - start_time, "sec.")

3回実行した結果がこちらです。

duration: 8.227075562 sec.
duration: 8.184288013 sec.
duration: 8.135030636 sec.

???え???

半分になるどころか、逆に遅くなっています。それも先程の結果と比較してどうこうではなく、単純に遅すぎてちょっと実用をためらうレベルです。正直目を疑ったのですが、コードを見直してみても、意図に反して無駄な処理をしていそうな箇所は見当たりません。
ソートの回数を抑えたのに一体なぜ?比較関数が遅いから?でも大したことはしていないし、そもそも関数呼び出し自体がオーバーヘッドだというなら、2回ソートする版でもkeyに渡したラムダ式は実行されるわけで、しかもこっちはO(n log n)が2回なわけだから(注:予想外の結果に冷静さを失ってこう考えてしまいましたが、誤りです)、説明がつかなくない??

と、こんな結果は納得できないので調べてみたら、公式ドキュメントではないですが、stack overflowでこんなポストを見つけました。

このAnswersの2つめにこんな記述があります。

The keys are calculated not instead of calling cmp, but before it. Calculating the keys takes O(n), but the sorting takes O(n log(n)), so the overall complexity of sorting is still O(n log(n)).

keyに渡した関数は、比較時ではなく、iterableから比較用の値を取り出すために各要素につき1回ずつ実行される、つまり計算量は0(n)であると。今読み返すと当たり前のことですが、最初に見たときはまだ意味が理解できず、ついこんなコードでkey関数の実行回数を数えてしまいました。

import random

i = 0

def get_sort_key_y(item):
    global i
    i += 1
    return item["y"]

def get_sort_key_z(item):
    global i
    i += 1
    return item["z"]

initial_list = [{"x": x, "y": y, "z": z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

sorted_list_wip = sorted(initial_list, key=get_sort_key_y)
sorted_list = sorted(sorted_list_wip, key=get_sort_key_z, reverse=True)
print("execution count:", i, "times.")

2回ソートする版をベースにしています。key関数にカウントアップ処理を入れるため、見た目の都合でラムダ式ではなく普通の関数にしています。
結果はこちらです。

execution count: 2000000 times.

100万件のソートが2回なので、キー取得関数の実行関数は200万回です。この数字を自分の目で見て、ようやくstack overflowに書かれていることを理解しました。

これに対し、比較関数を使用する版では、まずkey関数が100万回実行され(コンストラクタなので、こちらもディクショナリアクセス200万回のコストを上回っていそうですね)、ソート処理で改めて比較関数がO(n log n)回、nが100万だから概ね2000万回近く実行されます。

list.sort()sorted()のソートアルゴリズム自体はC言語で記述され、ネイティブで動作するそうです。一方で比較関数、すなわち素のPythonで書かれたコードが私の想像以上に遅いのだとすると、ソートの実行回数を減らしたところで、それで削減した処理時間は膨大な回数の比較関数実行(と、もしかするとPythonとネイティブのコンテキストスイッチのオーバーヘッドも?)であっさり消し飛んでしまう、という理屈で辻褄が合います。
Python3で直接比較関数を渡せなくなった理由は、一見正統かつ便利でありながら、実はパフォーマンス上の重大な問題を抱えていたから、というのものそのひとつなのかもしれませんね。そうは言っても互換性の問題で仕方なくPython3.2からfunctools.cmp_to_key()を標準搭載した、という流れでしょうか。

長くなりましたが、どうやらPythonでは比較関数によるソートは非推奨、と捉えるべきでしょう。ですので、当初構想していた比較関数を動的に作成する、というのも無意味なのでやりません。
こんなふうにLINQっぽくメソッドチェーンしたいな、と思っていましたが、性能を大幅に犠牲にしてまでやることではありません。

sorted_list = sorted(
    initial_list,
    key=cmp_to_key(
        order_by_descending(lambda item: item["z"])
        .then_by(lambda item: item["y"])
        .get_comparer()
    )
)

ソートキーを工夫する

比較関数は使うべきでないことがわかりました。したがって、昇順/降順が入り混じったソートを行うときに、list.sort()sorted()を複数回実行しなければいけない問題は解決されていないままです。
比較関数を使うよりも遥かにましとはいえ、何度もソートを実行するのはやはり無駄なので避けたいところです。何かいい方法はないでしょうか。

全てのソートキーを昇順か降順のどちらかに統一してしまうのはどうでしょう。数値であれば、正負の符号を反転させてから昇順にソートし、後で符号を元に戻せば、降順でソートしたのと同じ結果を得ることができます。日付型なら一度UNIX時間などに変換すればいいでしょう。文字列型は...降順にソートすることは滅多になさそうなので、ここでは考えないことにします。

実際にやってみましょう。ソートキーは元のiterableには影響を与えることなく取得できますので、前段の「符号を元に戻す」という処理は不要です。

initial_list = [{"x":x, "y":y, "z":z} for x in range(2) for y in range(2) for z in range(2)]
sorted_list = sorted(initial_list, key=lambda item: (-item["z"], item["y"]))
 
for item in sorted_list:
    print(item)

ソートキーのうち、zのみ符号を反転していることに注目してください。結果はこうなります。

{'x': 0, 'y': 0, 'z': 1}
{'x': 1, 'y': 0, 'z': 1}
{'x': 0, 'y': 1, 'z': 1}
{'x': 1, 'y': 1, 'z': 1}
{'x': 0, 'y': 0, 'z': 0}
{'x': 1, 'y': 0, 'z': 0}
{'x': 0, 'y': 1, 'z': 0}
{'x': 1, 'y': 1, 'z': 0}

zの降順、yの昇順でソートされていますね。動きはOKです。
再び、データを100万件に増やして同じキーでソートし、処理時間を計測してみます。

import random
from time import perf_counter

initial_list = [{"x":x, "y":y, "z":z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

start_time = perf_counter()
sorted_list = sorted(initial_list, key=lambda item: (-item["z"], item["y"]))
print("duration:", perf_counter() - start_time, "sec.")

今度はソート1回で完結し、コスト高な独自の比較関数もありません。愚直に2回ソートする版がおよそ0.9秒強でしたが、きっとこれより良好な結果を叩き出してくれるでしょう。
では3回実行します。結果はこちら。

duration: 1.7119310689999998 sec.
duration: 1.7188602279999998 sec.
duration: 1.7353343780000001 sec.

うーん。。。またも期待を裏切られてしまいました。比較関数を使う版よりはだいぶ改善されていますが、2回ソートする版よりもまだ倍近く遅いです。
ソートキー取得に関しては、こちらはタプルを生成して返しているので、そこだけに着目すれば愚直に2回ソートする版よりも高コストだというのは想像できますが、その影響がここまで大きいとは。予想外ですが、事実です。

もう少しソートキーを工夫...できなかった

まだ試していないことがあるのを思い出しました。ひとつ上のコードではソートキーを取得するのにラムダ式を使っていますが、代わりにoperatorモジュール関数というものも使えます。これまで何度か出てきたソート HOW TO - operator モジュール関数によると、

上述した key 関数のパターンはとても一般的です、 そのため、Python は高速で扱いやすいアクセサ関数を提供しています。 operator モジュールには itemgetter(), attrgetter() そして methodcaller() 関数があります。

これらの関数を利用すると、上の例はもっと簡単で高速になります:

だそうです。「上述した key 関数のパターン」や「上の例」というのは、ラムダ式でタプルを返すことを指しています。
では懲りずにこれで計測...したかったのですが、昇順/降順を混在させるために、特定要素の符号を反転させる方法がわかりません。operatorモジュールのリファレンスを見てもできそうにありません。

operatorモジュールを使ったソートの性能

operatorモジュールを昇順/降順入り混じったソートに適用することは諦めましたが、これを使ったソートの性能は計測しておきます。
なぜなら、複数ソートキーで昇順/降順が統一されている場合であれば、operatorモジュールの性能次第では複数回ソートする版よりも高速な可能性を捨てきれないためです。とはいえ正直期待していませんが、念のため確認します。
同じように100万件でやってみましょう。

import random
from operator import itemgetter
from time import perf_counter

initial_list = [{"x":x, "y":y, "z":z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

start_time = perf_counter()
sorted_list = sorted(initial_list, key=itemgetter("z", "y"))
print("duration:", perf_counter() - start_time, "sec.")

ソート順はzの昇順、yの昇順です。3回実行した結果はこちら。

duration: 1.310642186 sec.
duration: 1.306707809 sec.
duration: 1.302523648 sec.

やはり愚直に2回ソートする版には及びませんでしたが、ソートキーをラムダ式で作る版よりはそこそこ速くなっていますね。
...と言いたいところですが、先ほどはzの降順、yの昇順だったのに対し、今回はzの昇順、yの昇順と、ソートキーが異なっています。ソートキーの違いにより、そもそも実質計算量が異なってしまうことはありえるので、公平を期すためにソートキーをラムダ式で作る版もzの昇順、yの昇順で計測し直します。

import random
from time import perf_counter

initial_list = [{"x":x, "y":y, "z":z} for x in range(100) for y in range(100) for z in range(100)]
random.seed(0)
random.shuffle(initial_list)

start_time = perf_counter()
sorted_list = sorted(initial_list, key=lambda item: (item["z"], item["y"]))
print("duration:", perf_counter() - start_time, "sec.")

今度はzの符号反転をしていません。3回実行した結果はこちら。

duration: 1.359291545 sec.
duration: 1.3597131639999998 sec.
duration: 1.358856213 sec.

やはり先程の速度向上は、operatorモジュールのおかげというよりはソートキーの違いによる影響が大きかったようですね。今回の元データの並びに対しては、計算量が減る方向に作用したのでしょう。
ソートキーを同じにした結果、operatorモジュールを使う方がわずかに速いことがわかりましたが、ほんの数%の差なので、この程度なら好きな方を使えば良いのではないでしょうか。

愚直に2回ソートする版の再計測は省略します。ソートキーの違いでもっと速くなるのであれば、operatorモジュールを使用する版が敵わないのは明らかだからです。
結局のところ、ソートキーが複数ある場合において、基本的には愚直に複数回ソートするのが直感に反して実は性能が頭一つ抜けており、他の方法を採用する余地は少ない、と言えるでしょう。

ソートキーが多い場合はどうなるか

とはいえソートキーがもっと多いのであれば、もしかするとソートキー生成するよりもソートを何度も実行する方が高コストになることもあるかもしれません。思い切ってソートキーを6つに増やして試してみましょう。

まずは愚直に6回ソートする版から。

import random
from time import perf_counter

initial_list = [
    {"u": u, "v": v, "w": w, "x": x, "y": y, "z": z}
    for u in range(10) for v in range(10) for w in range(10)
    for x in range(10) for y in range(10) for z in range(10)
]
random.seed(0)
random.shuffle(initial_list)

start_time =perf_counter()
sorted_list_wip1 = sorted(initial_list, key=lambda item: item["z"])
sorted_list_wip2 = sorted(sorted_list_wip1, key=lambda item: item["y"])
sorted_list_wip3 = sorted(sorted_list_wip2, key=lambda item: item["x"])
sorted_list_wip4 = sorted(sorted_list_wip3, key=lambda item: item["w"])
sorted_list_wip5 = sorted(sorted_list_wip4, key=lambda item: item["v"])
sorted_list = sorted(sorted_list_wip5, key=lambda item: item["u"])
print("duration:",perf_counter() - start_time, "sec.")

3回実行した結果はこちら。

duration: 2.401714868 sec.
duration: 2.32521275 sec.
duration: 2.31811426 sec.

次はoperatorモジュールでソートキーを生成する版です。

import random
from operator import itemgetter
from time import perf_counter

initial_list = [
    {"u": u, "v": v, "w": w, "x": x, "y": y, "z": z}
    for u in range(10) for v in range(10) for w in range(10)
    for x in range(10) for y in range(10) for z in range(10)
]
random.seed(0)
random.shuffle(initial_list)

start_time =perf_counter()
sorted_list = sorted(initial_list, key=itemgetter("u", "v", "w", "x", "y", "z"))
print("duration:",perf_counter() - start_time, "sec.")

3回実行した結果はこちら。

duration: 1.8482623999999999 sec.
duration: 1.8492553959999998 sec.
duration: 1.8653135120000002 sec.

ついに逆転しましたね。ソートキー6個という要件は珍しいけれどなくはない、ぐらいの頻度でしょうか。一応気に留めておいたほうが良さそうです。
実際の性能分岐点は、ソートキーだけでなくデータの件数やその元々の並びにも影響してくるかと思いますので、ここではこれ以上深追いしません。必要があれば実データで実際に比較してみてください。

まとめ

今回の検証で、Pythonのソートについて以下のことがわかりました。

  • 比較関数のことは忘れるべし
  • ソートキーが複数あっても、安定ソートという性質を利用して愚直に複数回ソートするのが多くの場合最速
  • ただしソートキーが多数ある場合は、予めソートキーを作って一発でソートしたほうが速い可能性があるので要検証
  • ソートキーに昇順/降順が混在している場合は、ソートキー作成時に降順要素の符号を反転させれば良い

今回は標準機能だけで検証しましたが、もしかするとサードパーティーのモジュールの中には、複数キーによるソートを高速に行えるものがあるかもしれません。そのようなものを見つけたらまた紹介したいと思います。

最後に

本当は比較関数を動的に作ってサクッと終わる予定でしたが、何かするたびに想定外のことが起こってはその理由を掘り下げて、でめっちゃ疲れました。でもいい勉強になりました。

次回はNode.jsで同じネタをやろうと思います。既に検証はできていて、今度は比較関数を動的に作って本当にサクッと終わります。お楽しみに!(?)