ハスケル。 マルチスレッドアプリケーションのテスト

この記事は、関数型プログラミングコースの実践教材に基づいて、 アカデミック大学ヴァレリーイサエフの講師が編集しました。

マルチスレッドアプリケーションの作成が、シングルスレッドプログラムの開発にない多くの問題に関連していることは誰にとっても秘密ではないと思います。
1つの問題は、アプリケーションのテストです。
操作の実行順序を制御することはできないため、プログラム実行の結果も制御できません。 エラーが発生した場合でも、同じレーキをもう一度踏むのはそれほど簡単ではありません。
マルチスレッドアプリケーションのテスト方法に関する小さなレシピを提供したいと思います。
必要な成分のうち、 haskellQuickCheck 、いくつかのモナド、味のある塩/ haskell taste。

実施例


実用的な例として、食事する哲学者の問題を取り上げます。

MVar aは、タイプaの値を含むか、空の参照です。
putMVar ref xは、値xを参照リンクに配置します。
takeMVar refはリンクのコンテンツを読み取り、その後は空のままにします。
既に空だった場合、ストリームは何か他の書き込みが行われるまでスリープ状態になります。
()単一の値を持つ型で、型自体と同じ方法で示されます- ()
MVar ()ようなリンクを持つフォークをモデル化します。
したがって、フォークには2つの状態があります。フォークが哲学者によって占有されている場合、フォークは空です。 プラグが空いている場合、値()が含まれます。

 import System.Random import Control.Monad import Control.Concurrent import Control.Monad.Cont import Control.Monad.Trans import Data.IORef import Test.QuickCheck import Test.QuickCheck.Gen import Test.QuickCheck.Monadic -- sleep       ( 0  0.3) sleep :: IO () sleep = randomRIO (0, 300000) >>= threadDelay phil :: Int --  . -> MVar () --    . -> MVar () --    . -> IO () phil n leftFork rightFork = forever $ do putStrLn $ show n ++ " is awaiting" sleep takeMVar leftFork putStrLn $ show n ++ " took left fork" -- sleep takeMVar rightFork putStrLn $ show n ++ " took right fork" sleep putMVar leftFork () putMVar rightFork () putStrLn $ show n ++ " put forks" sleep runPhil :: Int -> IO () runPhil n = do --  ,   . forks <- replicateM n $ newMVar () --  5 ,     phil. forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n)) main = do runPhil 5 --    ,  ,     . forever (threadDelay 1000000000) 

このプログラムではデッドロックが発生する可能性があります。
それを楽しむために、あなたはラインのコメントを外すことができます- sleepして少し待ってください。
私たちの目標は、このエラーを検出するテストを作成することです。
しかし、これを行う前に、運用の順序をどのように管理するかを理解する価値があります。 このために、IOの代わりに、別のモナドを使用します。

sleepphil runPhilの定義をまとめて、他のモナドでもrunPhilようにします。

 sleep :: MonadIO m => m () sleep = do r <- liftIO $ randomRIO (0, 100) r `times` liftIO (threadDelay 300) where times :: Monad m => Int -> m () -> m () times ra = mapM_ (\_ -> a) [1..r] 

これで、 sleep機能はIO操作をサポートする任意のモナドで動作できます。 MonadIOクラスでは、これを許可するliftIO関数が1つだけliftIOています。
ランダムな秒数で1回スリープする代わりに、0.3ミリ秒間ランダムな数回スリープすることに注意してください。 その理由は、モナドでは、 liftIO内のliftIOがアトミックに実行されるためです。 したがって、私たちが眠りにつく時間は何にも影響を与えません。何回それをするかが重要です。

モナドは1つのスレッドでMVarため、 MVarは役に立たず、後でphil関数がMVarおよび他のタイプのリンクで機能できるという事実に基づいて、リンクのタイプを決定します。
これを行うには、 MonadConcurrentモナドMonadConcurrentを定義します。このMonadConcurrentでは、参照による作成、読み取り、書き込みの操作、およびスレッドの作成が行われます。

 class Monad m => MonadConcurrent m where type CVar m :: * -> * newCVar :: a -> m (CVar ma) takeCVar :: CVar ma -> ma putCVar :: CVar ma -> a -> m () fork :: m () -> m () 


