Using Spring JMS with Oracle AQ [XMLType payload]

Using Spring JMS with Oracle AQ [ Without Spring JDBC Extension ]

Code available at github.com/sachin-handiekar/spring-jms-orac..

Create a queue table with XMLType payload

BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE( 
                           Queue_table => 'QT_SAMPLE', 
                           Queue_payload_type => 'SYS.XMLTYPE', 
                           Sort_list => 'ENQ_TIME', 
                           COMMENT => 'A sample queue table');
END;
  • Create a queue
BEGIN 
   DBMS_AQADM.CREATE_QUEUE( 
                          Queue_name => 'Q_SAMPLE', 
                          Queue_table => 'QT_SAMPLE', 
                          Queue_type => 0, 
                          Max_retries => 5, 
                          Retry_delay => 10);
END;

Code AQ Connection factory

package com.sachinhandiekar.oracle.aq;

import javax.jms.ConnectionFactory;
import javax.sql.DataSource;

public class OracleAQQueueConnectionFactory {

 private DataSource dataSource;

 public ConnectionFactory createConnectionFactory() throws Exception {
  return oracle.jms.AQjmsFactory.getQueueConnectionFactory(dataSource);
 }

 public void setDataSource(DataSource dataSource) {
  this.dataSource = dataSource;
 }

}

AQ Destination Factory

package com.sachinhandiekar.oracle.aq;

import org.springframework.beans.factory.FactoryBean;
import oracle.jms.AQjmsSession;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;

public class OracleAQQueueDestinationFactory implements FactoryBean<queue> {

 private QueueConnectionFactory connectionFactory;

 private String queueUser;

 private String queueName;

 public Queue getObject() throws Exception {
  QueueConnection queueConnection = connectionFactory.createQueueConnection();
  AQjmsSession session = (AQjmsSession) queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

  return session.getQueue(queueUser, queueName);
 }

  public Class<queue> getObjectType() {
  return javax.jms.Queue.class;
 }

 public boolean isSingleton() {
  return false;
 }

 public void setConnectionFactory(QueueConnectionFactory connectionFactory) {
  this.connectionFactory = connectionFactory;
 }

 public void setQueueUser(String queueUser) {
  this.queueUser = queueUser;
 }

 public void setQueueName(String queueName) {
  this.queueName = queueName;
 }
}

Message Listener for XMLType message

package com.sachinhandiekar.oracle.aq;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.springframework.transaction.annotation.Transactional;

import oracle.jms.AQjmsAdtMessage;
import oracle.xdb.XMLType;

public class OracleMessageListener implements MessageListener {

 @Transactional
 public void onMessage(Message message) {

  try {

   //Converting message into XmlType payload
   XMLType xmlMsg = (XMLType) ((AQjmsAdtMessage) message).getAdtPayload();

   System.out.println("XML Payload ==> " + xmlMsg.getStringVal());

  }
  catch (Exception e) {
   // Catch any exception here
   e.printStackTrace();
  }

 }

}

Message Listener Container

package com.sachinhandiekar.oracle.aq;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import oracle.jms.AQjmsSession;
import oracle.xdb.XMLType;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class OracleXMLMessageListenerContainer extends DefaultMessageListenerContainer {

 protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
  return ((AQjmsSession) session).createConsumer(destination, null, XMLType.getORADataFactory(), null, false);
 }

}

Sending XML Message to the queue

package com.sachinhandiekar.oracle.aq;

import java.sql.Connection;
import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.sql.DataSource;

import oracle.jms.AQjmsSession;
import oracle.xdb.XMLType;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Main {
 public static void main(String[] args) {
  ApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");

  JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");

  final DataSource dataSource = (DataSource) ctx.getBean("dataSource");

  final String xmlMessage = "<sample>hello aq test 2</sample>";

  jmsTemplate.send("q_sample", new MessageCreator() {
   public Message createMessage(Session session) throws JMSException {

    Connection conn = null;
    XMLType payload = null;
    try {
     conn = dataSource.getConnection();
     payload = XMLType.createXML(conn, xmlMessage);
    }
    catch (SQLException e) {
     e.printStackTrace();
    }
    finally {
     try {
      conn.close();
     }
     catch (SQLException e) {
      // ignore it
     }
    }

    Message msg = ((AQjmsSession) session).createORAMessage(payload);

    return msg;
   }
  });

  System.out.println("Message Sent!!!");

 }
}

Did you find this article valuable?

Support Sachin Handiekar by becoming a sponsor. Any amount is appreciated!