データサイエンスと品質コード

通常、機械学習モデルはjupyterノートブックに組み込まれています。コードは、非常に長い式のシートではなく、記述された関数の「ひざの上で」呼び出されるように見えます。 このようなコードをサポートすることはほとんど不可能であるため、各プロジェクトはほとんどゼロから書き直されます。 そして、本番環境でこのコードを実装することを考えることさえ怖いです。


したがって、本日、データセットおよびデータサイエンスモデルを操作するためのPythonライブラリのプレビューを厳格な裁判所に提出します。 これにより、Pythonコードは次のようになります。


my_dataset. load('/some/path'). normalize(). resize(shape=(256, 256, 256)). random_rotate(angle=(-30, 30)). random_crop(shape=(64, 64, 64)) for i in range(MAX_ITER): batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True) #  ,      

この記事では、コードをシンプルで理解しやすく、便利にするための主要なクラスとメソッドについて学びます。



図書館はまだ最終仕上げを行っており、まだ一般公開されていません。
この記事は完全なドキュメントではなく、ライブラリの簡単な説明とその使用例にすぎません。
あなたのコメントは、ライブラリを完成させ、必要な機能を含めるのに役立ちます。


データセット


データの量は非常に大きくなる可能性があり、データの処理が開始されるまでに、たとえば徐々にデータが到着する場合など、すべてのデータがまったくない場合があります。 したがって、 Datasetクラスはそれ自体にデータを格納しません。 インデックス-データの要素のリスト(識別子または単にシリアル番号の場合もあります)、およびデータを操作するためのメソッドが定義されているBatchクラスが含まれます。


 dataset = Dataset(index = some_index, batch_class=DataFrameBatch) 

Datasetの主な目的は、バッチの形成です。


 batch = dataset.next_batch(BATCH_SIZE, shuffle=True) # batch -   DataFrameBatch, #  BATCH_SIZE   

または、ジェネレータを呼び出すことができます:


 for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): # batch -   DataFrameBatch 

バッチは、厳密に順序付けられた状態または無秩序に収集され、無限に繰り返されるか、データに応じて1サイクルだけ実行されます。 状況に応じて、ステップごとに異なるサイズのバッチを作成することもできます。


反復に加えて、別の便利な操作がDatasetで利用できますcv_splitデータセットをトレイン、テスト、および検証に分割します。 そして、これは特に便利ですが、それぞれが再びデータセットです。


 dataset.cv_split([0.7, 0.2, 0.1]) #    70 / 20 / 10 #     for i in range(MAX_ITER): batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True) #  ,      

索引


データセットの要素のアドレス指定は、インデックスを使用して行われます。 識別子のセット(クライアント、トランザクション、CTスナップショット)または単なるシリアル番号(たとえば、 numpy.arange(N) )にすることができます。 データセットは(ほぼ)任意に大きく、RAMに収まらない場合があります。 しかし、これは必須ではありません。 結局、データ処理はバッチで実行されます。


インデックスの作成は非常に簡単です。


 ds_index = DatasetIndex(sequence_of_item_ids) 

シーケンスは、リスト、 numpy配列、 pandas.Seriesまたはその他のpandas.Seriesデータ型にすることができます。


ソースデータが別のファイルに保存されている場合、これらのファイルのリストからすぐにインデックスを作成すると便利です。


 ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True) 

ここで、インデックス要素は、指定されたディレクトリからのファイル名(拡張子なし)です。


データセットの要素(3次元CT画像など)が別のディレクトリに保存されることがあります。


 ds_index = FilesIndex(path='/ct_images_??/*', dirs=True) 

これにより、 /ct_images_01/ct_images_02/ct_images_02などからすべてのサブディレクトリの共通インデックスが構築されます。 ファイルインデックスは、その要素のフルパスを記憶しています。 したがって、後でloadまたはsaveメソッドで、パスindex.get_fullpath(index_item)簡単に取得できます。


ほとんどの場合、インデックスを操作する必要はありませんが、必要な作業はすべて内部で行われ、すでにバッチ全体でのみ作業しています。


クラスバッチ


データのすべてのストレージロジックと処理メソッドは、 Batchクラスで定義されます。 例としてCT画像を操作するためのクラスを作成しましょう。 CTImagesBatchの子孫となる基本クラスBatchには、このバッチの要素のリストとNone初期化されたdata属性を格納する属性indexが既にあります。 そして、これで十分であるため、コンストラクターを再定義しません。


