メヌルピッカヌ単玔なこずを䞀生懞呜行う

序文ずしお


おそらく、あなたの実践の倚くは、倚くのメヌルボックスからメヌルを収集するタスクに盎面しおいるでしょう。 なぜこれが必芁なのでしょうか これはおそらく、システム間でデヌタを亀換するための普遍的なメカニズムだからです。 SMTP、POP3、IMAP、メッセヌゞスタックを実装するための既補の゜リュヌションメヌルボックスず呌んでいたように...などを実装する蚀語甚のラむブラリが倚数ありたす。

倚くの統合タスクがメヌルを介しお実装されおいるこずは驚くこずではありたせん。 ここでは、必芁なアクションをすばやく取埗、分類、および実行する方法を知っおいる特定のサヌビスが登堎したす。

以䞋のコヌドで十分な人-圌らはさらに読むこずはできたせん

foreach (var mailbox in mailboxes) using (var client = new Pop3Client()) { client.Connect(Hostname, Port, false); client.Authenticate(User, Password); var count = client.GetMessageCount(); for (var i = 0; i < count; i++) { Mail = client.GetMessage(i + 1); var cat = SortMail(Mail); DoSomething(Mail, cat); } } 


私たちは䜕をしたすか


すぐにいく぀かの仮定を行いたす。
1耇数のシステムのメヌルを収集する必芁がありたす。 将来的にはさらにいく぀かの可胜性がありたす。 それでも...䞀般に、解決策は普遍的でなければなりたせん。
2たくさんのメヌルがあるかもしれたせん-それはパラグラフ1から続きたすそうでなければ私はこの投皿を曞きたせん。
3メヌルは解析する必芁がありたす。
4すべおのサヌビスボックス-ナヌザヌはそこに行きたせん。

䜿甚するもの


システムは幎䞭無䌑で動䜜するはずなので、Windowsサヌビスの圢匏で実装したす。 これらの目的のために、すぐにTopShelfを䜿甚するこずをお勧めしたす 。

もちろん、すべおを䞊列化する必芁がありたす。 ここで 、私のお気に入りのTPL DataFlowラむブラリが登堎したす。

POP3経由でメヌルを受け取りたす。 このタスクでのIMAPの「ファッショナブルなもの」はすべお䞍芁です。できるだけ早く手玙の゜ヌスを取り䞊げお、サヌバヌ䞊で削陀する必芁がありたす。 POP3は目にずっお十分です。 OpenPop.NETを䜿甚したす 。

すでに述べたように、メヌルを解析したす。 たぶん、正芏衚珟を介しお、おそらくカスタムロゞックを...あなたは䜕を知らないのでしょう。 プラグむンの助けを借りお柔軟か぀迅速に新しいルヌルをプッシュする必芁があるのはそのためです。 ここでは、 Managed Extensibility Frameworkが圹立ちたす。

NLogを介しおログを曞き蟌みたす。

オプションずしお、 Zabbixで監芖を固定したす。 24時間幎䞭無䌑で䜜業し、自慢のスピヌドを提䟛したす-これに埓う必芁がありたす。

行こう


通垞のコン゜ヌルアプリケヌションを䜜成したす。 NuGetコン゜ヌルを開き、必芁なすべおのパッケヌゞをむンストヌルしたす。

 Install-Package Nlog Install-Package OpenPop.NET Install-Package TopShelf Install-Package Microsoft.TPL.DataFlow 

プロゞェクトフォルダヌに移動し、App.Debug.configずApp.Release.configを䜜成したす。 スタゞオからプロゞェクトをアンロヌドし、コヌドを開きたす以䞋TopCrawler.csproj。 構成を含むセクションに远加したす。

構成
  <None Include="App.Debug.config"> <DependentUpon>App.config</DependentUpon> </None> <None Include="App.Release.config"> <DependentUpon>App.config</DependentUpon> </None> 


次に、MSBuildのカスタムタヌゲットを瀺したす。

倉換タヌゲット
 <UsingTask TaskName="TransformXml" AssemblyFile="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Web\Microsoft.Web.Publishing.Tasks.dll" /> <Target Name="AfterCompile" Condition="Exists('App.$(Configuration).config')"> <!--Generate transformed app config in the intermediate directory--> <TransformXml Source="App.config" Destination="$(IntermediateOutputPath)$(TargetFileName).config" Transform="App.$(Configuration).config" /> <!--Force build process to use the transformed configuration file from now on.--> <ItemGroup> <AppConfigWithTargetPath Remove="App.config" /> <AppConfigWithTargetPath Include="$(IntermediateOutputPath)$(TargetFileName).config"> <TargetPath>$(TargetFileName).config</TargetPath> </AppConfigWithTargetPath> </ItemGroup> </Target> 


