【処理高速化】パイプライン化による並列処理の考え方・手順・実装方法

2020.07.30

カフェチームの山本です。

エッジデバイスでプログラムを動かしていると、ハードウェアの性能があまり高くないこともあり、所望の処理速度を得られないことが多いです。

今回は、こうした状況に対して、処理をパイプライン化することによって、全体の処理速度を高速化することを目指します。自分の実装したプログラムを例に、考え方・手順・実装方法(Python)について記載します。

ステップ0.スクリプトを用意する・状況を把握する

ステップ0-1.処理概要

今回は以下のようなスクリプトを利用します。処理の概要は以下のとおりです。よくあるような、学習モデルを利用して物体を検出するスクリプトです。今回のスクリプトでは、カメラを複数接続し、同時に複数の画像を撮影して処理をしています。

前処理ではカメラとモデルをセットアップしています。ループ中の処理の流れとしては以下のとおりです。

  • 画像取得:RealSenseというカメラデバイスから画像を取得します
  • 前処理(変換):学習モデルに入力サイズに合わせるために、画像サイズを縮小します
  • モデルで推論処理:学習モデルに画像を入力し、GPUで推論処理を行います
  • 後処理(骨格検出):学習モデルの出力を変換し、骨格を検出します
  • 結果を送信:検出結果をクラウドに送信します(今回のスクリプトでは、どこにも送信せず、単に結果を画面に表示するだけになっています)

(importしている各モジュールの処理の詳細については、割愛させていただきます。)

これらの処理が直列につながっているため、全体の処理時間は、すべての処理にかかる時間の合計になります。

import realsense_utils
import cubemos_model_utils
import dlr_model_utils
import cafe_utils

N_REALSENSE_DEVICES = 2
CAPTURE_WIDTH = 640
CAPTURE_HEIGHT = 480
CAPTURE_FPS = 30

MODEL_FOLDER_PATH = "../compiled_512/"
MODEL_INPUT_SHAPE = 512
MODEL_INPUT_WIDTH = MODEL_INPUT_SHAPE
MODEL_INPUT_HEIGHT = MODEL_INPUT_SHAPE

def main(
    n_realsense_devices, capture_width, capture_height, capture_fps, 
    model_folder_path, model_input_width, model_input_height
):
    pipelines = realsense_utils.setup_realsense_cameras(n_realsense_devices, capture_width, capture_height, capture_fps)
    # check pipelines
    if not len(pipelines) == n_realsense_devices:
        print(f"Not Found {n_realsense_devices} Reasence devices")
        print(f"Found only {len(pipelines)} devices")
        return
    # test pipelines by getting 1 frame
    for pipeline in pipelines:
        RGB_image, depth_image = realsense_utils.get_images(pipeline)

    model = dlr_model_utils.setup_model(model_folder_path)

    while True:
        # 画像取得
        image_sets = []
        for pipeline in pipelines:
            RGB_image, depth_image = realsense_utils.get_images(pipeline)
            image_sets.append((RGB_image, depth_image))

        # 前処理(変換)
        RGB_images_conv = []
        for RGB_image, _ in image_sets:
            RGB_image_conv = cubemos_model_utils.convert_pre(RGB_image, model_input_width, model_input_height)
            RGB_images_conv.append(RGB_image_conv)

        # モデルで推論処理
        outputs = []
        for RGB_image_conv in RGB_images_conv:
            output = dlr_model_utils.infer(model, RGB_image_conv)
            outputs.append(output)

        # 後処理(骨格検出)
        skeletons = []
        for output in outputs:
            pose_keypoints_coco_list, scores_list = cubemos_model_utils.convert_post(output)
            pose_keypoints_coco = pose_keypoints_coco_list[0]
            scores = scores_list[0]
            skeletons.append((pose_keypoints_coco, scores))

        # 結果を送信
        for skeleton in skeletons:
            cafe_utils.send(skeletons)

if __name__ == "__main__":
    main(
        N_REALSENSE_DEVICES, CAPTURE_WIDTH, CAPTURE_HEIGHT, CAPTURE_FPS,
        MODEL_FOLDER_PATH, MODEL_INPUT_WIDTH, MODEL_INPUT_HEIGHT,
    )

ステップ0-2.目標を設定する

今回は、骨格から人の行動を把握することも目標とするために、8[fps]を目標とします。

ステップ0-3.問題を把握する