ここでは、言語の拡張であるタイプファミリを使用しました。
この場合、異なるモナドに対して異なるタイプのリンクを定義できるように、この拡張が必要です。
拡張機能を使用するには、次の行をファイルの先頭に追加する必要があります(同時に、後で必要になる拡張機能を接続します)。

 {-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-} 

IOモナドのこのクラスのinstance定義しinstance
ここではすべてが簡単ですMVar適切な操作を使用するだけMVar

 instance MonadConcurrent IO where type CVar IO = MVar newCVar = newMVar takeCVar = takeMVar putCVar = putMVar fork m = forkIO m >> return () 

関数philrunPhil要約します。

 phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m () phil n leftFork rightFork = forever $ do liftIO $ putStrLn $ show n ++ " is awaiting" sleep takeCVar leftFork liftIO $ putStrLn $ show n ++ " took left fork" takeCVar rightFork liftIO $ putStrLn $ show n ++ " took right fork" sleep putCVar leftFork () putCVar rightFork () liftIO $ putStrLn $ show n ++ " put forks" sleep runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m () runPhil n = do forks <- replicateM n $ newCVar () forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n)) 

プログラムを実行し、以前と同じように機能することを確認します。

モナドコンカレント


そして今、楽しみが始まります。

作業するモナドを定義します(先を見て、 Contと呼ばれます)。 また、 Contが同時に最も複雑で強力なモナドの1つであることを提案しようと思います。
このモナドを使用すると、制御フローで何でもできます。たとえば、アクションを実行する代わりに、構造に保存して(このため、タイプAction宣言します)、後で異なる順序で実行することができます。

 data Action = Atom (IO Action) | forall a. ReadRef (MaybeRef a) (a -> Action) | forall a. WriteRef (MaybeRef a) a Action | Fork Action Action | Stop 

各コンストラクターを個別に扱いましょう。
Stopアクションは、計算が完了したことを意味します。
Forkアクションは、計算ブランチ、つまり、同時に実行できる2つのスレッドがあることを意味します。
AtomアクションはアトミックIO操作を実行し、次のステップで実行する必要があるActionを含む新しいActionを返します。

例:
getSum関数は、キーボードから2つの数値を読み取り、それらの合計を出力して終了するアクションを定義します。

 getSum :: Action getSum = Atom $ do x <- readLn --    return $ Atom $ do --   y <- readLn --    return $ Atom $ do --   print (x + y) --   return Stop --   

次:
WriteRef ref val actアクションWriteRef ref val actは、 refリンクでvalの値を記録します。継続はactです。
ReadRef ref actアクションReadRef ref actは、参照refによって値を読み取り、この値を取得して継続を返します。
Action任意のタイプのリンクを保存できるように、言語の別の拡張機能である実存的数量化を使用します。

MaybeRefタイプは、 MaybeRef代わりに使用するリンクのタイプを表し、 Maybeへの参照として定義されます。

 newtype MaybeRef a = MaybeRef (IORef (Maybe a)) 

これでモナドを定義できます。
約束したとおり、 Contモナドをラップするだけです。

 newtype Concurrent a = Concurrent (Cont Action a) deriving Monad 

Cont Actionモナドは次のように構成されています。
タイプaの値を返す代わりに、タイプa継続(a -> Action) a- (a -> Action)を取得し、この関数に値を渡し、結果を返します。
つまり、 Cont Action a = (a -> Action) -> Actionと仮定できます。
具体的には、 (a -> Action) -> ActionCont Action aまたはその逆に変換する次の関数ペアがあります。

 cont :: ((a -> Action) -> Action) -> Cont Action a. runCont :: Cont Action a -> (a -> Action) -> Action 

これで、 MonadIOおよびMonadConcurrentインスタンスを定義できます。

 instance MonadIO Concurrent where liftIO m = Concurrent $ cont $ \c -> Atom $ do a <- m return (ca) 

