Java Spring FrameworkのRabbitMQ遅延メッセージ交換の例

画像 この投稿では、RabbitMQで保留中のメッセージを使用する方法を示します。 遅延キューを使用すると便利なタスクの例として、ポストバックメカニズム( s2s ping backs2s pixel )を取り上げます

ポストバックメカニズムの概要:


1.イベントがあります
2.アプリケーションは、このイベントについてサードパーティのサービスに通知する必要があります。
3.サードパーティのサービスが利用できなかった場合は、数分後に再度通知を繰り返す必要があります

再通知するために、遅延キューを使用します。

デフォルトでは、RabbitMQはメッセージの遅延方法を認識せず、発行後すぐに配信されます。 遅延配信機能は、 rabbitmq-delayed-message-exchangeプラグインとして利用できます。

プラグインは実験的なものです。 一般的に非常に安定しているという事実にもかかわらず、それはあなた自身の危険とリスクで本番で使用されるべきです。

RMQとプラグインを使用してDockerコンテナーを構築する


基礎として、テストに役立つ管理プラグインを使用して公式イメージを取得します。

Dockerfile:

FROM rabbitmq:3.6-management RUN apt-get update && apt-get install -y curl RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange 

組立
 docker build --tag=x25/rmq-delayed-message-exchange . 

打ち上げ
 docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange 

春のAMQP


Spring Frameworkは、 pring-rabbitプロジェクトのプラグインを完全にサポートしています。 バージョン1.6.4以降では、xml / bean構成と注釈の両方を使用できます。

Spring Boot Amqp Starterを使用します。

Mavenの依存関係
 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 

gradleの依存関係
 compile "org.springframework.boot:spring-boot-starter-amqp" 

注釈による構成


ブートストラップと注釈により、Springは作業の大部分を担います。 書くだけ:

 @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<?> message) { //... } 

また、アプリケーションが起動すると、RabbitAdminはdelayed exchangequeue 、イベントハンドラーの作成を自動的に宣言し、アノテーション付きメソッドにアタッチします。

メッセージを処理するためにより多くのスレッドが必要ですか? これは、外部構成ファイル( spring.rabbitmq.listener.concurrencyプロパティー)またはアノテーションのcontainerFactoryパラメーターを介して構成さます。

 // : @Configuration public class RabbitConfiguration { @Bean(name = "containerFactory") @Autowired public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(10); factory.setPrefetchCount(30); return factory; } } // : @RabbitListener(containerFactory = "containerFactory", ...) 

RabbitTemplateを使用して保留中のメッセージを送信すると便利です。

 rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(data) .setHeader("x-delay", MESSAGE_DELAY_MS).build() ); 

すぐに送信されますが、 x-delayヘッダーで指定された遅延で配信されます。 最大許容遅延時間(2 ^ 32-1)ms。

サンプルアプリケーションの準備:

 @SpringBootApplication public class Application { private final Logger log = LoggerFactory.getLogger(Application.class); private final static String DELAY_QUEUE_NAME = "delayed.queue"; private final static String DELAY_EXCHANGE_NAME = "delayed.exchange"; private final static String DELAY_HEADER = "x-delay"; private final static String NUM_ATTEMPT_HEADER = "x-num-attempt"; private final static long RETRY_BACKOFF = 5000; @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<byte[]> message) { String endpointUrl = new String(message.getPayload()); Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L); log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt); if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) { rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(message.getPayload()) .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF) .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1) .build() ); } } private boolean doNotifyEndpoint(String url) { try { HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); /* @todo: set up connection timeouts */ return (connection.getResponseCode() == 200); } catch (Exception e) { log.error(e.getMessage()); return false; } } public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 

application.yml
 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: prefetch: 10 concurrency: 50 

build.gradle
 buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' jar { baseName = 'rmq-delayed-demo' version = '0.1.0' } repositories { mavenCentral() } sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { compile("org.springframework.boot:spring-boot-starter-amqp") testCompile("org.springframework.boot:spring-boot-starter-test") } 

遅延配信はコントロールパネル(rmq-management)で確認します。ポート15672で利用可能です:

画像

ログ:

 2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1 2016-12-21 14:27:25.941: Connection refused (Connection refused) 2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2 2016-12-21 14:27:30.951: Connection refused (Connection refused) 2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3 

XML設定


XML構成を使用するtrue 、エクスチェンジBeanでdelayedプロパティをtrueに設定するだけで、RabbitAdminが残りを自動的に実行します。

Integration Frameworkと組み合わせたxml構成の例
 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int:channel id="to-delayed-rmq" /> <int-amqp:outbound-channel-adapter channel="to-delayed-rmq" amqp-template="rabbitTemplate" exchange-name="delayed.exchange" routing-key="delayed.binding" mapped-request-headers="x-delay" /> <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue" queue-names="delayed.queue" message-converter="amqpMessageConverter" connection-factory="rabbitConnectionFactory" concurrent-consumers="10" prefetch-count="50" /> <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage"> <bean id="postbackServiceActivator" class="PostbackServiceActivator" /> </int:service-activator> <rabbit:queue name="delayed.queue" /> <rabbit:direct-exchange name="delayed.exchange" delayed="true"> <rabbit:bindings> <rabbit:binding queue="delayed.queue" key="delayed.binding" /> </rabbit:bindings> </rabbit:direct-exchange> </beans> 

便利なリンク


Source: https://habr.com/ru/post/J318118/


All Articles