目標どおりでないことを確認します。上のスクリプトを実行すると、5~6[fps]程度であり、目標からは少し遅く、十分な速度ではありませんでした。

この問題に対して、次節以降の手順でパイプライン化していきます。

もちろん他の方法で高速化できるようであれば、そちらも検討すると良いと思います。

ステップ1.前準備

パイプライン化する前に、プログラムの動作を把握します。まずプログラムを整理し、それぞれの処理の速度を計測します。

ステップ1-1.処理を分割する

各処理をそれぞれ関数として分けておきます。これは、1-2で処理時間を計測するためと、2で処理をパイプライン化するためです。

import realsense_utils
import cubemos_model_utils
import dlr_model_utils
import cafe_utils

N_REALSENSE_DEVICES = 2
CAPTURE_WIDTH = 640
CAPTURE_HEIGHT = 480
CAPTURE_FPS = 30

MODEL_FOLDER_PATH = "../compiled_512/"
MODEL_INPUT_SHAPE = 512
MODEL_INPUT_WIDTH = MODEL_INPUT_SHAPE
MODEL_INPUT_HEIGHT = MODEL_INPUT_SHAPE

def pre_proc(
    n_realsense_devices, capture_width, capture_height, capture_fps,
    model_folder_path
):
    pipelines = realsense_utils.setup_realsense_cameras(n_realsense_devices, capture_width, capture_height, capture_fps)
    # check pipelines
    if not len(pipelines) == n_realsense_devices:
        print(f"Not Found {n_realsense_devices} Reasence devices")
        print(f"Found only {len(pipelines)} devices")
        return
    # test pipelines by getting 1 frame
    for pipeline in pipelines:
        RGB_image, depth_image = realsense_utils.get_images(pipeline)

    model = dlr_model_utils.setup_model(model_folder_path)

    return pipelines, model

def get_images(pipelines):
    image_sets = []
    for pipeline in pipelines:
        RGB_image, depth_image = realsense_utils.get_images(pipeline)
        image_sets.append((RGB_image, depth_image))

    return image_sets

def convert_pre(image_sets, model_input_width, model_input_height):
    RGB_images_conv = []
    for RGB_image, _ in image_sets:
        RGB_image_conv = cubemos_model_utils.convert_pre(RGB_image, model_input_width, model_input_height)
        RGB_images_conv.append(RGB_image_conv)

    return RGB_images_conv

def model_infer(RGB_images_conv, model):
    outputs = []
    for RGB_image_conv in RGB_images_conv:
        output = dlr_model_utils.infer(model, RGB_image_conv)
        outputs.append(output)

    return outputs

def convert_post(outputs):
    skeletons = []
    for output in outputs:
        pose_keypoints_coco_list, scores_list = cubemos_model_utils.convert_post(output)
        pose_keypoints_coco = pose_keypoints_coco_list[0]
        scores = scores_list[0]
        skeletons.append((pose_keypoints_coco, scores))

    return skeletons

def send(skeletons):
    for skeleton in skeletons:
        cafe_utils.send(skeletons)

def loop_proc(pipelines, model, model_input_width, model_input_height):
    # 画像取得
    image_sets = get_images(pipelines)

    # 前処理(変換)
    RGB_images_conv = convert_pre(image_sets, model_input_width, model_input_height)

    # モデルで推論処理
    outputs = model_infer(RGB_images_conv, model)

    # 後処理(骨格検出)
    skeletons = convert_post(outputs)

    # 結果を送信
    send(skeletons)

def main(
    n_realsense_devices, capture_width, capture_height, capture_fps, 
    model_folder_path, model_input_width, model_input_height
):
    pipelines, model = pre_proc(
        n_realsense_devices, capture_width, capture_height, capture_fps,
        model_folder_path
    )

    while True:
        loop_proc(pipelines, model, model_input_width, model_input_height)

if __name__ == "__main__":
    main(
        N_REALSENSE_DEVICES, CAPTURE_WIDTH, CAPTURE_HEIGHT, CAPTURE_FPS,
        MODEL_FOLDER_PATH, MODEL_INPUT_WIDTH, MODEL_INPUT_HEIGHT,
    )

ステップ1-2.時間計測するための用意をする

今回は、関数デコレータを利用しました。他にも、以下のような手段があると思います。

  • プロファイラ
  • デバッガ
  • time.time()

