memcached / MemcacheDBのデータ構造。 パート2

memcachedのデータ構造に関する記事の続き。 この最後の部分では、イベントログ、配列、およびテーブルの3つのデータ構造について検討します。

イベントログ


挑戦する


このデータ構造の目的は、最後のT秒間に分散システムで発生したイベントを保存することです。 各イベントには発生した時点があり、イベントの残りの内容はアプリケーションロジックによって決定されます。

イベントログの操作:



解決策


def time(): """         (, UNIX Epoch). @return:     @rtype: C{int} """ class Event: """ ,    . """ def when(self): """  ,    (). """ def serialize(self): """  . @return:   @rtype: C{str} """ @static def deserialize(serialized): """   . @param serialized:       @type serialized: C{str} @return:    @rtype: C{list(Event)} """ class MCEventLog(MemcacheObject): def __init__(self, mc, name, timeChunk=10, numChunks=10): """ . @param name:    @type name: C{str} @param timeChunk:       @type timeChunk: C{int} @param numChunks:      @type numChunks: C{int} """ super(MCEventLog, self).__init__(mc) self.keyTemplate = 'messagelog' + name + '_%d'; self.timeChunk = timeChunk self.numChunks = numChunks def put(self, event): """    . @param event:  @type event: L{Event} """ serialized = event.serialize() key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks) while True: try: self.mc.append(key, serialized) return except KeyError: pass try: self.mc.add(key, serialized, self.timeChunk * (self.numChunks-1)) return except KeyError: pass def fetch(self, first=None, last=None): """        (  ). @param first:     @type first: C{int} @param last:     @type last: C{int} @return:   @rtype: C{list(Event)} """ if last is None or last > time(): last = time() if first is None or last < first or (last-first) > self.timeChunk * (self.numChunks-1): first = time() — self.timeChunk * (self.numChunks-1) firstKey = first / self.timeChunk % self.numChunks lastKey = last / self.timeChunk % self.numChunks if firstKey < lastKey: keyRange = range(firstKey, lastKey+1) else: keyRange = range(firstKey, self.numChunks) + range(0, lastKey+1) keys = [self.keyTemplate % n for n in keyRange] result = [] for key in keys: try: events = Event.deserialize(self.mc.get(key)) except KeyError: continue result.extend(filter(lambda e: e.when() >= first and e.when() <= last, l)) return result 


議論


イベントログの主なアイデアは、memcachedのnumChunksキーで構成される循環バッファーです。 各キーはtimeChunk秒の間アクティブ(つまり、値が埋め込まれます)にtimeChunk 、その後次のキーがアクティブになります(最後のキーがアクティブだった場合、このロールは最初のキーに移動します)。 バッファサイクル全体、つまり 同じキーを2回使用する間の期間はnumChunks * timeChunk秒であり、各キーのライフタイムは(numChunks - 1) * timeChunk秒です。したがって、次の使用時までにtimeChunkを法とするキー作成時間のシフトがあると、キーは破棄されることが保証されます。 したがって、イベントログの容量(またはイベントが保存される期間)は、 (numChunks - 1) * timeChunk秒です。 このようにログをキーに分割すると、関心のある時間間隔に対応するキーのみをログから抽出できます。

numChunksnumChunksの選択は、イベントログのアプリケーションによって異なります。まず、目的のイベント保存期間が決定され、次にイベントの頻度によってtimeChunk値が選択されるため、イベントログの各キーのサイズは比較的小さくなります(たとえば、10-20Kb)。 これらの考慮事項から、2番目のパラメーターnumChunks値を見つけることができます。

この例では、 Eventクラスが使用されます。このクラスには、イベントが発生した時間という、興味のある唯一のプロパティがあります。 イベントログのputメソッドは、パラメーターとして渡されたイベントeventが「最近」発生した、つまり、 (numChunks - 1) * timeChunk秒(ログ容量event.when() )から経過したことをevent.when()しています。 put実行されている場合、キーが計算され、そのタイムスタンプに従って、イベントに関する情報が配置されます。 その後、前の例ですでによく知られている手法を使用して、キーが作成されるか、イベントのシリアル化された表現が既存のキーの値に追加されます。

fetchメソッドは、 firstからlastまでの時間間隔で発生したイベントを含むことができるログキーの潜在的なセットを計算します。 時間枠が設定されていない場合、 lastは現在の瞬間と等しいと見なされ、 firstは現在のログ容量から遠い瞬間と見なされます。 キーのセットは、メソッドのリング構造を考慮して計算されます。その後、対応するキーが選択され、それらに記録されたイベントがデシリアライズされ、追加フィルタリングが実行されて[first, last]セグメントに入ります。

上記のメソッドシグネチャにより、ログから新しいイベントを出力する連続呼び出しが可能になります。

  1. 最初はevents = fetch()と呼ばれevents = fetch()lastSeen max(events.when())として計算されます。
  2. 後続のすべての呼び出しは次のようになりますevents = fetch(first=lastSeen) 、while
    lastSeen毎回再計算されます。


配列


タスク1


任意の型の値のリストが配列に格納され、リストは比較的まれに更新され、リスト全体がはるかに頻繁に受信されます。

配列操作:



解決策1


 def serializeArray(array): """     . """ def deserializeArray(str): """     . """ class MCArray1(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCArray1, self).__init__(mc) self.lock = MCLock(name) self.key = 'array' + name def fetch(self): """    . @return:  @rtype: C{list} """ try: return deserializeArray(self.mc.get(self.key)) except KeyError: return [] def change(self, add_elems=[], delete_elems=[]): """   ,      . @param add_elems: ,    @type add_elems: C{list} @param delete_elems: ,    @type delete_elems: C{list} """ while not self.lock.try_lock(): pass try: try: array = deserializeArray(self.mc.get(self.key)) except KeyError: array = [] array = filter(lambda e: e not in delete_elems, array) + add_elems self.mc.set(self.key, serializeArray(array), 0) finally: self.lock.unlock() 


ディスカッション1


上記の解決方法は、実際には配列とは関係ありませんが、どのデータ構造にも適用できます。 リーダーが多く、ライターが比較的少ない場合のリーダーライターモデルに基づいています。 fetchメソッドを使用するリーダーはいつでも配列のコンテンツを取得します。「ライター」が1つのmemcachedコマンドでコンテンツを書き込むことは重要です。つまり、memcachedのgetおよびset操作の内部アトミック性と、 fetch hangehange間の同期の欠如にもかかわらずfetch結果は常に一貫しています。次の変更の前後の値になります。 上記のMCLockロックを使用して、 MCLock同時にアレイを変更するのをブロックします。

この状況では、ロックの使用を避け、 change機能を使用してアトミックな変更を保証するために、memcachedプロトコルからgetscas 、およびaddコマンドを使用することgets addます。

タスク2


配列には、あるタイプの値のリストが格納されます。多くの場合、「配列に値を追加する」という形式の操作が発生します。 比較的まれに、配列全体がクエリされます。 実装を容易にするために、将来的に整数の配列が考慮されますが、データ型は問題を解決するために重要ではありません。

配列操作:



決定2


 def serializeInt(int): """       (str). """ def deserializeIntArray(str): """       . """ class MCArray2(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCArray2, self).__init__(mc) self.key = 'array' + name def fetch(self): """    . @return:  @rtype: C{list} """ try: return deserializeIntArray(self.mc.get(self.key)) except KeyError: return [] def add(self, element): """    . @param element: ,      @type element: C{int} """ element = serializeInt(element) while True: try: self.mc.append(self.key, element) except KeyError: return try: self.mc.add(self.key, element, 0) except KeyError: return 


ディスカッション2


この実装は、イベントログに対して同様のコードを実際に繰り返しますが、キーが1つしかないために単純化されています。 データ型「配列」の最初の実装と比較して、memcached操作の数が減り、配列を変更するすべてのプロセスを遅延なく実行できます(ロックなし)。 第1の実施形態と同様に、配列に要素を追加するときに重複の存在はチェックされない(アプリケーションに応じて、良いことも悪いこともあり得る)。

説明した例に対して、以下の改善(または拡張)が可能です。



テーブル


挑戦する


多くの行を保存する必要があります。 セットの操作:



このデータ構造は、目的の行をすばやく検索するテーブルと見なすことができます。 または、分散メモリに保存されたハッシュとして。

解決策


 def serializeArray(array): """     . """ def deserializeArray(str): """     . """ class MCTable(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCTable, self).__init__(mc) self.lock = MCLock(name) self.key = 'table' + name def has(self, key): """     . @param key:  @type key: C{str} @rtype: C{bool} """ try: self.mc.get(self.key + '_v_' + key) return True except KeyError: return False def fetch(self): """     . @return:   @rtype: C{list(str)} """ try: return deserializeArray(self.mc.get(self.key + '_keys')) except KeyError: pass def add(self, key): """    . @param key:  @type key: C{str} """ while not self.lock.try_lock(): pass try: try: array = deserializeArray(self.mc.get(self.key + '_keys')) except KeyError: array = [] if key not in array: array.append(key) self.mc.set(self.key + '_v_' + key, 1, 0) self.mc.set(self.key + '_keys', serializeArray(array), 0) finally: self.lock.unlock() def delete(self, key): """    .    add(). """ 


議論


一般的に、memcachedは巨大なハッシュテーブルですが、データ構造に必要な操作が1つありません。それは、キーのリストを取得することです。 したがって、テーブルの実装では、テーブルの各要素を格納するために個別のキーを使用し、すべての要素のリストを格納するために別のキーを使用します。 すべての要素のリストを格納する実装は、実際には「配列1」の実装と一致します。 すべての要素のリストへのアクセスをシリアル化するには、ロックが使用されますが、 fetchaddメソッドは互いに同期されません。 すべての要素のリストはアトミックに変更され、キーを読み取るときに常に一貫した状態が得られます。

テーブル内のキーのチェックは可能な限り迅速です。memcached内の対応するキーのチェックがチェックされます。 要素のリストの変更は常に、リスト全体を保存するキーと、各要素の個別のキー(検証のみに使用される)で同時に発生します。

上記の図に基づいて、各テーブル要素に関連する値が保存される場合、本格的なハッシュを実装できます。この値は、要素に対応する個別のキーにのみ書き込む必要があり、要素のリストには値が含まれません。

おわりに


そのため、この記事で説明する「トリック」または「トリック」のリストを次に示します。



この記事では、複数取得リクエストの使用など、memcachedに固有の最適化の問題については取り上げませんでした。 これは、ソースコードとストーリーが過負荷にならないように意図的に行われました。 多くの場合、上記の例は、Pythonでの理想的な実装の例としてではなく、擬似コードのようなものと考えるべきです。

間違いを見つけた場合、タスクに対してより明確で最適なソリューションを提供したい場合、他のデータ構造の実装を提供したい場合-コメントと批判を喜んでいたします。

Source: https://habr.com/ru/post/J50247/


All Articles