したがって、すぐにload actionメソッドの作成に進みます。


 class CTImagesBatch(Batch): @action def load(self, src, fmt): if fmt == 'dicom': self.data = self._load_dicom(src) elif fmt == 'blosc': self.data = self._load_blosc(src) elif fmt == 'npz': self.data = self._load_npz(src) else: raise ValueError("Incorrect format") return self 

まず、メソッドの前に@actionデコレータを@action必要があります(少し後で理由がわかります)。


次に、 Batchオブジェクトを返す必要があります。 同じクラスの新しいオブジェクト(この場合はCTImagesBatch)、または別のクラスのオブジェクト(ただし、確かにBatch子孫)か、単純にself返すことができます。


このアプローチにより、データに対する一連のアクションを記述することができます。 さらに、処理中、データは内容だけでなく、形式と構造も異なる場合があります。


プライベートメソッド_load_dicom_load_bloscおよび_load_npz時間を無駄にしません。 特定の形式のファイルからデータをロードし、3次元のnumpy配列[バッチサイズ、画像の幅、画像の高さ]を返すことができます。 主なことは、ここで各バッチのデータの配置方法を決定したことであり、この配列を引き続き使用します。


次に、非常に複雑な画像処理を実行するvery_complicated_processingメソッドを記述します。 バッチ内の画像は互いに独立しているため、それらを並行して処理すると便利です。


 class CTImagesBatch(Batch): ... @action @inbatch_parallel(target='threads') def very_complicated_processing(self, item, *args, **kwargs): #   ... return processed_image_as_array 

つまり、メソッドは単一のスナップショットを処理しているように記述され、このスナップショットのインデックスは最初のパラメーターで渡されます。


並列処理のマジックを機能させるには、並列化技術(プロセス、スレッドなど)が設定されているデコレータ、および並列化の前後に呼び出される前処理および後処理関数でメソッドをラップする必要があります。


ところで、 asyncメソッドとして集中的な入出力を伴う操作を記述し、 target='async'を使用して並列化すると、データのロードとアンロードが大幅に高速化されます。


これがすべてプログラミングの利便性を高めることは明らかですが、ここで並列処理が必要かどうかの「 思考 」を完全に排除するわけではありません。


すべてのactionメソッドが記述されたら、バッチを操作できます。


 for i in range(MAX_ITER): batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True) processed_batch = batch.load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)) .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) #  ,   processed_batch    

見た目は良さそうですが、どういうわけか、バッチ処理の反復とデータ処理が混在しているのは間違っています。 はい。そして、 next_batch以外に何も存在しないように、モデルのトレーニングサイクルを可能な限り減らしたいとnext_batchます。


一般に、一連のactionメソッドをデータセットレベルに移動する必要があります。


パイプライン


そしてそれを行うことができます。 結局のところ、これらすべてのactionデコレータがフェンスで囲まれたのは無駄ではありません。 メソッドをデータセットレベルに転送するというtransferringな魔法を隠しています。 だから書くだけ:


 ct_images_pipeline = ct_images_dataset.pipeline(). .load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)). .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) # ... for i in range(MAX_ITER): batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True) #  ,       

新しいDataset下位クラスを作成して、これらのメソッドをすべて記述する必要はありません。 これらは対応するBatchクラスにあり、 @actionデコレータでマークされています。つまり、 Datasetクラスにあるかのように安全に呼び出すことができます。


もう1つのトリックは、このアプローチでは、すべてのactionメソッドが「遅延」(遅延)になり、遅延して実行されることです。 つまり、 next_batch呼び出されたときにこのバッチが形成された瞬間に、各バッチに対して読み込み、処理、サイズ変更などのアクションが実行されます。


また、各バッチの処理には時間がかかることがあるため、事前にバッチを作成しておくと便利です。 これは、モデルトレーニングがGPUで実行される場合に特に重要です。これは、新しいバッチを見越して単純なGPUがその高性能のすべての利点を簡単に「食べる」ことができるためです。


 batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3) 

prefetchパラメーターは、3つのバッチを並行して読み取る必要があることを示します。 さらに、並列化テクノロジ(プロセス、スレッド)を指定できます。


データセットの結合


実際の機械学習タスクでは、単一のデータセットを扱う必要はほとんどありません。 ほとんどの場合、少なくとも2つのデータセットXとYがあります。たとえば、家のパラメーターのデータとその値のデータです。 コンピュータービジョンタスクでは、画像自体に加えて、クラスラベル、セグメント化マスク、境界ボックスがまだあります。


