
この図は、Qtで実装された形式のMapReduceを示しています。
QFuture<T> QtConcurrent::mappedReduced(const Sequence &sequence, MapFunction mapFunction, ReduceFunction reduceFunction ) T QtConcurrent::blockingMappedReduced(const Sequence &sequence, MapFunction mapFunction, ReduceFunction reduceFunction )
職場の同僚はQt ConcurrentのMapReduceについて知らないことに直面しました。 ゲーテが言ったように:「私たちが理解していないこと、私たちは所有していない」 カットの下には、Map、Reduce、Fork –結合モデル、およびMapReduceを使用して簡単な問題を解決する例について少し説明します。
タスク
タスクはそのままインターネットから取得されました。
1,000,000,000個の要素を持つ配列の最大要素を検索するコンソールプログラムを作成します。
MapReduceは、高次関数mapおよびreduceで構成されています。 高階関数は、他の関数を引数として取る関数です。
地図
Mapはリストの各要素に関数を適用し、結果のリストを返します。 C ++では、これはstd :: transformで記述できます:
std::list<int> list{ 1, 2, 3, 4, 5, 6 }; std::list<int> newList(list.size(), 0); std::transform(list.begin(), list.end(),newList.begin(), [](int v){ return v*2; }); for(auto i: newList){ std::cout<<i<<" "; }
削減(累積)
ウィキペディアには定義があります。特定の関数を使用して、データ構造を単一のアトミック値に変換する高次関数です。 単純な場合、 reduceは多くの要素(リスト、ベクトルなど)を蓄積します。
C ++では、これはstd::for_each
および関数オブジェクトを介して記述できます。
struct Max{ Max():value(std::numeric_limits<int>::min()){ } void operator()(int val){ value = std::max(value, val); } int value; }; struct Sum{ Sum(): value(0){ } void operator()(int val){ value += val; } int value; };
フォーク–モデルに参加
MapReduceを使用して問題を解決する方法は明確ではない場合があります。 ここに見えるはずです、おそらくいくつかの理論がありますか? 分岐結合並列計算モデルがあります。 その中心には:
- 大きなタスクを小さなサブタスクに分割し、それらを並行して実行します(フォーク)。
- サブタスクのソリューションを最終結果に結合(結合)します。

モデルを示す写真( ウィキペディアから取得)。 最初の画像のようなもの。 QRのMapReduceは、フォーク結合モデルの実装です。
このようなタスクの場合、標準的な解決策は、ベクトルを取得し、それをいくつかのばらばらのセグメントに分割し、セグメント内の極大値を見つけ、最後に結果を結合することです。 std ::スレッドでは、次のようになります。
using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; int localMaximum; }; using Tasks = std::vector<Task>; void max(Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } task.localMaximum = localMax; } DataSet data(DATASET_SIZE);
タスクをサブタスクに分割すると、 QtConcurrent :: blockingMappedReducedを使用してコンパクトに書き込むことができます
using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; }; int mapMax(const Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } return localMax; } void reduceMax(int& a, const int& b) { a = std::max(a, b); } using Tasks = std::vector<Task>;
ここで注意すべきこと:
- Task構造にはlocalMaximumフィールドがありません;最大値はmapMax関数から返されます;
- reduceMax関数のシグネチャに対して、値の戻り値は最初の引数を介しています。
完全なサンプルコード #include <QtCore/QtDebug> #include <QtCore/QElapsedTimer> #include <QtCore/QCoreApplication> #include <QtConcurrent/QtConcurrent> #include <cstdlib> #include <thread> #include <vector> #include <algorithm> using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; }; int mapMax(const Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } return localMax; } void reduceMax(int& a, const int& b) { a = std::max(a, b); } using Tasks = std::vector<Task>; int main(int argc, char *argv[]) { std::srand(unsigned(std::time(0))); QCoreApplication a(argc, argv); DataSet data(DATASET_SIZE); for(size_t i = 0; i < data.size(); ++i) { data[i] = std::rand(); } QElapsedTimer timer; timer.start(); const auto threadCount = std::thread::hardware_concurrency(); const auto taskSize = data.size()/threadCount; Tasks tasks; size_t first = 0; size_t last = taskSize; for(size_t i = 0; i < threadCount; ++i) { tasks.push_back(Task{first, last, data}); first+=taskSize; last = std::min(last+taskSize, data.size()); } timer.start(); const auto Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax); qDebug() << "Maximum" << Max << "time" <<timer.elapsed() << "milliseconds"; return 0; }