рд╕реНрдЯреЙрд░реНрдо рдлреНрд░реЗрдорд╡рд░реНрдХ рд╕реАрдЦрдирд╛ред рднрд╛рдЧ II

рдкрд╣рд▓реЗ рднрд╛рдЧ рдиреЗ рд╕реНрдЯреЙрд░реНрдо рдХреА рдореВрд▓ рдЕрд╡рдзрд╛рд░рдгрд╛рдУрдВ рдХреА рдЬрд╛рдВрдЪ рдХреАред

рдХрд╛рд░реНрдпреЛрдВ рдХреА рд╡рд┐рднрд┐рдиреНрди рдХрдХреНрд╖рд╛рдПрдВ рдЕрд▓рдЧ-рдЕрд▓рдЧ рд╡рд┐рд╢реНрд╡рд╕рдиреАрдпрддрд╛ рдЖрд╡рд╢реНрдпрдХрддрд╛рдУрдВ рдХреЛ рдкреНрд░рд╕реНрддреБрдд рдХрд░рддреА рд╣реИрдВред рдпрд╛рддреНрд░рд╛рдУрдВ рдХреЗ рдЖрдБрдХрдбрд╝реЛрдВ рдХреА рдЧрдгрдирд╛ рдХрд░рддреЗ рд╕рдордп рдПрдХ-рджреЛ рд░рд┐рдХреЙрд░реНрдб рдорд┐рд╕ рдХрд░рдирд╛ рдПрдХ рдмрд╛рдд рд╣реИ, рдЬрд╣рд╛рдБ рдЦрд╛рддрд╛ рд╕реИрдХрдбрд╝реЛрдВ-рд╣рдЬрд╛рд░реЛрдВ рдореЗрдВ рдЬрд╛рддрд╛ рд╣реИ рдФрд░ рд╡рд┐рд╢реЗрд╖ рд╕рдЯреАрдХрддрд╛ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдирд╣реАрдВ рд╣реЛрддреА рд╣реИред рдФрд░ рдХрд╛рдлреА рдЕрдиреНрдп - рдЦреЛрдиреЗ рдХреЗ рд▓рд┐рдП, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЧреНрд░рд╛рд╣рдХ рдХреЗ рднреБрдЧрддрд╛рди рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЬрд╛рдирдХрд╛рд░реАред

рдЕрдЧрд▓рд╛, рд╣рдо рд╕реНрдЯреЙрд░реНрдо рдореЗрдВ рд▓рд╛рдЧреВ рдбреЗрдЯрд╛ рд╣рд╛рдирд┐ рд╕реБрд░рдХреНрд╖рд╛ рддрдВрддреНрд░ рдХреЛ рджреЗрдЦреЗрдВрдЧреЗред

рдореВрд▓ рдЙрджрд╛рд╣рд░рдг


рдЯреЛрдВрдЯреА

рдЕрдЧрд░ рдпрд╣ рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдорд╣рддреНрд╡рдкреВрд░реНрдг рдирд╣реАрдВ рд╣реИ рдХрд┐ рдХреНрдпрд╛ рдЯрдкрд▓ рдХреЛ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд░рддреЗ рд╕рдордп рддреНрд░реБрдЯрд┐рдпрд╛рдВ рдереАрдВ, рддреЛ рдЯреЛрдВрдЯреА рдиреЗ рдЯреЗрдЯрд▓ рдХреЛ рд╕реНрдкрд╛рдЙрдЯ рдСрдЙрдЯрдкреБрдЯрдХреЙрд▓рд┐рдХреНрдЯрд░ рдХреЛ рдПрдорд┐рдЯ (рдирдИ рд╡реИрд▓реНрдпреВ (...)) рд╡рд┐рдзрд┐ рд╕реЗ рдХреЙрд▓ рдХрд░рдХреЗ рднреЗрдЬрд╛ред

