RabbitMQ Dead Letter Exchange Nedir, Neden Kullanılır?
RabbitMQ ya da onun gibi diğer message broker’lar ile çalışırken; mesajların iletilmesi, iletin mesajın işlendikten sonra ilgili kaynaktan (queue, topic, etc.) silinmesi veya mesaja ait işlemlerin doğru bir şekilde gerçekleşmesi gibi olayıların başarılı bir şekilde gerçekleşmesini isteriz. Bunlar da beraberinde mesajların yönetimi sorumluluğunu getirmektedir. Bu yazıda, RabbitMQ içerisinde yer alan; mesajların işlenemediği, beklediği kuyruk üzerindeki TTL’i sona erdiği ya da kuyruğun belirli sınıra ulaşıp mesajı alamadığı durumlarda bizlere kuyruk üzerindeki veri için işlem yapmamızı sağlayan Dead Letter Exchange (DLX) hakkında bilgiler edineceğiz.
RabbitMQ Nedir?
RabbitMQ; Erlang dili ile yazılmış ve uygulamalar arası haberleşmeyi sağlayan open source bir message broker’dir. Kaynak kodlarına GitHub üzerinden erişilebilmektedir.
Message Broker Nedir?
Message Broker, IBM sayfasında:
“Message Broker; uygulamalar, sistemler ve servislerin birbirleri arasında haberleşmesini sağlayan yazılımdır. Message Broker, ilgili mesajlaşma protokollerini kullanarak bu haberleşmeyi sağlamaktadır. Uygulamalar, servisler ve sistemler farklı dillerde yazılsa bile, birbirlerinden farklı servisler arasındaki haberleşmeyi direkt sağlamaktadır.” olarak tanımlanmıştır.
RabbitMQ Dead Letter Exchange Nedir ve Ne Zaman Kullanılır?
RabbitMQ Dead Letter Exchange; bizlere mesajların ilgili kuyruğa yönlendirilememesi, işlenememesi ya da aşağıdaki mesajın işlenememesine sebep olan diğer 3 durum gerçekleştiğinde
- Mesajın kuyruk içerisindeki sahip olduğu TTL (Time to Live) değerinin sona ermesi,
- Mesajın consumer tarafından reject edilmesi
- Kuyruk, alabileceği maksimum mesaj sayısına eriştiği durumda
ilgili mesajı alıp tekrardan işlememize olanak sağlayan bir exchange tipidir.
Aşağıda genel hatları ile bu yapının mimarisi yer almaktadır.
Diyagram üzerinde yer alan ifadeleri incelediğimizde ise:
- Producer: İlgili exchange’e mesajı gönderen uygulama.
- Exchange: Producer tarafından gönderilen mesajın hangi kuyruğa iletilmesi gerektiğini bilen ve ileten yapı.
- Queue: Exchange tarafından iletilen mesajın tutulduğu alan.
- Consumer: İlgili kuyruktan mesajı alıp işleten uygulama.
- Dead Letter Exchange: Reject edilmiş mesajı, ilgili kuyruğa yönlendirecek olan yapı.
- Dead Letter Queue: Reject edilmiş olan mesajların barınacağı alan.
Java ile RabbitMQ Dead Letter Exchange
Bu kısımda ise yukarıda bahsedilen teorik kısmın Java ile olan implementasyonu yer alacaktır. Devam etmeden önce uygulamanın kaynak kodlarına GitHub hesabından erişebilirsiniz.
Senaryo olarak da Devnot etkinlikleri için bilet satışının gerçekleştiği ve ödeme işlemlerinden sonra da RabbitMQ’ya bir mesaj gönderilerek uygulamanın bu mesajı alıp bilet kesme işlemi gerçekleştireceğini var sayalım. Yukarıdaki diyagramda bu senaryoda kullanılacak olan; queue
, message
, data
ve exchange
gibi ifadeler yer almaktadır.
Ödeme ve bilet kesme işlemlerinin kritikliğinden dolayı da kuyruk üzerindeki mesajlar için bazı kısıtlar yer alacaktır, bunlar; kuyrukta maksimum 10 adet mesaj bekleyebilir ve maksimum bekleme süresi de 5 saniye olacaktır. Bu kurallara uymayan mesajlar da yazacağımız Exchange
ile ilgili kuyruğa giderek orada tekrar işlenecektir.
Kurulum | RabbitMQ Docker
Dead Letter Exchange’in Java ile olan uygulamasına geçmeden önce aşağıdaki Docker Cli komutu ile RabbitMQ’yu arayüzü ile kurup ilgili port yönlendirmesini de yapabilirsiniz.
docker run -d --hostname my-rabbit --name rabbitmq-dlx -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Bu işlemi gerçekleştirdikten sonra http://localhost:15672/ adresine giderek RabbitMQ dashboard ve diğer kontrolleri yapabilirsiniz. Buradaki giriş için gerekli olan kullanıcı adı ve şifre default olarak guest
gelmektedir.
❯ curl -I 'http://localhost:15672/' HTTP/1.1 200 OK content-length: 2884 content-security-policy: script-src 'self' 'unsafe-eval' 'unsafe-inline'; object-src 'self' content-type: text/html date: Mon, 25 Apr 2022 07:14:30 GMT etag: "4229579948" last-modified: Wed, 18 Aug 2021 13:14:19 GMT server: Cowboy vary: origin
Yukarıdaki tanımlamalar yapıldıktan sonra RabbitMQ kullanıma hazır olacaktır.
Aşağıda RabbitMQ içerisinde mesaj olarak göndereceğimiz kullanıcı ve ödeme bilgilerini temsil edecek olan PaymentInfo
sınıfı yer almaktadır.
class PaymentInfo { private static final Random random = new Random(); private final Integer userId; private final Integer paymentId; public PaymentInfo() { this.userId = Math.abs(random.nextInt()); this.paymentId = Math.abs(random.nextInt()); } @Override public String toString() { return "PaymentInfo{" + "userId=" + userId + ", paymentId=" + paymentId + '}'; } }
Mesajın işlenemediği durumda hangi exchange
ve routing key
değerleri ile işleme devam edeceğini kuyruğu oluştururken header’a parametre olarak geçmekteyiz. Yukarıdaki diyagramı da dikkate aldığımızda tanımlamalar aşağıdaki gibi olacaktır.
// ... private static final String EXCHANGE_NAME = "tickets"; private static final String EXCHANGE_TYPE = "direct"; private static final String ROUTING_KEY = "payment_is_done"; private static final String QUEUE_NAME = "TICKETS_AFTER_APPROVING_PAYMENT"; private static final String DLX_EXCHANGE_NAME = "tickets_dlx"; private static final String DLX_QUEUE_NAME = "DLX_TICKETS_AFTER_APPROVING_PAYMENT"; private static final boolean DURABLE = true; private static final boolean EXCLUSIVE = false; private static final boolean AUTO_DELETE = false; private static final Map<String, Object> ARGS = new HashMap<>() { { put("x-max-length", 10); put("x-message-ttl", 5000); put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); put("x-dead-letter-routing-key", ROUTING_KEY); } }; // ...
Yukarıda static olan tanımlamalar ve header içerisine gidecek olan kuyrukta bekleyecek maksimum mesaj ve TTL yer almaktadır.
// ... ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); channel.exchangeDeclare(DLX_EXCHANGE_NAME, EXCHANGE_TYPE); channel.queueDeclare(QUEUE_NAME, DURABLE, EXCLUSIVE, AUTO_DELETE, ARGS); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); channel.queueDeclare(DLX_QUEUE_NAME, DURABLE, EXCLUSIVE, AUTO_DELETE, null); channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTING_KEY); // ...
Buradaki tanımlamalara bakıldığında da TICKETS_AFTER_APPROVING_PAYMENT
ve DLX_TICKETS_AFTER_APPROVING_PAYMENT
adlı 2 kuyruk oluşturuldu ve ilgili exchange’lara bağlandı. TICKETS_AFTER_APPROVING_PAYMENT
adlı kuyrukta ise diğerinden farklı olarak header içerisine parametre olarak DLX bilgileri ve ilgili kısıtlar geçildi.
Yukarıdaki tanımlardan sonra aşağıdaki kodda ise ilgili kuyruğa her saniye 1 kayıt yazan publisher
yer almaktadır. Eğer kuyrukta bekleyen kayıt sayısı limiti aştığında mesajlar artık DLX_TICKETS_AFTER_APPROVING_PAYMENT
adlı kuyruğa gidecektir ve ilgili consumer
da oradan mesajları okumaya başlayacaktır.
// ... Consumer<PaymentInfo> publish = (PaymentInfo paymentInfo) -> { try { System.out.println("It has been sent! PaymentInfo: " + paymentInfo); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, paymentInfo.toString().getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { e.printStackTrace(); } }; Runnable messagePublisher = () -> { Executors .newScheduledThreadPool(1) .scheduleAtFixedRate(() -> publish.accept(new PaymentInfo()), 1L, 1L, TimeUnit.SECONDS); }; DeliverCallback deliverCallbackDlx = (consumerTag, delivery) -> { String routingKey = delivery.getEnvelope().getRoutingKey(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received WITH DLX! '" + routingKey + "':'" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; Runnable messageConsumerDlx = () -> { try { channel.basicConsume(DLX_QUEUE_NAME, false, deliverCallbackDlx, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } }; messagePublisher.run(); messageConsumerDlx.run(); // ...
Yukarıdaki kodu çalıştırdığımızda ise console çıktısı aşağıdaki gibi olacaktır:
It has been sent! PaymentInfo: PaymentInfo{userId=76335932, paymentId=1996560747} It has been sent! PaymentInfo: PaymentInfo{userId=2064688013, paymentId=648854917} It has been sent! PaymentInfo: PaymentInfo{userId=1169316240, paymentId=474258643} It has been sent! PaymentInfo: PaymentInfo{userId=1248018143, paymentId=18080868} It has been sent! PaymentInfo: PaymentInfo{userId=1391253442, paymentId=1825543379} It has been sent! PaymentInfo: PaymentInfo{userId=1702743921, paymentId=805738981} [x] Received WITH DLX! 'payment_is_done':'PaymentInfo{userId=76335932, paymentId=1996560747}' It has been sent! PaymentInfo: PaymentInfo{userId=816735114, paymentId=401707640} [x] Received WITH DLX! 'payment_is_done':'PaymentInfo{userId=2064688013, paymentId=648854917}' It has been sent! PaymentInfo: PaymentInfo{userId=2060671911, paymentId=1210826228} [x] Received WITH DLX! 'payment_is_done':'PaymentInfo{userId=1169316240, paymentId=474258643}' [x] Received WITH DLX! 'payment_is_done':'PaymentInfo{userId=1248018143, paymentId=18080868}' It has been sent! PaymentInfo: PaymentInfo{userId=1438400373, paymentId=877235346} It has been sent! PaymentInfo: PaymentInfo{userId=812357989, paymentId=1097085130} [x] Received WITH DLX! 'payment_is_done':'PaymentInfo{userId=1391253442, paymentId=1825543379}'
Yazı içerisinde;
- RabbitMQ Dead Letter Exchange Nedir?
- RabbitMQ Dead Letter Exchange Nasıl Kullanılır?
- Java ile RabbitMQ Dead Letter Exchange Kullanımı
gibi soruların, diğer konuların anlatımı ve kod örnekleri ile bilgiler aktarıldı. Uygulamanın kaynak kodlarına GitHub hesabından erişebilirsiniz.
Kanyaklar:
- https://www.rabbitmq.com/dlx.html
- https://www.cloudamqp.com/blog/when-and-how-to-use-the-rabbitmq-dead-letter-exchange.html