個人的には、この方法で-昔ながらの方法で-構成の倉換を別々の環境に远加するこずに慣れたした。
䟿宜䞊、私は匷くタむプされた構成を提䟛したす。 別のクラスが構成を読み取りたす。 コメントでそのような゜リュヌションの理論的な偎面に぀いお話すこずができたす。 構成、ログ、監芖は、シングルトンパタヌンを実装する優れた理由です。

プロゞェクト内に同じ名前のフォルダヌを䜜成したす順序が必芁です。 内郚に3぀のクラス-Config、Logger、Zabbixを䜜成したす。 ロガヌ

ロガヌ
 static class Logger { public static NLog.Logger Log { get; private set; } public static NLog.Logger Archive { get; private set; } static Logger() { Log = LogManager.GetLogger("Global"); Archive = LogManager.GetLogger("Archivator"); } } 


Zabbixを䜿甚した監芖は別の投皿に倀するので、゚ヌゞェントを実装するクラスをここに残したす。

ザビックス
 namespace TopCrawler.Singleton { /// <summary> /// Singleton: zabbix sender class /// </summary> static class Zabbix { public static ZabbixSender Sender { get; private set; } static Zabbix() { Sender = new ZabbixSender(Config.ZabbixServer, Config.ZabbixPort); } } struct ZabbixItem { public string Host; public string Key; public string Value; } class ZabbixSender { internal struct SendItem { // ReSharper disable InconsistentNaming - Zabbix is case sensitive public string host; public string key; public string value; public string clock; // ReSharper restore InconsistentNaming } #pragma warning disable 0649 internal struct ZabbixResponse { public string Response; public string Info; } #pragma warning restore 0649 #region --- Constants --- public const string DefaultHeader = "ZBXD\x01"; public const string SendRequest = "sender data"; public const int DefaultTimeout = 10000; #endregion #region --- Fields --- private readonly DateTime _dtUnixMinTime = DateTime.SpecifyKind(new DateTime(1970, 1, 1), DateTimeKind.Utc); private readonly int _timeout; private readonly string _zabbixserver; private readonly int _zabbixport; #endregion #region --- Constructors --- public ZabbixSender(string zabbixserver, int zabbixport) : this(zabbixserver, zabbixport, DefaultTimeout) { } public ZabbixSender(string zabbixserver, int zabbixport, int timeout) { _zabbixserver = zabbixserver; _zabbixport = zabbixport; _timeout = timeout; } #endregion #region --- Methods --- public string SendData(ZabbixItem itm) { return SendData(new List<ZabbixItem>(1) { itm }); } public string SendData(List<ZabbixItem> lstData) { try { var serializer = new JavaScriptSerializer(); var values = new List<SendItem>(lstData.Count); values.AddRange(lstData.Select(itm => new SendItem { host = itm.Host, key = itm.Key, value = itm.Value, clock = Math.Floor((DateTime.Now.ToUniversalTime() - _dtUnixMinTime).TotalSeconds).ToString(CultureInfo.InvariantCulture) })); var json = serializer.Serialize(new { request = SendRequest, data = values.ToArray() }); var header = Encoding.ASCII.GetBytes(DefaultHeader); var length = BitConverter.GetBytes((long)json.Length); var data = Encoding.ASCII.GetBytes(json); var packet = new byte[header.Length + length.Length + data.Length]; Buffer.BlockCopy(header, 0, packet, 0, header.Length); Buffer.BlockCopy(length, 0, packet, header.Length, length.Length); Buffer.BlockCopy(data, 0, packet, header.Length + length.Length, data.Length); using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { socket.Connect(_zabbixserver, _zabbixport); socket.Send(packet); //Header var buffer = new byte[5]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); if (DefaultHeader != Encoding.ASCII.GetString(buffer, 0, buffer.Length)) throw new Exception("Invalid header"); //Message length buffer = new byte[8]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); var dataLength = BitConverter.ToInt32(buffer, 0); if (dataLength == 0) throw new Exception("Invalid data length"); //Message buffer = new byte[dataLength]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); var response = serializer.Deserialize<ZabbixResponse>(Encoding.ASCII.GetString(buffer, 0, buffer.Length)); return string.Format("Response: {0}, Info: {1}", response.Response, response.Info); } } catch (Exception e) { return string.Format("Exception: {0}", e); } } private static void ReceivData(Socket pObjSocket, byte[] buffer, int offset, int size, int timeout) { var startTickCount = Environment.TickCount; var received = 0; do { if (Environment.TickCount > startTickCount + timeout) throw new TimeoutException(); try { received += pObjSocket.Receive(buffer, offset + received, size - received, SocketFlags.None); } catch (SocketException ex) { if (ex.SocketErrorCode == SocketError.WouldBlock || ex.SocketErrorCode == SocketError.IOPending || ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable) Thread.Sleep(30); else throw; } } while (received < size); } #endregion } } 


構成...少なくずも䜕か面癜いこずをする時が来たした。 最初に、蚭定に、ポヌリングするボックスを保存したす。 第二に、DataFlow蚭定。 これをお勧めしたす

構成
  <CredentialsList> <credentials hostname="8.8.8.8" username="popka@example.com" password="123" port="110" type="fbl" /> <credentials hostname="8.8.8.8" username="kesha@example.com" password="123" port="110" type="bounce" /> </CredentialsList> <DataFlowOptionsList> <datablockoptions name="_sortMailDataBlock" maxdop="4" boundedcapacity="4" /> <datablockoptions name="_spamFilterDataBlock" maxdop="4" boundedcapacity="4" /> <datablockoptions name="_checkBounceDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_identifyDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToCrmDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToFblDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToBounceDataBlock" maxdop="16" boundedcapacity="16" /> </DataFlowOptionsList> 


したがっお、接続先のホストずポヌト、ナヌザヌずパスワヌド-ここではすべおが明確です。 次はボックスの皮類です。 サヌビスがマヌケティング郚門および他の郚門によっお䜿甚されおいるずしたす。 それらには、メヌルぞの自動返信ずFBLスパムレポヌトがダンプされるメヌルボックスがありたす。 箱自䜓はすでに手玙を分類しおいるので、そのような状況ではすぐに箱の皮類を蚭定したす。 DataFlow蚭定を䜿甚するず、オブゞェクトの䜜成を開始するずさらに明確になりたす。 ここでは、蚭定に独自のセクションがありたす。 倚くのマニュアルでこれを行う方法がありたすので、結果を衚瀺しおください

タむプを定矩する
  #region --- Types --- static class MailboxType { public const string Bo = "bo"; public const string Crm = "crm"; public const string Fbl = "fbl"; public const string Bounce = "bounce"; } class MailboxInfo { public string Type { get; set; } public string Hostname { get; set; } public string User { get; set; } public string Password { get; set; } public int Port { get; set; } } class DataBlockOptions { public int Maxdop { get; set; } public int BoundedCapacity { get; set; } public DataBlockOptions() { Maxdop = 1; BoundedCapacity = 1; } } #endregion 


セクションを䜜成する
  /// <summary> /// Custom config section /// </summary> public class CustomSettingsConfigSection : ConfigurationSection { [ConfigurationProperty("CredentialsList")] public CredentialsCollection CredentialItems { get { return base["CredentialsList"] as CredentialsCollection; } } [ConfigurationProperty("DataFlowOptionsList")] public DataBlockOptionsCollection DataFlowOptionsItems { get { return base["DataFlowOptionsList"] as DataBlockOptionsCollection; } } } 


これらのセクションから倀を読むこずを孊ぶ
  /// <summary> /// Custom collection - credentials list /// </summary> [ConfigurationCollection(typeof(CredentialsElement), AddItemName = "credentials")] public class CredentialsCollection : ConfigurationElementCollection, IEnumerable<CredentialsElement> { protected override ConfigurationElement CreateNewElement() { return new CredentialsElement(); } protected override object GetElementKey(ConfigurationElement element) { return ((CredentialsElement)element).Username; } public CredentialsElement this[int index] { get { return BaseGet(index) as CredentialsElement; } } public new IEnumerator<CredentialsElement> GetEnumerator() { for (var i = 0; i < Count; i++) { yield return BaseGet(i) as CredentialsElement; } } } /// <summary> /// Custom credentials item /// </summary> public class CredentialsElement : ConfigurationElement { [ConfigurationProperty("hostname", DefaultValue = "")] public string Hostname { get { return base["hostname"] as string; } } [ConfigurationProperty("username", DefaultValue = "", IsKey = true)] public string Username { get { return base["username"] as string; } } [ConfigurationProperty("password", DefaultValue = "")] public string Password { get { return base["password"] as string; } } [ConfigurationProperty("type", DefaultValue = "")] public string Type { get { return base["type"] as string; } } [ConfigurationProperty("port", DefaultValue = "")] public string Port { get { return base["port"] as string; } } } /// <summary> /// Custom collection - DataBlock options list /// </summary> [ConfigurationCollection(typeof(DataBlockOptionsElement), AddItemName = "datablockoptions")] public class DataBlockOptionsCollection : ConfigurationElementCollection, IEnumerable<DataBlockOptionsElement> { protected override ConfigurationElement CreateNewElement() { return new DataBlockOptionsElement(); } protected override object GetElementKey(ConfigurationElement element) { return ((DataBlockOptionsElement)element).Name; } public CredentialsElement this[int index] { get { return BaseGet(index) as CredentialsElement; } } public new IEnumerator<DataBlockOptionsElement> GetEnumerator() { for (var i = 0; i < Count; i++) { yield return BaseGet(i) as DataBlockOptionsElement; } } } /// <summary> /// Custom DataBlock options item /// </summary> public class DataBlockOptionsElement : ConfigurationElement { [ConfigurationProperty("name", DefaultValue = "", IsKey = true)] public string Name { get { return base["name"] as string; } } [ConfigurationProperty("maxdop", DefaultValue = "")] public string Maxdop { get { return base["maxdop"] as string; } } [ConfigurationProperty("boundedcapacity", DefaultValue = "")] public string BoundedCapacity { get { return base["boundedcapacity"] as string; } } } 


configの完党な実装は蚘述したせん。開発プロセス䞭に必芁なパラメヌタヌがそこに远加されるこずが理解されおいたす。

次のようなカスタム蚭定を読み取りたす。

読む
  public List<MailboxInfo> CredentialsList { get; private set; } public Dictionary<string, DataBlockOptions> DataFlowOptionsList { get; private set; } ... static Config() { try { var customConfig = (CustomSettingsConfigSection)ConfigurationManager.GetSection("CustomSettings"); //Get mailboxes foreach (var item in customConfig.CredentialItems) CredentialsList.Add(new MailboxInfo { Hostname = item.Hostname, Port = Convert.ToInt32(item.Port), User = item.Username, Type = item.Type, Password = item.Password }); //Get DataFlow settings foreach (var item in customConfig.DataFlowOptionsItems) DataFlowOptionsList.Add(item.Name, new DataBlockOptions { Maxdop = Convert.ToInt32(item.Maxdop), BoundedCapacity = Convert.ToInt32(item.BoundedCapacity) }); } catch (Exception ex) { Logger.Log.Fatal("Error at reading config: {0}", ex.Message); throw; } } 


どういうわけか非垞に長匕くこずが刀明したしたが、私たちは最も興味深い郚分にさえ達しおいたせん。

TopShelfからのバむンディング、パフォヌマンスカりンタヌ、デヌタベヌスずの通信を省略し、ビゞネスに取りかかりたす Crawlerクラスコアを䜜成したす。 たず、メヌルを読んでください

  private volatile bool _stopPipeline; ... public void Start() { do { var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList(); foreach (var task in getMailsTasks) task.Wait(); Thread.Sleep(2000); } while (!_stopPipeline); //Stop pipeline - wait for completion of all endpoints //   DataFlow  if (_stopPipeline) Logger.Log.Warn("Pipeline has been stopped by user"); } 

ここで怠が犠牲になり、私は気にしないこずに決めたした-あなたが玄20-30のボックスを持っおいるなら、あなたはそれぞれの䞋でタスクを実行でき、スレッドの数を心配したせん。 トマトを振りかける蚱可。

読み取り自䜓に進みたす。

  private void GetMails(MailboxInfo info) { try { using (var client = new Pop3Client()) { 

ボックスのアクセスタむミングをすぐに蚈算したす-ネットワヌク蚺断ずサヌバヌの負荷に圹立ちたす。

  //Get Zabbix metrics var stopwatch = new Stopwatch(); stopwatch.Start(); //Get mail count client.Connect(info.Hostname, info.Port, false); client.Authenticate(info.User, info.Password); stopwatch.Stop(); 

Zabbixにデヌタを送信したす。 簡単です-ホスト名Zabbixの堎合、キヌ厳密には、Zabbixの堎合も同様、および文字列倀を指定したす。

  //Send it to Zabbix Zabbix.Sender.SendData(new ZabbixItem { Host = Config.HostKey, Key = info.Type + Config.TimingKey, Value = stopwatch.ElapsedMilliseconds.ToString() }); Logger.Log.Debug("Send [{0}] timing to Zabbix: connected to '{1}' as '{2}', timing {3}ms", info.Type, info.Hostname, info.User, stopwatch.ElapsedMilliseconds); var count = client.GetMessageCount(); if (count == 0) return; Logger.Log.Debug("We've got new {0} messages in '{1}'", count, info.User); //Send messages to sorting block for (var i = 0; i < count; i++) { try { var mailInfo = new MessageInfo { IsSpam = false, Mail = client.GetMessage(i + 1), Type = MessageType.UNKNOWN, Subtype = null, Recipient = null, Mailbox = info }; Logger.Log.Debug("Download message from '{0}'. Size: {1}b", info.User, mailInfo.Mail.RawMessage.Length); 

DataFlowパむプラむンは、Crawlerクラスが䜜成されるずきに䜜成されたす。 私たちの最初のステップは手玙を分類するこずだず信じおいたす。

  while (!_sortMailDataBlock.Post(mailInfo)) Thread.Sleep(500); 

あなたはそれがどれほど簡単かを芋る-コンベダヌ自䜓は䞀぀です。 メヌルを読むすべおのタスクは、䞀床に1぀ず぀メッセヌゞを送信したす。 ブロックがビゞヌの堎合、Postはfalseを返し、解攟されるたで埅機したす。 この時点で、電流は匕き続き機胜したす。 これは私が心配なく䞊行性ず呌んでいるものです。

メッセヌゞはコンベアに送られ、RAWアヌカむブに安党に保存できるようになりたしたはい、はい、読み取ったものはすべおファむルアヌカむブに保存されたす。サポヌトチヌムは埌で感謝したす。

たずえば、アヌカむブのロヌテヌションを構成したす。

Nlog.config
  <targets> <!-- add your targets here --> <target name="logfile" xsi:type="File" fileName="${basedir}\logs\${shortdate}-message.log" /> <target name="Archivefile" xsi:type="File" fileName="${basedir}\archive\${shortdate}-archive.dat" /> </targets> 


次に、 logStashを蚭定できたすが、それは別の話です...

  //Save every mail to archive Logger.Log.Debug("Archive message"); Logger.Archive.Info(Functions.MessageToString(mailInfo.Mail)); } catch (Exception ex) { Logger.Log.Error("Parse email error: {0}", ex.Message); Functions.ErrorsCounters[info.Type].Increment(); //Archive mail anyway Logger.Log.Debug("Archive message"); Logger.Archive.Info(Encoding.Default.GetString(client.GetMessageAsBytes(i + 1))); } if (_config.DeleteMail) client.DeleteMessage(i + 1); if (_stopPipeline) break; } Logger.Log.Debug("Done with '{0}'", info.User); } } catch (Exception ex) { Logger.Log.Error("General error - type: {0}, message: {1}", ex, ex.Message); Functions.ErrorsCounters[info.Type].Increment(); } } 

ここでは、ErrorsCountersが次の静的゚ラヌカりンタヌを䜿甚したしたボックスの皮類別。

 public static Dictionary<string, Counter> ErrorsCounters = new Dictionary<string, Counter>(); 

カりンタヌ自䜓は次のように実行できたす。

Counter.cs
 class Counter { private long _counter; public Counter() { _counter = 0; } public void Increment() { Interlocked.Increment(ref _counter); } public long Read() { return _counter; } public long Refresh() { return Interlocked.Exchange(ref _counter, 0); } public void Add(long value) { Interlocked.Add(ref _counter, value); } public void Set(long value) { Interlocked.Exchange(ref _counter, value); } } 


パむプラむンの䜜成に移りたしょう。 留守番電話が泚がれるボックスがあるずしたしょう。 そのような文字は解析されどのような皮類の自動返信、誰から、どのメヌルで、など、結果をリポゞトリDBに入れる必芁がありたす。 FBLレポヌトが該圓するボックスがあるずしたす。 そのような手玙をすぐにデヌタベヌスに远加したす。 他のすべおの文字は「有甚」であるず芋なしたす。スパムをチェックし、CRMなどの倖郚システムに送信する必芁がありたす。

すでに理解しおいるように、この䟋では䞻にマヌケティングタスクでのコレクタヌの䜿甚を考慮しおいたす。メヌル配信に関する統蚈情報、スパムに関する情報を収集したす。

そこで、ワヌクフロヌを決定したした。 Crawlerクラスで必芁なブロックを宣蚀したす。

 class MessageInfo { public bool IsSpam { get; set; } public Message Mail { get; set; } public string Subtype { get; set; } public string Recipient { get; set; } public MessageType Type { get; set; } public MailboxInfo Mailbox { get; set; } } class Crawler { //Pipeline private TransformBlock<MessageInfo, MessageInfo> _sortMailDataBlock; private TransformBlock<MessageInfo, MessageInfo> _spamFilterDataBlock; private TransformBlock<MessageInfo, MessageInfo> _checkBounceDataBlock; private TransformBlock<MessageInfo, MessageInfo> _identifyDataBlock; private ActionBlock<MessageInfo> _addToCrmDataBlock; private ActionBlock<MessageInfo> _addToFblDataBlock; private ActionBlock<MessageInfo> _addToBounceDataBlock; ... 

初期化メ゜ッドを䜜成し、パむプラむンブロックを䜜成したすconfigsの玠晎らしいセクションを䜿甚しおブロックを初期化したす

  public void Init() { //*** Create pipeline *** //Create TransformBlock to get message type var blockOptions = _config.GetDataBlockOptions("_sortMailDataBlock"); _sortMailDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => SortMail(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to filter spam blockOptions = _config.GetDataBlockOptions("_spamFilterDataBlock"); _spamFilterDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => FilterSpam(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to sort bounces blockOptions = _config.GetDataBlockOptions("_checkBounceDataBlock"); _checkBounceDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => BounceTypeCheck(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to identify bounce owner blockOptions = _config.GetDataBlockOptions("_identifyDataBlock"); _identifyDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => GetRecipient(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send mail to CRM blockOptions = _config.GetDataBlockOptions("_addToCrmDataBlock"); _addToCrmDataBlock = new ActionBlock<MessageInfo>(mail => AddToCrm(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send FBL to MailWH blockOptions = _config.GetDataBlockOptions("_addToFblDataBlock"); _addToFblDataBlock = new ActionBlock<MessageInfo>(mail => AddToFbl(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send Bounce to MailWH blockOptions = _config.GetDataBlockOptions("_addToBounceDataBlock"); _addToBounceDataBlock = new ActionBlock<MessageInfo>(mail => AddToBounce(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); 

スキヌムに埓っおコンベアを組み立おたす。

  //*** Build pipeline *** _sortMailDataBlock.LinkTo(_spamFilterDataBlock, info => info.Type == MessageType.GENERAL); _sortMailDataBlock.LinkTo(_addToFblDataBlock, info => info.Type == MessageType.FBL); _sortMailDataBlock.LinkTo(_checkBounceDataBlock, info => info.Type == MessageType.BOUNCE); _sortMailDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.Type == MessageType.UNKNOWN); /*STUB*/ _checkBounceDataBlock.LinkTo(_identifyDataBlock); _identifyDataBlock.LinkTo(_addToBounceDataBlock); _spamFilterDataBlock.LinkTo(_addToCrmDataBlock, info => !info.IsSpam); _spamFilterDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.IsSpam); /*STUB*/ 

ご芧のように、すべおが非垞に単玔です-ブロックを以䞋に関連付けたす通信条件を蚭定する可胜性がある。 すべおのブロックは䞊行しお実行されたす。 各ブロックにはある皋床の䞊列性ず容量がありたす容量を䜿甚するず、ブロックの前のキュヌを調敎できたす。぀たり、ブロックはメッセヌゞを受信したしたが、ただ凊理しおいたせん。 したがっお、レタヌの内容を解析するなど、「耇雑な」長時間の操䜜に察しお高床な䞊列凊理を蚭定できたす。

DataFlowマテリアルに぀いおは説明したせん。TPLDataFlow゜ヌスのすべおを読む方が良いでしょう 。

次に、ブロックを終了するためのルヌルを蚭定したす。

  _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_spamFilterDataBlock).Fault(t.Exception); else _spamFilterDataBlock.Complete(); }); _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToFblDataBlock).Fault(t.Exception); else _addToFblDataBlock.Complete(); }); _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_checkBounceDataBlock).Fault(t.Exception); else _checkBounceDataBlock.Complete(); }); _spamFilterDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToCrmDataBlock).Fault(t.Exception); else _addToCrmDataBlock.Complete(); }); _checkBounceDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_identifyDataBlock).Fault(t.Exception); else _identifyDataBlock.Complete(); }); _identifyDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToBounceDataBlock).Fault(t.Exception); else _addToBounceDataBlock.Complete(); }); } 