рдпрджрд┐ рд╣рдо рдпрд╣ рдкрддрд╛ рд▓рдЧрд╛рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ рдХрд┐ рдХреНрдпрд╛ рдЯреБрдкрд▓ рдХреЛ рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛, рддреЛ рдХреЙрд▓ рдПрдорд┐рдЯ (рдирдпрд╛ рдорд╛рди (...), msgId) рдХреА рддрд░рд╣ рджрд┐рдЦрд╛рдИ рджреЗрдЧрд╛, рдЬрд╣рд╛рдВ msgId рдПрдХ рдордирдорд╛рдирд╛ рд╡рд░реНрдЧ рдХрд╛ рдПрдХ рдСрдмреНрдЬреЗрдХреНрдЯ рд╣реИред рдЗрд╕ рд╕реНрдерд┐рддрд┐ рдореЗрдВ, ISpout рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рддрд░реАрдХреЗ рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИ:
рдЬрд╣рд╛рдБ msgId msgId рд╣реИ рдЬрд┐рд╕рдХреЗ рд╕рд╛рде SpoutOutputCollector.emit рдХрд╣рд╛ рдЬрд╛рддрд╛ рдерд╛ред
FailAwareSpout рдЙрджрд╛рд╣рд░рдг:
public class FailAwareSpout extends BaseRichSpout { private Message[] messages; // Skipped ... private static class Message implements Serializable { private String message; private int failCount; private Message(String message) { this.message = message; } } // Skipped ... @Override public void nextTuple() { // Skipped ... //  Tuple c msgId outputCollector.emit(new Values(messages[messageId].message), messageId); } // Tuple   @Override public void ack(Object msgId) { Message m = messages[(Integer) msgId]; System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processed successfully"); } // Tuple   @Override public void fail(Object msgId) { Message m = messages[(Integer) msgId]; if(++m.failCount > MAX_RETRY_COUNT) { throw new IllegalStateException("Too many message processing errors"); } System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processing failed " + "[" + m.failCount + "]"); //       sendQueue.addLast((Integer) msgId); } } 

рдЕрдЧрд▓реЗ рдереБрдВрдХрд▓, рдПрдХ, рдФрд░ рдЕрд╕рдлрд▓ рд╡рд┐рдзрд┐рдпреЛрдВ рдХреЛ рдПрдХ рд╣реА рдереНрд░реЗрдб рдкрд░ рдХреЙрд▓ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ рдФрд░ рдЯреЛрдВрдЯреА рдлрд╝реАрд▓реНрдбреНрд╕ рддрдХ рдкрд╣реБрдВрдЪрдиреЗ рдкрд░ рдЕрддрд┐рд░рд┐рдХреНрдд рд╕рд┐рдВрдХреНрд░рдирд╛рдЗрдЬрд╝реЗрд╢рди рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдирд╣реАрдВ рд╣реЛрддреА рд╣реИред

рдкреЗрдВрдЪ

рд╕реНрдЯреЙрд░реНрдо рдХреЛ рдкреНрд░рд╕рдВрд╕реНрдХрд░рдг рдХреЗ рдкрд░рд┐рдгрд╛рдореЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рд╕реВрдЪрд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдмреЛрд▓реНрдЯ рдХреЗ рд▓рд┐рдП, рдЗрд╕реЗ IRichBolt рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рдирд╛ рд╣реЛрдЧрд╛ред рдРрд╕рд╛ рдХрд░рдиреЗ рдХрд╛ рд╕рдмрд╕реЗ рдЖрд╕рд╛рди рддрд░реАрдХрд╛ рд╣реИ рдмреЗрд╕рд░рд┐рдЪрдмреЛрд▓реНрдЯ рд╡рд░реНрдЧ рд╡рд┐рд░рд╛рд╕рдд рдореЗрдВ рдорд┐рд▓рд╛ рд╣реИ ред
рдмреЛрд▓реНрдЯ рд╕реНрдЯреЙрд░реНрдо рдХреЛ рдЕрдкрдиреЗ рдХрд╛рд░реНрдп рдХреЗ рдкрд░рд┐рдгрд╛рдореЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдмрддрд╛рддреЗ рд╣реИрдВ, рдЬрд┐рд╕рдореЗрдВ рдирд┐рд╖реНрдкрд╛рджрди (рдЯреБрдкрд▓) рд╡рд┐рдзрд┐ рдореЗрдВ рдЖрдЙрдЯрдкреБрдЯрдХреЙрд▓реНрдЯрд░ рдХреНрд▓рд╛рд╕ рдХреЗ рдирд┐рдореНрди рддрд░реАрдХреЛрдВ рдХреЛ рдХреЙрд▓ рдХрд░рдХреЗ:
FailingBolt рдЙрджрд╛рд╣рд░рдг:
 public class FailingBolt extends BaseRichBolt { OutputCollector outputCollector; // Skipped ... @Override public void execute(Tuple tuple) { // Skipped ... outputCollector.ack(tuple); //    } else { // Skipped ... outputCollector.fail(tuple); //     } } // Skipped ... } 

рдЙрдкрдпреЛрдЧ рдХрд╛ рдЙрджрд╛рд╣рд░рдг: BasicFailApp , Spout FailAwareSpout рдФрд░ рдмреЛрд▓реНрдЯ FailingBolt рдмреЗрддрд░рддреАрдм рдврдВрдЧ рд╕реЗ рдкреНрд░рд╕рдВрд╕реНрдХрд░рдг рддреНрд░реБрдЯрд┐рдпреЛрдВ рдХреЛ рдкреИрджрд╛ рдХрд░рддреЗ рд╣реИрдВред

BaseBasicBolt рд╡рд░реНрдЧ рд╕реЗ рд╡рд┐рд░рд╛рд╕рдд рдореЗрдВ рдорд┐рд▓реА рдмреЛрд▓реНрдЯ рдореЗрдВ, рдПрдХреНрдЬ (рдЯрдкрд▓) рдХреЛ рдирд┐рд╖реНрдкрд╛рджрди рд╡рд┐рдзрд┐ рд╕реЗ рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХреЗ рдмрд╛рдж рд╕реНрд╡рдЪрд╛рд▓рд┐рдд рд░реВрдк рд╕реЗ рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИред

рдПрдВрдХрд░рд┐рдВрдЧ


рдЗрдирдкреБрдЯ рдЯрдкрд▓ рдХреЛ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд░рддреЗ рд╕рдордп, рдмреЛрд▓реНрдЯ рдПрдХ рд╕реЗ рдЕрдзрд┐рдХ рдЖрдЙрдЯрдкреБрдЯ рдЯрдкрд▓ рдЙрддреНрдкрдиреНрди рдХрд░ рд╕рдХрддрд╛ рд╣реИред рдпрджрд┐ рдмреЛрд▓реНрдЯ рдХреЛ рдПрдорд┐рдЯ (sourceTuple, resultTuple) рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИ рддреЛ рдореВрд▓ Tuple рдХреЗ рд░реВрдк рдореЗрдВ рдФрд░ рдЙрддреНрдкрдиреНрди Tuple рдХреЗ рд░реВрдк рдореЗрдВ рд╡рдВрд╢рдЬреЛрдВ рдХреЗ рд╕рд╛рде рдПрдХ DAG рдХрд╛ рдЧрдарди рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рддреВрдлрд╛рди рдЧреНрд░рд╛рдл рдореЗрдВ рд╕рднреА рдиреЛрдбреНрд╕ рдХреЗ рд▓рд┐рдП рдкреНрд░рд╕рдВрд╕реНрдХрд░рдг рддреНрд░реБрдЯрд┐рдпреЛрдВ рдХреЛ рдЯреНрд░реИрдХ рдХрд░рддрд╛ рд╣реИред рдпрджрд┐ рдкрджрд╛рдиреБрдХреНрд░рдо рдореЗрдВ рдХрд┐рд╕реА рднреА рд╕реНрддрд░ рдкрд░ рддреНрд░реБрдЯрд┐ рд╣реЛрддреА рд╣реИ, рддреЛ рдореВрд▓ рдЯрдкрд▓ рдЙрддреНрдкрдиреНрди рдХрд░рдиреЗ рд╡рд╛рд▓реЗ рдЯреЛрдВрдЯреА рдХреЛ рдПрдХ рдЕрд╕рдлрд▓ рдХреЙрд▓ рджреНрд╡рд╛рд░рд╛ рдЕрдзрд┐рд╕реВрдЪрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред рдЧреБрдгрдХ рдЙрджрд╛рд╣рд░рдг:
 public class MultiplierBolt extends BaseRichBolt { // Skipped ... @Override public void execute(Tuple tuple) { //    Tuple    for(int i = 0; i < MULTI_COUNT; ++i) { // Anchoring,   Tuple   outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i)); } outputCollector.ack(tuple); } // Skipped ... } 

рдПрдВрдХрд░рд┐рдВрдЧ рдЙрджрд╛рд╣рд░рдг: TreeFailApp

BaseBasicBolt рд╡рд░реНрдЧ рд╕реЗ рд╡рд┐рд░рд╛рд╕рдд рдореЗрдВ рдорд┐рд▓реА рдмреЛрд▓реНрдЯ рдореЗрдВ , рдирд┐рд╖реНрдкрд╛рджрди рд╡рд┐рдзрд┐ (Tuple, BasicOutputCollector) рдХреЛ BasicOutputCollector рдХрд▓реЗрдХреНрдЯрд░ рдХреЗ рд╕рд╛рде рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИред BasicOutputCollector рдХреА рд╡рд┐рд╢реЗрд╖рддрд╛ рдпрд╣ рд╣реИ рдХрд┐ рдпрд╣ рд╕реНрд╡рдЪрд╛рд▓рд┐рдд рд░реВрдк рд╕реЗ рдЙрддреНрд╕рд░реНрдЬрди рдХрд░рдиреЗ рдкрд░ рдЗрдирдкреБрдЯ рдЯреНрдпреВрдкрд▓ рдкрд░ рдПрдВрдХрд░ рдмрдирд╛рддрд╛ рд╣реИред

рдХреНрдпреЛрдВрдХрд┐ рддреВрдлрд╛рди рдПрдХ рд╡рд┐рддрд░рд┐рдд рдкреНрд░рдгрд╛рд▓реА рд╣реИ, рдЯрдкрд▓ рдХреЛ рдПрдХ рдХреНрд▓рд╕реНрдЯрд░ рдиреЛрдб рд╕реЗ рджреВрд╕рд░реЗ рдореЗрдВ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред рдЗрд╕ рд╕рдВрдмрдВрдз рдореЗрдВ, рддреВрдлрд╛рди рдкреНрд░рд╕рдВрд╕реНрдХрд░рдг рд╕рдордп рдХреА рдЯреНрд░реИрдХрд┐рдВрдЧ рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИред рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд░реВрдк рд╕реЗ, рдкреВрд░реЗ рдЧреНрд░рд╛рдлрд╝ рдХреЛ 30 рд╕реЗрдХрдВрдб рдореЗрдВ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдП, рдпрд╛ рд╕реНрдЯреЙрд░реНрдо рдЧреНрд░рд╛рдлрд╝ рдЙрддреНрдкрдиреНрди рдХрд░рдиреЗ рд╡рд╛рд▓реЗ рдЯреЛрдВрдЯреА рдкрд░ рд╡рд┐рдлрд▓ рд╡рд┐рдзрд┐ рдХреЛ рдХреЙрд▓ рдХрд░реЗрдЧрд╛ред рдЯрд╛рдЗрдордЖрдЙрдЯ рдХреЛ рдмрджрд▓рд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред

рдХреЛрдб GitHub рдкрд░ рдЙрдкрд▓рдмреНрдз рд╣реИ ред

рдЕрдЧрд▓рд╛ рднрд╛рдЧ Transactional рдбреЗрдЯрд╛ рд╕реНрд░реЛрддреЛрдВ рдХреЗ рд╕рд╛рде рд╕рдВрдпреЛрдЬрди рдореЗрдВ рдЙрдкрдпреЛрдЧ рдХрд┐рдП рдЬрд╛рдиреЗ рд╡рд╛рд▓реЗ Transactional рдЯреЛрдкреЛрд▓реЙрдЬреА рдкрд░ рдХреЗрдВрджреНрд░рд┐рдд рд╣реЛрдЧрд╛ред

рдпреБрдкреАрдбреАред рд▓реЗрдЦ рдХрд╛ рдЕрдВрддрд┐рдо рднрд╛рдЧ рдкреНрд░рдХрд╛рд╢рд┐рдд рд╣реБрдЖ рд╣реИред

Source: https://habr.com/ru/post/In186436/


All Articles