BG Development


  Reply to this topicStart new topicStart Poll

> RabbitMQ multiple queues
ivelinqnev
Публикувано на: 05-08-2021, 15:15
Quote Post



Име:
Група: Потребител
Ранг: Старо куче

Мнения: 627
Регистриран на: 02.12.08



Здравейте, за пръв път използвам RabbitMQ и търся мнение на хора, които имат опит с този message broker. Имам 5 опашки, като във всяка една се пъхат точно определени съобщения. От клиентска имам 5 listeners, които създават по 3 consumers на опашка. За consumer имам следния код, който се гружи за обработката на съобщенията.

CODE
public class ConsumerWorker extends DefaultConsumer {
   private long sleep;
   private Channel channel;
   private ExecutorService executorService;
   final Map<String, Object> sourceListeners;

   /**
    * Creates a new <code>ConsumerWorker</code> instance.
    * @param prefetch
    * @param threadExecutor
    * @param sleep
    * @param channel
    * @param queue
    * @throws IOException
    */
   public ConsumerWorker(final int prefetch,
           final ExecutorService threadExecutor, final long sleep,
           final Channel channel, final String queue,
           final Map<String, Object> sourceListeners) throws IOException {
       super(channel);

       this.sleep = sleep;
       this.channel = channel;
       this.sourceListeners = sourceListeners;
       this.executorService = threadExecutor;
       this.channel.basicQos(prefetch);

       this.channel.queueDeclare(queue, true, false, false, null);
       this.channel.basicConsume(queue, false, this);

   }

   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
           AMQP.BasicProperties properties, byte[] body) throws IOException {
       final String message = new String(body);
       DataSourceIdPayload payload = GsonFactory.getGson().fromJson(message,
               DataSourceIdPayload.class);
       if (properties != null) {
           payload.setHeaders(properties.getHeaders());
       }

       Runnable task = new ScheduledDataSourceProcessor(#, #,#, #, #);
       executorService.submit(task);
   }

}
и съответния клас, който имплементира
QUOTE
Runnable
, но той не е толкова важен и интересен, за да го показвам.

Както казах по- горе
QUOTE
ConsumerWorker
се създава във всеки един listeners с по 3 consumers. Примерно:

CODE
for (int i = 0; i < 3; i++) {
           new ConsumerWorker([prefetchCount], [threadExecutor], 3,
                   connection.createChannel(),
                   [QueueName], null);
}
.
QUOTE
Threadpool
ми е един за всички listeners. С брой тредове
CODE
DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;


Това добър подход ли е? Какво мога да подобря в него? Благодаря!

Това мнение е било редактирано от ivelinqnev на 05-08-2021, 15:19
PMEmail Poster
Top
1 потребители преглеждат тази тема в момента (1 гости, 0 анонимни потребители)
Потребители, преглеждащи темата в момента:

Topic Options Reply to this topicStart new topicStart Poll

 


Copyright © 2003-2019 | BG Development | All Rights Reserved
RSS 2.0