デコレータだと、timeのように各関数の内部を変更する必要がなく処理を共通化でき、プロファイラやデバッガのように環境構築することなく使用できるため、利用しやすいです。

実装した時間計測用のラッパは、以下のとおりです。stop_watchをデコレータとしてつけた関数の経過時間を計測し、計測した時間を関数名をキーとしてtime_resultに保存します。show_time_resultで計測した時間を表示し、reset_time_resultでクリアします。表示はmain_funcで指定した関数以外は、インデントして表示するようにしています。(このあたりは計測したいスクリプトの処理によって変更すると良いと思います。)

こちらの方のプログラムをもとに作成しました。

from functools import wraps
from operator import attrgetter
import time

class TimeMeasure:
    time_result = {}

    @classmethod
    def stop_watch(cls, func) :
        time_result = cls.time_result

        @wraps(func)
        def wrapper(*args, **kargs) :
            start = time.time()
            result = func(*args,**kargs)
            elapsed_time =  time.time() - start

            if not func.__name__ in time_result:
                time_result[func.__name__] = []
            time_result[func.__name__].append(elapsed_time)

            return result

        return wrapper

    @classmethod
    def show_time_result(cls, main_func, sub_funcs=None):
        time_result = cls.time_result

        main_func_name = main_func.__name__
        time_func = sum(time_result[main_func_name])
        print("{}: {:.3f}[s]={:.1f}[fps]".format(main_func_name, time_func, 1.0/time_func))

        if sub_funcs is None:
            sub_func_names = set(time_result.keys()) - set([main_func.__name__])
        else:
            sub_func_names = list(map(lambda func: func.__name__, sub_funcs))

        max_len_names = max(map(lambda name: len(name), sub_func_names))

        for sub_func_name in sub_func_names:
            time_func = sum(time_result[sub_func_name])
            format_str = "  {:<" + str(max_len_names) + "}: {:.3f}[s]"
            print(format_str.format(sub_func_name, time_func) )

    @classmethod
    def reset_time_result(cls):
        time_result = cls.time_result
        time_result.clear()

ステップ1-3.処理時間を計測する

各関数の処理時間を計測し、ボトルネックとなっている箇所を確認します。以下のように計測したい関数にデコレータとして追加し、ループ処理の最後で計測結果を表示します。(前と同じ箇所は省略しています)

...
import time_measure_utils
...