一般に、複数のデータセットから並列バッチを形成できると便利です。 このために、 join操作を実行joinか、 JointDataset作成できます。


共同データセット


バッチの並列反復のみが必要な場合は、単一のデータセットを作成する方が便利です。


 joint_dataset = JointDataset((ds_X, ds_Y)) 

ds_Xds_Yが同じインデックスに基づいていない場合、インデックスの長さが同じで、 ds_Yが同じであることが重要です。つまり、 ds_Y[i]の値はds_X[i]値に対応します。 この場合、データセットの作成は少し異なります。


 joint_dataset = JointDataset((ds_X, ds_Y), align='order') 

そして、すべてが完全に標準的な方法で行われます:


 for i in range(MAX_ITER): batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True) 

next_batchが返すのは1つのバッチではなく、各データセットからのバッチを含むタプルだけです。


当然、 JointDatasetはパイプラインで構成することもできます。


 pl_images = ct_images_ds.pipeline() .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256,256,256)) .segment_lungs() pl_labels = labels_ds.pipeline() .load('/other/path', 'csv') .apply(lambda x: (x['diagnosis'] == 'C').astype('int')) full_ds = JointDataset((pl_images, pl_labels), align='same') for i in range(MAX_ITER): images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True) #    ,        

また、パイプラインはデータセットのコンポーネントであるため、画像とラベルの読み込みと処理はnext_batch呼び出されたときにのみ開始されます。 つまり、すべての計算が実行され、必要な場合にのみバッチが形成されます。


結合操作


ただし、データセットで操作を実行し、別のデータセットからデータを適用する必要がある場合は、他の状況もあります。


これは、CT画像を使用した例で最もよく実証されます。 がんの成長の座標とサイズを読み込み、それらから3次元マスクを形成します。


 pl_masks = nodules_ds.pipeline() .load('/other/path', 'csv') .calculate_3d_masks() 

CT画像を読み込んでマスクを適用し、癌性領域のみを選択します。


 pl_images = ct_images_ds.pipeline(). .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256, 256, 256)) .join(pl_masks) .apply_masks(op='mult') 

joinは、データセットを指定します。 このため、次のactionメソッド(この例ではapply_masks )は、このデータセットのバッチを最初の引数として使用します。 バッチだけでなく、必要なバッチだけです。 たとえば、 ct_images_dsの現在のバッチに画像ct_images_ds 、および14が含まれている場合、マスク付きの添付のバットは画像ct_images_ds 、および14にも適用されます。


当然、最初にjoinせずに明示的に渡すこともできるため、この引数を考慮してapply_masksメソッドを記述する必要があります。 さらに、 actionメソッドでは、バッチの要素のインデックスと識別子について考えることができなくなります-マスクの配列を画像の配列に適用するだけです。


繰り返しますが、 pl_images.next_batchを呼び出すまで、画像もマスクもダウンロードと計算は開始されませんpl_images.next_batch


すべてをまとめる


それでは、完全なワークフローデータサイエンスプロジェクトがどのようになるかを見てみましょう。


  1. インデックスとデータセットを作成する
     ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True) ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch) 
  2. 前処理を行い、処理した画像を保存します


     ct_images_dataset.pipeline() .load(None, 'dicom') #     dicom     .hu_normalize() .resize(shape=(256, 256, 256)) .segment_lungs() .save('/preprocessed/images', 'blosc') .run(BATCH_SIZE, shuffle=False, one_pass=True) 

  3. モデルのデータの準備と拡張について説明します


     ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*') ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) # ct_images_pipeline = ct_preprocessed_dataset.pipeline() .load(None, 'blosc') .split_to_patches(shape=(64, 64, 64)) # ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) ct_masks_pipeline = ct_masks_ds.pipeline(). .load('/preprocessed/masks', 'blosc') .split_to_patches(shape=(64, 64, 64)) # full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline)) 

  4. トレーニングバッチを作成し、モデルをトレーニングします


     full_ds.cv_split([0.8, 0.2]) for i in range(MAX_ITER): images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True) #  ,       

  5. モデルの品質を確認します
     for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): #     

クリアで高品質なコードの開発、複雑なデータの前処理による以前に作成されたモデルの再利用、さらには実稼働対応システムの開発まで、はるかに迅速に行える便利なライブラリがあります。


そして今、問題は次のとおりです。他にライブラリに追加する価値があるものは何か データやモデルを扱う際に何が本当に不足していますか?



Source: https://habr.com/ru/post/J326656/


All Articles