
この記事は、関数型プログラミングコースの実践教材に基づいて、
アカデミック大学ヴァレリーイサエフの講師が編集しました。
マルチスレッドアプリケーションの作成が、シングルスレッドプログラムの開発にない多くの問題に関連していることは誰にとっても秘密ではないと思います。
1つの問題は、アプリケーションのテストです。
操作の実行順序を制御することはできないため、プログラム実行の結果も制御できません。 エラーが発生した場合でも、同じレーキをもう一度踏むのはそれほど簡単ではありません。
マルチスレッドアプリケーションのテスト方法に関する小さなレシピを提供したいと思います。
必要な成分のうち、
haskell 、
QuickCheck 、いくつかのモナド、味のある塩/
haskell taste。
実施例
実用的な例として、食事する哲学者の問題を取り上げます。
MVar aは、タイプaの値を含むか、空の参照です。
putMVar ref xは、値xを参照リンクに配置します。
takeMVar refはリンクのコンテンツを読み取り、その後は空のままにします。
既に空だった場合、ストリームは何か他の書き込みが行われるまでスリープ状態になります。
()単一の値を持つ型で、型自体と同じ方法で示されます-
() 。
MVar ()ようなリンクを持つフォークをモデル化します。
したがって、フォークには2つの状態があります。フォークが哲学者によって占有されている場合、フォークは空です。 プラグが空いている場合、値
()が含まれます。
import System.Random import Control.Monad import Control.Concurrent import Control.Monad.Cont import Control.Monad.Trans import Data.IORef import Test.QuickCheck import Test.QuickCheck.Gen import Test.QuickCheck.Monadic
このプログラムではデッドロックが発生する可能性があります。
それを楽しむために、あなたはラインのコメントを外すことができます-
sleepして少し待ってください。
私たちの目標は、このエラーを検出するテストを作成することです。
しかし、これを行う前に、運用の順序をどのように管理するかを理解する価値があります。 このために、IOの代わりに、別のモナドを使用します。
sleep 、
phil runPhilの定義をまとめて、他のモナドでも
runPhilようにします。
sleep :: MonadIO m => m () sleep = do r <- liftIO $ randomRIO (0, 100) r `times` liftIO (threadDelay 300) where times :: Monad m => Int -> m () -> m () times ra = mapM_ (\_ -> a) [1..r]
これで、
sleep機能はIO操作をサポートする任意のモナドで動作できます。
MonadIOクラスでは、これを許可する
liftIO関数が1つだけ
liftIOています。
ランダムな秒数で1回スリープする代わりに、0.3ミリ秒間ランダムな数回スリープすることに注意してください。 その理由は、モナドでは、
liftIO内の
liftIOがアトミックに実行されるためです。 したがって、私たちが眠りにつく時間は何にも影響を与えません。何回それをするかが重要です。
モナドは1つのスレッドで
MVarため、
MVarは役に立たず、後で
phil関数が
MVarおよび他のタイプのリンクで機能できるという事実に基づいて、リンクのタイプを決定します。
これを行うには、
MonadConcurrentモナド
MonadConcurrentを定義します。この
MonadConcurrentでは、参照による作成、読み取り、書き込みの操作、およびスレッドの作成が行われます。
class Monad m => MonadConcurrent m where type CVar m :: * -> * newCVar :: a -> m (CVar ma) takeCVar :: CVar ma -> ma putCVar :: CVar ma -> a -> m () fork :: m () -> m ()
ここでは、言語の拡張であるタイプファミリを使用しました。
この場合、異なるモナドに対して異なるタイプのリンクを定義できるように、この拡張が必要です。
拡張機能を使用するには、次の行をファイルの先頭に追加する必要があります(同時に、後で必要になる拡張機能を接続します)。
{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}
IOモナドのこのクラスの
instance定義し
instance 。
ここではすべてが簡単です
MVar適切な操作を使用するだけ
MVar 。
instance MonadConcurrent IO where type CVar IO = MVar newCVar = newMVar takeCVar = takeMVar putCVar = putMVar fork m = forkIO m >> return ()
関数
philと
runPhil要約します。
phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m () phil n leftFork rightFork = forever $ do liftIO $ putStrLn $ show n ++ " is awaiting" sleep takeCVar leftFork liftIO $ putStrLn $ show n ++ " took left fork" takeCVar rightFork liftIO $ putStrLn $ show n ++ " took right fork" sleep putCVar leftFork () putCVar rightFork () liftIO $ putStrLn $ show n ++ " put forks" sleep runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m () runPhil n = do forks <- replicateM n $ newCVar () forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))
プログラムを実行し、以前と同じように機能することを確認します。
モナドコンカレント
そして今、楽しみが始まります。
作業するモナドを定義します(先を見て、
Contと呼ばれます)。 また、
Contが同時に最も複雑で強力なモナドの1つであることを提案しようと思います。
このモナドを使用すると、制御フローで何でもできます。たとえば、アクションを実行する代わりに、構造に保存して(このため、タイプ
Action宣言します)、後で異なる順序で実行することができます。
data Action = Atom (IO Action) | forall a. ReadRef (MaybeRef a) (a -> Action) | forall a. WriteRef (MaybeRef a) a Action | Fork Action Action | Stop
各コンストラクターを個別に扱いましょう。
Stopアクションは、計算が完了したことを意味します。
Forkアクションは、計算ブランチ、つまり、同時に実行できる2つのスレッドがあることを意味します。
AtomアクションはアトミックIO操作を実行し、次のステップで実行する必要がある
Actionを含む新しい
Actionを返します。
例:
getSum関数は、キーボードから2つの数値を読み取り、それらの合計を出力して終了するアクションを定義します。
getSum :: Action getSum = Atom $ do x <- readLn
次:
WriteRef ref val actアクション
WriteRef ref val actは、
refリンクで
valの値を記録します。継続は
actです。
ReadRef ref actアクション
ReadRef ref actは、参照
refによって値を読み取り、この値を取得して継続を返します。
Action任意のタイプのリンクを保存できるように、言語の別の拡張機能である実存的数量化を使用します。
MaybeRefタイプは、
MaybeRef代わりに使用するリンクのタイプを表し、
Maybeへの参照として定義されます。
newtype MaybeRef a = MaybeRef (IORef (Maybe a))
これでモナドを定義できます。
約束したとおり、
Contモナドをラップするだけです。
newtype Concurrent a = Concurrent (Cont Action a) deriving Monad
Cont Actionモナドは次のように構成されています。
タイプaの値を返す代わりに、タイプ
a継続
(a -> Action) a-
(a -> Action)を取得し、この関数に値を渡し、結果を返します。
つまり、
Cont Action a = (a -> Action) -> Actionと仮定できます。
具体的には、
(a -> Action) -> Actionを
Cont Action aまたはその逆に変換する次の関数ペアがあります。
cont :: ((a -> Action) -> Action) -> Cont Action a. runCont :: Cont Action a -> (a -> Action) -> Action
これで、
MonadIOおよび
MonadConcurrentインスタンスを定義できます。
instance MonadIO Concurrent where liftIO m = Concurrent $ cont $ \c -> Atom $ do a <- m return (ca)
ここで何が起こるか見てみましょう。
liftIOはIO操作を受け入れ、それをアトミックアクションにラップします。 つまり、継続して(つまり、cのタイプ
a -> Action )関数を渡し、IO操作
mを実行するアトミックアクションを返します。
アトミック操作が
Actionを返すように
Atomを定義しました。これは継続です。
実際、これは私たちがやっていることです:
mを実行した後、
cを呼び出して必要な継続を返します。
次に、
instance MonadConcurrent定義し
instance MonadConcurrent 。
定義したばかりの
liftIO関数を使用して、
newCVarリンクを作成します。
takeCVarおよび
putCVar 、対応するアクション
putCVar返し、この構造内で継続を継続します。
forkでは、両方のスレッドが格納されているアクションを返します。1つは
fork関数への引数で渡され、もう1つは継続から取得されます。
instance MonadConcurrent Concurrent where type CVar Concurrent = MaybeRef newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a) takeCVar v = Concurrent $ cont (ReadRef v) putCVar va = Concurrent $ cont $ \c -> WriteRef va $ c () fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()
私たちのモナドはほとんど準備ができています、それを起動する方法を学ぶためだけに残ります。
始めるために、
Actionを起動する関数を書きましょう。 アクションのリストを取り、各要素は個別のスレッドです。
アクションを起動するための戦略は異なる場合があります。 2つのポイントを決定します。スレッドを実行する順序と、空の変数から値を読み取ろうとした場合の対処方法です。 変数に何も含まれていない可能性があることを思い出させてください。その後、他のスレッドが変数に何かを入れるのを待つ必要があります。
最初に、スレッドを順番に実行する単純なバージョンを作成しましょう。 空の変数から読み取ろうとするスレッドはキューの最後に移動します。
runAction :: [Action] -> IO ()
putMVar動作は、
WriteRef実装とは若干異なることに注意してください。
リンクにすでに値がある場合、
putMVar変数が解放されるまでスレッド
putMVarフリーズします。 この場合、値を書き換えます。
コードを過度に複雑にしないために、この状況では
putMVarとして
putMVarするバージョンを作成する価値はありません。
次に、
Concurrentを起動する関数を記述し、
mainを再定義します。
runConcurrent :: Concurrent () -> IO () runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop] main = runConcurrent (runPhil 5)
現在、同じスレッドで作業しており、
threadDelayがすべての作業を停止しているため、速度はわずかに低下しています。
テストを書く
「料理に調味料を加える」時が来ました-私たちの例のためにテストを書きます。
これを行うには、テスト用のランダム入力を生成する
QuickCheckライブラリを使用します。 スレッドをさまざまな順序で実行するため、テストの入力はリストから次のスレッドを選択する順序です。
入力を数字のリストでエンコードすることは可能ですが、問題は、スレッドの数が変化する可能性があるため、これらの数字がどの範囲から選択されるべきかを事前に知らないことです。
したがって、入力データを
Int -> Int型の関数のリストでエンコードします。これらの関数は、数値
nを取得し、区間
[0,n-1]から数値を返します。
newtype Route = Route [Int -> Int]
QuickCheckライブラリーによって提供される
Arbitraryクラスは、ランダム要素を生成できるタイプを記述することを目的としています。
このクラスでは、2つの関数が宣言されています-
shrinkと
arbitrary 。
shrinkにはデフォルトの実装があるため、再定義しません。
arbitrary関数では、各関数が区間
[0,n-1]から数値を返すランダム関数のリストを生成します。
instance Arbitrary Route where arbitrary = fmap Route (listOf arbitraryFun) where arbitraryFun = MkGen $ \qsn -> unGen (choose (0, n - 1)) qs
QuickCheckが必要
QuickCheckするため、
Route instance Showも定義し
instance Show 。
残念ながら、あまり有用な
show書くことはできません。 さらに、この関数は使用されないため、未定義のままにします。
instance Show Route where show = undefined
これで、
runActionよりスマートなバージョンの作成を開始できます。
最初の違いは、アトミックアクションの実行とリンクを使用した作業を分離することです。
まず、アトミックアクションを実行する補助関数
skipAtomsを
skipAtomsします。この関数はアクションのリストを受け入れ、
Atom 、
Fork 、および
Stop実行し、残りは結果として返されます。
skipAtoms :: [Action] -> IO [Action] skipAtoms [] = return [] skipAtoms (Atom m : as) = do a <- m skipAtoms (as ++ [a]) skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2]) skipAtoms (Stop : as) = skipAtoms as skipAtoms (a : as) = fmap (a:) (skipAtoms as)
runActionの新しいバージョンと以前のバージョンとの2番目の違いは、デッドロックの受信を追跡することです。
これを行うには、アクションの2つのリストを開始します。 最初は、アクティブな(当社が実行した)スレッドを保存します。 2番目には、リンクの更新を待機しているスレッドがあります。
アクティブなスレッドのリストが空で、待機リストがない場合、デッドロックが発生し、この場合は例外がスローされます。
3番目のイノベーションは、現在のステップで実行するストリーム番号を選択するために使用される
Routeタイプの引数です。
runAction :: Route -> [Action] -> [Action] -> IO () runAction _ [] [] = return () runAction _ [] _ = fail "Deadlock" runAction (Route []) _ _ = return () runAction (Route (r:rs)) as bs = do as <- skipAtoms as let n = length as case splitAt (rn) as of (as1, ReadRef (MaybeRef ref) c : as2) -> do ma <- readIORef ref case ma of Just a -> do writeIORef ref Nothing runAction (Route rs) (as1 ++ [ca] ++ as2) bs Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c]) (as1, WriteRef (MaybeRef ref) xc : as2) -> do writeIORef ref (Just x) runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []
runConcurrent関数はあまり変更されていません。
runConcurrent :: Route -> Concurrent () -> IO () runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []
round_robinを最初の引数として渡すことで、新しいバージョンがどのように機能するかを確認できます。 これは、以前の
runAction関数の
runAction似た単純な実行戦略です。 ここでは、単に無限リストを生成し、各要素について、スレッド数を法とする剰余を取得します。
round_robin :: Route round_robin = Route $ map rem [0..]
この入力データに対して計算を実行すると、この例の作業は乱数ジェネレーターに基づいているため、すぐにデッドロックが発生する可能性があります。したがって、入力データが常に同じであるにもかかわらず、実行順序は次のようになります。ランダム。
この例がより確定的である場合、入力データをランダムに変化させる必要がありますが、これは今から行います。
main = quickCheck $ monadicIO $ do r <- pick arbitrary run $ runConcurrent r (runPhil 5)
先ほど実装した
arbitrary関数を使用して、
Routeタイプの任意の要素を選択します。 次に、この入力で計算を開始します。
QuickCheckは
QuickCheck処理します。つまり、入力データのサイズを増やすたびにテストを100回実行します。
プログラムを実行すると、次のように表示されます。
... 3 took left fork 4 put forks 4 is awaiting 5 took left fork 4 took left fork 1 took right fork 1 put forks 1 is awaiting 1 took left fork 2 took left fork *** Failed! Exception: 'user error (Deadlock)' (after 36 tests):
取得に必要なもの!
おわりに
マルチスレッドアプリケーションでデッドロック状態を検出できるテストの作成方法を学びました。
その過程で、
Contモナド、型族、実存的数量化、および
QuickCheckライブラリーの使用例を見ました。
さらに、即興の素材からマルチスレッドプログラム実行のモデルを組み立てる方法を学びました。