def pre_proc(
    ...

@time_measure_utils.TimeMeasure.stop_watch
def get_images(pipelines):
    ...

@time_measure_utils.TimeMeasure.stop_watch
def convert_pre(image_sets, model_input_width, model_input_height):
    ...

@time_measure_utils.TimeMeasure.stop_watch
def model_infer(RGB_images_conv, model):
    ...

@time_measure_utils.TimeMeasure.stop_watch
def convert_post(outputs):
    ...

@time_measure_utils.TimeMeasure.stop_watch
def send(skeletons):
    ...

@time_measure_utils.TimeMeasure.stop_watch
def loop_proc(pipelines, model, model_input_width, model_input_height):
    ...

def main(
    ...
):
    ...
    while True:
        loop_proc(pipelines, model, model_input_width, model_input_height)
        time_measure_utils.TimeMeasure.show_time_result(loop_proc)
        time_measure_utils.TimeMeasure.reset_time_result()

if __name__ == "__main__":
    ...

計測した結果、以下のようになりました。

loop_proc: 0.178[s]=5.6[fps]
  convert_post: 0.037[s]
  get_images  : 0.001[s]
  send        : 0.000[s]
  model_infer : 0.112[s]
  convert_pre : 0.027[s]

ステップ2.方針を考える

ステップ1の計測結果と、スクリプトの処理内容から、パイプライン化の方針を考えます。

ステップ2-1.並列のさせ方を考える

先程の処理結果から、ボトルネックとなっているのは、model_infer(GPUで学習モデルの推論処理を行う関数)であり、前処理(get_images、convert_pre)と後処理(convert_post、send)はそれぞれ合計しても、model_inferの時間よりも短いことがわかります。

よって、前処理 → 推論処理 → 後処理 とパイプライン化させれば良いことがわかります。

get_images → convert_pre → model_infer → convert_post → send とすることも考えられますが、これだと、get_imagesからsendにいたるまでの時間が4ループ分かかってしまうため、データ取得から送信までの遅延が大きくなります。

ステップ2-2.並列化の方式を考える

この後、パイプライン化するため、その並列化の方針について考えます。特に、

Pythonの場合、並列処理(並行処理・非同期処理)に関しては、他の言語と少し違います。詳しくは以下のリンクにかかれています。

Pythonにおける非同期処理: asyncio逆引きリファレンス - Qiita

Multiprocessing vs. Threading in Python: What you need to know.

簡単にまとめると、以下のようです。

  • threading:CPUレベルの処理並列化はされない。スレッドがネットワーク・ディスクI/Oなどを待っている時間、別のスレッドが処理を実行する。スレッド間でメモリ空間は共有される。
  • multiprocessingCPUレベルの処理並列化がされる。そのため、CPUバウンドな処理を高速化できる。スレッド間でメモリ空間は共有されず、別プロセスとして別メモリ空間を持ち、元のプロセスのメモリがコピーされる。プロセス作成のオーバーヘッドが大きい。
  • asyncio:CPUレベルの処理並列化はされない。ある処理の待ち時間に、別の処理を実行する。スレッドは1つ(複数作成されるわけではない)であるため、多数の処理を並行化できる。

今回のスクリプトでボトルネックとなっているのは、GPUで推論処理を実行する箇所です。そのため、GPUでの処理を待つ時間(I/O)がほとんどであり、CPUバウンドでなさそうなことが推測できます。前処理、後処理もそこまで時間がかかっているわけではなく、同時に動かしてもCPUバウンドにはならなさそうです。また、パイプラインとして動かすスレッドは3つで、数は多くありません。(より正確に検討するには、CPU・GPUの使用率を計測すると良いと思います。)

よって、threadingが適していると思われます。

他の言語の場合も同様に、並列化したい処理の内容に合わせて、並列化の方式を選択します。

ステップ3.スクリプトを変更する(実装)

ステップ2の検討内容に合わせて、スクリプトに並列処理を実装します。

先に結論ですが、以下のようなスクリプトを実装しました。ステップ1-1のスクリプトをskeleton_tracking_with_realsense.pyというファイル名で保存し、importして利用しています。実装内容について、下で説明します。

import threading
import queue
from skeleton_tracking_with_realsense import *

import time_measure_utils

# step 3-1
@time_measure_utils.TimeMeasure.stop_watch
def proc0(args, params):
    _ = args
    pipelines, model_input_width, model_input_height = params

    # 画像取得
    image_sets = get_images(pipelines)

    # 前処理(変換)
    RGB_images_conv = convert_pre(image_sets, model_input_width, model_input_height)

    return RGB_images_conv

@time_measure_utils.TimeMeasure.stop_watch
def proc1(args, params):
    RGB_images_conv = args
    model = params

    # モデルで推論処理
    outputs = model_infer(RGB_images_conv, model)

    return outputs

@time_measure_utils.TimeMeasure.stop_watch
def proc2(args, params):
    outputs = args
    _ = params

    # 後処理(骨格検出)
    skeletons = convert_post(outputs)

    # 結果を送信
    ret = send(skeletons)

    return ret

# step 3-2
def wrap_func_for_mt(func, params):
    def wrap_func(queue_input, queue_output):
        while True:
            input = queue_input.get()
            if input is None:
                queue_output.put(None)
                continue

            result = func(input, params)

            queue_output.put(result)

    return wrap_func

# step 3-6
@time_measure_utils.TimeMeasure.stop_watch
def loop_proc(queues_input, queues_output, inputs):
    for queue_input, input in zip(queues_input, inputs):
        queue_input.put(input)

    outputs = []
    for queue_output in queues_output:
        output = queue_output.get()
        outputs.append(output)

    return outputs

def main_multithread(
    n_realsense_devices, capture_width, capture_height, capture_fps, 
    model_folder_path, model_input_width, model_input_height
):
    pipelines, model = pre_proc(
        n_realsense_devices, capture_width, capture_height, capture_fps,
        model_folder_path
    )

    # step 3-2
    # funcs to proc in pipeline
    func_params = [
        (proc0, (pipelines, model_input_width, model_input_height)),
        (proc1, (model)),
        (proc2, ()),
    ]
    wrap_funcs = list(map(lambda func_param: wrap_func_for_mt(func_param[0], func_param[1]), func_params))

    # step 3-3
    # prepare queues
    queues_input = [queue.Queue() for _ in range(len(wrap_funcs))]
    queues_output = [queue.Queue() for _ in range(len(wrap_funcs))]

    # step 3-4
    # create Threads
    threads = []
    for wrap_func, queue_input, queue_output in zip(wrap_funcs, queues_input, queues_output):
        t = threading.Thread(target=wrap_func, args=(queue_input, queue_output))
        threads.append(t)

    for t in threads:
        t.start()

    # step 3-5
    # start pipeline
    init_inputs = [[]] + [None]*(len(wrap_funcs) - 1)  # [[], None, None, ...]
    inputs = init_inputs
    while True:
        # step 3-6
        outputs = loop_proc(queues_input, queues_output, inputs)

        inputs = [[]] + outputs[:-1]
        output_pipeline = outputs[-1]

        time_measure_utils.TimeMeasure.show_time_result(loop_proc)
        time_measure_utils.TimeMeasure.reset_time_result()

if __name__ == "__main__":
    main_multithread(
        N_REALSENSE_DEVICES, CAPTURE_WIDTH, CAPTURE_HEIGHT, CAPTURE_FPS,
        MODEL_FOLDER_PATH, MODEL_INPUT_WIDTH, MODEL_INPUT_HEIGHT,
    )

ステップ3-1.処理を分ける

proc0、proc1、proc2関数のように、ステップ2-1で検討した並列化のさせ方で処理をまとめます。。引数を共通化し、それぞれの入出力となる引数と、パラメータの処理を追加します。引数は前プロセスから渡されるデータで、パラメータは予め決まった値をスレッド作成時に渡すものです。これによって、元の関数(get_images、convert_preなど)を変更することなく、利用できます。

ステップ3-2.パイプライン化用のラップ関数を作成する

wrap_func_for_mp関数のように、分けた処理をパイプラインで利用するためのラッパ関数を作成します。これによって、「データ入力用のキューとデータ出力用のキューを引数として受け取り、以降、入力用キューにデータが来たら、処理して、出力用キューにデータを入れる、というループを繰り返す関数」になります。(ややこしくてすみません)

今回は、Noneを受け取ったら入力がないと判断し、なにも処理せずNoneを次にわたす、という仕様にしています。(もっと凝るならば、あるシグナルを受け取ったらループを中断する、といった処理を追加しても良いかもしれません)

ステップ3-3.スレッド間のデータのやりとりのためのキューを作成する

各スレッドとデータをやり取りするための、キューを作成します。今回は、入力用のキューと出力用のキューを分け、それぞれの処理からのデータのやり取りを、メインループの方でコントロールしています。

ステップ3-4.各処理用のスレッドを作成・開始する

ラップした各関数に、対応する入力用キュー・出力用キューを渡し、スレッドを作成します。その後、スレッドを開始します。この段階では、入力用キューにデータがないため、各処理はラップ関数の input = queue_input.get() の箇所で止まります。

ステップ3-5.最初の入力を作成する

開始直後は、パイプラインの先頭以外データがありませんので、自分で作成します。

今回は、ステップ3-2で述べたように、データがないことをNoneで表すことにしたため、proc0以外の入力はNoneにしています。また、proc0では引数を取らないため、ダミーデータとしして[]を入力しています。

ステップ3-6.ループ処理

入力データを各入力用キューに渡し、出力用キューにデータが来るのを待ちます。データが揃ったら、出力データを1つずらし、次の入力データとします。これを繰り返すことで、処理がパイプラインでつながります。

4.動作確認

実行すると以下のようになりました。処理時間が短縮されており、ボトルネックだったproc1とほぼ同じになっています。うまくパイプライン化できていることが確認できました。

loop_proc: 0.124[s]=8.1[fps]
  proc1: 0.123[s]
  proc2: 0.046[s]
  proc0: 0.040[s]

また、ステップ0-2で述べた当初の目的であった、8[fps]程度まで処理速度を向上させることができました。

まとめ

今回は、処理速度を向上させることを目的に、パイプライン化するための考え方・手順・実装方法をまとめました。

かなり長くなってしましましたが、ご参考になれば幸いです。

間違っているところなどがあれば、コメントでお教えいただけると助かります。

参考にさせていただいたページ

Pythonで処理の時間を計測する冴えた方法 - Qiita

Pythonにおける非同期処理: asyncio逆引きリファレンス - Qiita

Multiprocessing vs. Threading in Python: What you need to know.