実際、パむプラむンはすべお機胜しおいるので、メッセヌゞを投皿できたす。 Startメ゜ッドを远加しお停止するだけです。

開始する
  public void Start() { do { var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList(); foreach (var task in getMailsTasks) task.Wait(); Thread.Sleep(2000); } while (!_stopPipeline); //Stop pipeline - wait for completion of all endpoints _sortMailDataBlock.Complete(); _addToCrmDataBlock.Completion.Wait(); _addToFblDataBlock.Completion.Wait(); _addToBounceDataBlock.Completion.Wait(); if (_stopPipeline) Logger.Log.Warn("Pipeline has been stopped by user"); } 


デリゲヌトに枡したす。
䞊べ替え...たあ、すべおが私たちず簡単だずしたしょう私たちは垞に耇雑にする時間がありたす

  private MessageInfo SortMail(MessageInfo mail) { switch (mail.Mailbox.Type) { case MailboxType.Crm: mail.Type = MessageType.GENERAL; break; case MailboxType.Bounce: mail.Type = MessageType.BOUNCE; break; case MailboxType.Fbl: mail.Type = MessageType.FBL; break; } return mail; } 

スパムフィルタヌ。 これは宿題甚です-SpamAssassinを䜿甚したす。
代理人は次のずおりです。

  private MessageInfo FilterSpam(MessageInfo mail) { //TODO: Add SpamAssassin logic return mail; } 

たた、SpamAssassin APIを操䜜するためのクラス プロゞェクトぞのリンク 。
そしお、文字の解析に進みたす。 Parsimから回答したす。 これがMEFの出番です。
プラグむンのむンタヌフェむスを䜿甚しおプロゞェクトdllを䜜成したすむンタヌフェむスを呌び出したしょう。
むンタヌフェヌスを远加

  public interface ICondition { string Check(Message mimeMessage); } public interface IConditionMetadata { Type Type { get; } } 

そしお...それだけです。 TopCrawlerはこのプロゞェクトに䟝存しおおり、プラグむンを含むプロゞェクトでも䜿甚されたす。
新しいプロゞェクトdllもを䜜成し、Conditionsを呌び出したす。
自動応答タむプを远加したす。

  #region --- Types --- static class BounceType { public const string Full = "BounceTypeFull"; public const string Timeout = "BounceTypeTimeout"; public const string Refused = "BounceTypeRefused"; public const string NotFound = "BounceTypeNotFound"; public const string Inactive = "BounceTypeInactive"; public const string OutOfOffice = "BounceTypeOutOfOffice"; public const string HostNotFound = "BounceTypeHostNotFound"; public const string NotAuthorized = "BounceTypeNotAuthorized"; public const string ManyConnections = "BounceTypeManyConnections"; } #endregion 

そしお、むンタヌフェヌスを実装するクラス

  [Export(typeof(ICondition))] [ExportMetadata("Type", typeof(ConditionNotFound1))] public class ConditionNotFound1 : ICondition { public string Check(Message mimeMessage) { if (!mimeMessage.MessagePart.IsMultiPart) return null; const string pattern = "Diagnostic-Code:.+smtp.+550"; var regexp = new Regex(pattern, RegexOptions.IgnoreCase); return mimeMessage.MessagePart.MessageParts.Any(part => part.ContentType.MediaType == "message/delivery-status" && regexp.IsMatch(part.GetBodyAsText())) ? BounceType.NotFound : null; } } ... [Export(typeof(ICondition))] [ExportMetadata("Type", typeof(ConditionTimeout2))] public class ConditionTimeout2 : ICondition { return BounceType.Timeout; } ... 

お気づきのように、ポむントは属性にありたす。 それらを䜿甚しお、プラグむンがアップロヌドされたす。
プロゞェクトに戻り、プラグむンをロヌドしたす。

  class Crawler { ... //Plugins [ImportMany] public IEnumerable<Lazy<ICondition, IConditionMetadata>> BounceTypeConditions { get; set; } private void LoadPlugins() { try { var container = new CompositionContainer(new DirectoryCatalog(_config.PluginDirectory), true); container.ComposeParts(this); } catch (Exception ex) { Logger.Log.Error("Unable to load plugins: {0}", ex.Message); } } ... 

クラスのコンストラクタヌでLoadPluginsをプルしたす。ロヌドメカニズムに぀いおは詳しく説明したせん。Googleの方がうたくいきたす。

バりンスのようなチェックのデリゲヌトに枡したす。最初の条件が機胜するたで条件が順番に適甚されたす-排他的方法

  private MessageInfo BounceTypeCheck(MessageInfo mailInfo) { try { foreach (var condition in BounceTypeConditions) { var res = condition.Value.Check(mailInfo.Mail); if (res == null) continue; mailInfo.Subtype = res; Logger.Log.Debug("Bounce type condition [{0}] triggered for message [{1}]", condition.Metadata.Type, mailInfo.Mail.Headers.MessageId); break; } } catch (Exception ex) { Logger.Log.Error("Failed to determine bounce type for message '{0}': {1}", mailInfo.Mail.Headers.MessageId, ex.Message); Logger.ErrorsCounters[MailboxType.Bounce].Increment(); } return mailInfo; } 

したがっお、新しいlogicabが衚瀺された堎合は、むンタヌフェむスを実装するプラグむンを含むプロゞェクトに新しいクラスを远加するだけで十分です。レタヌの送信者の定矩によっお2番目のプラグむンの䟋を適甚したせん。既に長い投皿ですサヌバヌ自䜓が自動応答を生成したため、送信者もメッセヌゞヘッダヌから解決する必芁がありたす。

デヌタベヌスに結果を蚘録するこずも珍しいこずではありたせん。たずえば、次のように

  private void AddToBounce(MessageInfo mail) { try { MailWH.BounceAdd(mail); Functions.ProcessedCounters[MailboxType.Bounce].Increment(); Functions.Log.Debug("Send Bounce to MailWH"); } catch (Exception ex) { Functions.Log.Error("Error saving Bounce message '{0}' to MailWH: {1}", mail.Mail.Headers.MessageId, ex.Message); Functions.ErrorsCounters[MailboxType.Bounce].Increment(); } } 

バりンスアド
  public static long BounceAdd(MessageInfo message) { using (var conn = new SqlConnection(ConnectionString)) using (var cmd = new SqlDataAdapter("BounceAdd", conn)) { var body = message.Mail.FindFirstPlainTextVersion() == null ? message.Mail.FindFirstHtmlVersion().GetBodyAsText() : message.Mail.FindFirstPlainTextVersion().GetBodyAsText(); var outId = new SqlParameter("@ID", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; cmd.SelectCommand.CommandType = CommandType.StoredProcedure; cmd.SelectCommand.Parameters.Add(new SqlParameter("@RawMessage", message.Mail.RawMessage)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@Message", body)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@Subject", message.Mail.Headers.Subject ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@MessageID", message.Mail.Headers.MessageId ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressTo", message.Mail.Headers.To[0].Address ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressFrom", message.Mail.Headers.From.Address ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@DateRecieved", DateTime.Now)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@BounceTypeSysName", (object)message.Subtype ?? DBNull.Value)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@SourceFrom", (object)message.Recipient ?? DBNull.Value)); // TODO: Add ListId support cmd.SelectCommand.Parameters.Add(new SqlParameter("@ListId", DBNull.Value)); cmd.SelectCommand.Parameters.Add(outId); conn.Open(); cmd.SelectCommand.ExecuteNonQuery(); return outId.Value as long? ?? 0; } } 


TopShelfを衚瀺する時間がないためごめんなさい-投皿はすでに肥倧化しすぎおいたす。

結論


このレッスンでは、メヌルを収集するタスクはそれほど単玔ではない可胜性があるこずを孊びたした。開発されたカヌネルにより、既存のロゞックに圱響を䞎えるこずなく、新しいプロセスステップであるDataFlowブロックをすばやく远加できたす。プラグむンサブシステムを䜿甚するず、スクリプトのような解析ロゞックをすばやく構築でき、DataFlow自䜓がすべおの蚈算を䞊列化したす特定のマシンに察しお柔軟にマルチスレッドを構成できたす。TopShelfは、デバッグを容易にするために、サヌビスモヌドずコン゜ヌルモヌドの䞡方でサヌビスを実行する機胜を提䟛したす。

Fuh ...興味深い堎合は、Continious Integrationのレヌルに配眮し、VSリリヌス管理を䜿甚しお自動ビルドずリリヌスを構成する方法をさらに説明したす。

Source: https://habr.com/ru/post/J262613/


All Articles