ここで何が起こるか見てみましょう。
liftIOはIO操作を受け入れ、それをアトミックアクションにラップします。 つまり、継続して(つまり、cのタイプa -> Action )関数を渡し、IO操作mを実行するアトミックアクションを返します。
アトミック操作がActionを返すようにAtomを定義しました。これは継続です。
実際、これは私たちがやっていることです: mを実行した後、 cを呼び出して必要な継続を返します。

次に、 instance MonadConcurrent定義しinstance MonadConcurrent
定義したばかりのliftIO関数を使用して、 newCVarリンクを作成します。
takeCVarおよびputCVar 、対応するアクションputCVar返し、この構造内で継続を継続します。
forkでは、両方のスレッドが格納されているアクションを返します。1つはfork関数への引数で渡され、もう1つは継続から取得されます。

 instance MonadConcurrent Concurrent where type CVar Concurrent = MaybeRef newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a) takeCVar v = Concurrent $ cont (ReadRef v) putCVar va = Concurrent $ cont $ \c -> WriteRef va $ c () fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c () 

私たちのモナドはほとんど準備ができています、それを起動する方法を学ぶためだけに残ります。
始めるために、 Actionを起動する関数を書きましょう。 アクションのリストを取り、各要素は個別のスレッドです。
アクションを起動するための戦略は異なる場合があります。 2つのポイントを決定します。スレッドを実行する順序と、空の変数から値を読み取ろうとした場合の対処方法です。 変数に何も含まれていない可能性があることを思い出させてください。その後、他のスレッドが変数に何かを入れるのを待つ必要があります。
最初に、スレッドを順番に実行する単純なバージョンを作成しましょう。 空の変数から読み取ろうとするスレッドはキューの最後に移動します。

 runAction :: [Action] -> IO () --    , . runAction [] = return () --   ,  ,   ,    . runAction (Atom m : as) = do a' <- m runAction $ as ++ [a'] --       . runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2] --    . runAction (Stop : as) = runAction as runAction (ReadRef (MaybeRef ref) c : as) = do --   . ma <- readIORef ref case ma of --    -,  Just a -> do --   . writeIORef ref Nothing --     . runAction (as ++ [ca]) --     ,       ,         . Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c]) --    ,     . runAction (WriteRef (MaybeRef ref) val a : as) = do writeIORef ref (Just val) runAction (as ++ [a]) 

putMVar動作は、 WriteRef実装とは若干異なることに注意してください。
リンクにすでに値がある場合、 putMVar変数が解放されるまでスレッドputMVarフリーズします。 この場合、値を書き換えます。
コードを過度に複雑にしないために、この状況ではputMVarとしてputMVarするバージョンを作成する価値はありません。

次に、 Concurrentを起動する関数を記述し、 mainを再定義します。

 runConcurrent :: Concurrent () -> IO () runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop] main = runConcurrent (runPhil 5) 

現在、同じスレッドで作業しており、 threadDelayがすべての作業を停止しているため、速度はわずかに低下しています。

テストを書く


「料理に調味料を加える」時が来ました-私たちの例のためにテストを書きます。
これを行うには、テスト用のランダム入力を生成するQuickCheckライブラリを使用します。 スレッドをさまざまな順序で実行するため、テストの入力はリストから次のスレッドを選択する順序です。
入力を数字のリストでエンコードすることは可能ですが、問題は、スレッドの数が変化する可能性があるため、これらの数字がどの範囲から選択されるべきかを事前に知らないことです。
したがって、入力データをInt -> Int型の関数のリストでエンコードします。これらの関数は、数値nを取得し、区間[0,n-1]から数値を返します。

 newtype Route = Route [Int -> Int] 

QuickCheckライブラリーによって提供されるArbitraryクラスは、ランダム要素を生成できるタイプを記述することを目的としています。
このクラスでは、2つの関数が宣言されています- shrinkarbitrary
shrinkにはデフォルトの実装があるため、再定義しません。
arbitrary関数では、各関数が区間[0,n-1]から数値を返すランダム関数のリストを生成します。

 instance Arbitrary Route where arbitrary = fmap Route (listOf arbitraryFun) where arbitraryFun = MkGen $ \qsn -> unGen (choose (0, n - 1)) qs 

