How do I send a message directly to a parking lot queue, prevent requeue and exit the program flow?

2020-02-14 java rabbitmq message-queue spring-amqp

I currently have 4 queues:

  • test-queue
  • test-queue-short-term-dead-letter
  • test-queue-long-term-dead-letter
  • test-queue-parking-lot

When a message comes into test-queue, I check whether it is in the correct format. If it isn't, I want to send the message directly to the parking lot queue.

I can't throw AmqpRejectAndDontRequeueException because the message will then automatically be sent to the configured DLQ (test-queue-short-term-dead-letter).

Calling RabbitTemplate.convertAndSend() and then throwing another exception, such as BadRequestException, doesn't work. The message goes to the parking lot queue as expected, but the same message also stays in test-queue.

Using RabbitTemplate.convertAndSend() on its own won't work either, as the program continues execution.

All queues are bound to a single direct exchange, each with unique routing keys. The test-queue is configured with the following arguments:

  • x-dead-letter-exchange: ""
  • x-dead-letter-routing-key: <shortTermDeadLetterKey>
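
For illustration, the two arguments above could be built as a plain map before being handed to a queue declaration. This is only a sketch: the class and method names are hypothetical, and the routing key is taken as a parameter because its real value is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; the class and method names are illustrative only. */
class DeadLetterArgs {

    /**
     * Builds the queue arguments that tell RabbitMQ where rejected messages go.
     * An empty exchange name refers to the default exchange, which routes a
     * message to the queue whose name equals the routing key.
     */
    static Map<String, Object> shortTermDeadLetter(String shortTermDeadLetterKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "");
        args.put("x-dead-letter-routing-key", shortTermDeadLetterKey);
        return args;
    }
}
```

In Spring AMQP, a map like this can be passed as the last argument of the `Queue(name, durable, exclusive, autoDelete, arguments)` constructor.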

Receiver:

  @RabbitListener(queues = "test-queue")
  public void receiveMessage(byte[] person) {
    String personString = new String(person);

    if (!personString.matches(desiredRegex)) {
      rabbitTemplate.convertAndSend("test-exchange", "test-queue-parking-lot",
          "invalid person");
      log.info("Invalid person");
    }
    // ...some other code which I don't want to run when the message has arrived in the incorrect format
  }

Answers

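The problem was solved by manually acknowledging the message and returning from the method. Note that calling basicAck() yourself is only appropriate when the listener container is in manual acknowledge mode (for example, ackMode = "MANUAL" on @RabbitListener in Spring AMQP 2.2 or later). In the default AUTO mode, returning normally from the listener already acknowledges the message.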

  @RabbitListener(queues = "test-queue")
  public void receiveMessage(byte[] person, Channel channel,
      @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    String personString = new String(person);

    if (!personString.matches(desiredRegex)) {
      rabbitTemplate.convertAndSend("test-exchange", "test-queue-parking-lot",
          "invalid person");
      log.info("Invalid person");
      channel.basicAck(tag, false);
      return;
    }
    // ...some other code which I don't want to run when the message has arrived in the incorrect format
  }
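
The early-return pattern above can also be exercised without a broker by pulling the decision into a plain class. The following is a minimal sketch under stated assumptions: `ParkingLotPublisher` and `PersonHandler` are hypothetical names, the publisher interface stands in for the `rabbitTemplate.convertAndSend(...)` call, and acknowledgement is left to the listener that would call `handle`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the publish side (e.g. a RabbitTemplate call). */
interface ParkingLotPublisher {
    void park(String payload);
}

/** Hypothetical handler: validates, parks invalid input, and reports the outcome. */
class PersonHandler {
    enum Outcome { PARKED, PROCESSED }

    private final Pattern desired;
    private final ParkingLotPublisher publisher;
    // Records what reached the "normal" processing path, for illustration only.
    final List<String> processed = new ArrayList<>();

    PersonHandler(String desiredRegex, ParkingLotPublisher publisher) {
        this.desired = Pattern.compile(desiredRegex);
        this.publisher = publisher;
    }

    Outcome handle(byte[] person) {
        String personString = new String(person, StandardCharsets.UTF_8);
        if (!desired.matcher(personString).matches()) {
            publisher.park("invalid person");
            return Outcome.PARKED; // early return: nothing below runs
        }
        processed.add(personString);
        return Outcome.PROCESSED;
    }
}
```

A listener method could delegate to `handle` and then acknowledge the delivery for either outcome, since in both cases the message has been dealt with and should not be redelivered.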