QuickCheckが必要QuickCheckするため、 Route instance Showも定義しinstance Show
残念ながら、あまり有用なshow書くことはできません。 さらに、この関数は使用されないため、未定義のままにします。

 instance Show Route where show = undefined 

これで、 runActionよりスマートなバージョンの作成を開始できます。
最初の違いは、アトミックアクションの実行とリンクを使用した作業を分離することです。
まず、アトミックアクションを実行する補助関数skipAtomsskipAtomsします。この関数はアクションのリストを受け入れ、 AtomFork 、およびStop実行し、残りは結果として返されます。

 skipAtoms :: [Action] -> IO [Action] skipAtoms [] = return [] skipAtoms (Atom m : as) = do a <- m skipAtoms (as ++ [a]) skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2]) skipAtoms (Stop : as) = skipAtoms as skipAtoms (a : as) = fmap (a:) (skipAtoms as) 

runActionの新しいバージョンと以前のバージョンとの2番目の違いは、デッドロックの受信を追跡することです。
これを行うには、アクションの2つのリストを開始します。 最初は、アクティブな(当社が実行した)スレッドを保存します。 2番目には、リンクの更新を待機しているスレッドがあります。
アクティブなスレッドのリストが空で、待機リストがない場合、デッドロックが発生し、この場合は例外がスローされます。

3番目のイノベーションは、現在のステップで実行するストリーム番号を選択するために使用されるRouteタイプの引数です。

 runAction :: Route -> [Action] -> [Action] -> IO () runAction _ [] [] = return () runAction _ [] _ = fail "Deadlock" runAction (Route []) _ _ = return () runAction (Route (r:rs)) as bs = do as <- skipAtoms as let n = length as case splitAt (rn) as of (as1, ReadRef (MaybeRef ref) c : as2) -> do ma <- readIORef ref case ma of Just a -> do writeIORef ref Nothing runAction (Route rs) (as1 ++ [ca] ++ as2) bs Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c]) (as1, WriteRef (MaybeRef ref) xc : as2) -> do writeIORef ref (Just x) runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) [] 

runConcurrent関数はあまり変更されていません。

 runConcurrent :: Route -> Concurrent () -> IO () runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] [] 

round_robinを最初の引数として渡すことで、新しいバージョンがどのように機能するかを確認できます。 これは、以前のrunAction関数のrunAction似た単純な実行戦略です。 ここでは、単に無限リストを生成し、各要素について、スレッド数を法とする剰余を取得します。

 round_robin :: Route round_robin = Route $ map rem [0..] 

この入力データに対して計算を実行すると、この例の作業は乱数ジェネレーターに基づいているため、すぐにデッドロックが発生する可能性があります。したがって、入力データが常に同じであるにもかかわらず、実行順序は次のようになります。ランダム。
この例がより確定的である場合、入力データをランダムに変化させる必要がありますが、これは今から行います。

 main = quickCheck $ monadicIO $ do r <- pick arbitrary run $ runConcurrent r (runPhil 5) 

先ほど実装したarbitrary関数を使用して、 Routeタイプの任意の要素を選択します。 次に、この入力で計算を開始します。
QuickCheckQuickCheck処理します。つまり、入力データのサイズを増やすたびにテストを100回実行します。

プログラムを実行すると、次のように表示されます。

 ... 3 took left fork 4 put forks 4 is awaiting 5 took left fork 4 took left fork 1 took right fork 1 put forks 1 is awaiting 1 took left fork 2 took left fork *** Failed! Exception: 'user error (Deadlock)' (after 36 tests): 

取得に必要なもの!

おわりに


マルチスレッドアプリケーションでデッドロック状態を検出できるテストの作成方法を学びました。
その過程で、 Contモナド、型族、実存的数量化、およびQuickCheckライブラリーの使用例を見ました。
さらに、即興の素材からマルチスレッドプログラム実行のモデルを組み立てる方法を学びました。

Source: https://habr.com/ru/post/J224075/


All Articles