OracleQueueConnection - Yash-777/oracle-aq-samples GitHub Wiki

View Sessions: V$SESSION.

SQL Developer Workbench - Tools - Manage Sessions. Filter by [OS User: MachineID, Module: JDBC Thin Client]
SQL Query StackOverflow

-- Lists ACTIVE sessions that belong to a database user.
-- Rows with a NULL username (typically Oracle background processes) are
-- excluded with an explicit IS NOT NULL test instead of the nvl/'a' sentinel,
-- which would also have hidden a user literally named 'a'.
select
    s.sid                as "Sid",
    s.serial#            as "Serial#",
    s.username           as "Username",
    s.machine            as "Machine",
    s.schemaname         as "Schema name",
    s.logon_time         as "Login time",
    s.module             as "Module",
    s.program            as "Program",
    s.osuser             as "Os user",
    s.status             as "Status",
    nvl(s.process, ' ')  as "OS Process id"
from v$session s
where s.username is not null
  and s.status = 'ACTIVE'
order by s.sid, s.serial#;

ALTER SYSTEM KILL / DISCONNECT SESSION

-- Syntax templates: replace sid and serial# with the values reported by v$session.
ALTER SYSTEM KILL SESSION 'sid,serial#' IMMEDIATE;
ALTER SYSTEM DISCONNECT SESSION 'sid,serial#' IMMEDIATE;
-- Generates one "alter system kill session" statement per INACTIVE session
-- whose OS user is 'tomcat'; the generated text is the first result column.
select 'alter system kill session '''||sid ||','||serial#||''' immediate;', STATUS,OSUSER
  from v$session where STATUS='INACTIVE' and OSUSER='tomcat';
  
-- Same generator, additionally returning the database USERNAME of each session.
select 'alter system kill session '''||sid ||','||serial#||''' immediate;', STATUS,OSUSER,USERNAME
  from v$session where STATUS='INACTIVE' and OSUSER='tomcat';
  
-- Output
-- alter system kill session '10,11708' immediate;	INACTIVE	tomcat	APPDEV
-- alter system kill session '112,61828' immediate;	INACTIVE	tomcat	APPDEV

Oracle Checking Your Current Release Number

-- Explicit column list: returns exactly the three columns shown in the sample
-- output, regardless of any extra columns the view may expose.
SELECT PRODUCT, VERSION, STATUS FROM PRODUCT_COMPONENT_VERSION;

PRODUCT                                  VERSION     STATUS
---------------------------------------- ----------- -----------
NLSRTL                                   10.2.0.1.0  Production
Oracle Database 10g Enterprise Edition   10.2.0.1.0  Prod
PL/SQL                                   10.2.0.1.0  Production
...

Exceptions:

  1. oracle.jms.AQjmsException
oracle.jms.AQjmsException: IO Error: The Network Adapter could not establish the connection
	at oracle.jms.AQjmsDBConnMgr.checkForSecurityException(AQjmsDBConnMgr.java:934)
	at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:609)
	at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:176)
	at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:165)
	at oracle.jms.AQjmsQueueConnectionFactory.createQueueConnection(AQjmsQueueConnectionFactory.java:309)
	at EventController.createQueueConnection(EventController.java:---)
	
oracle.jms.AQjmsException: IO Error: Socket read timed out
	at oracle.jms.AQjmsDBConnMgr.checkForSecurityException(AQjmsDBConnMgr.java:934)
	at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:609)
	at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:176)
	at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:165)
	at oracle.jms.AQjmsQueueConnectionFactory.createQueueConnection(AQjmsQueueConnectionFactory.java:309)
	at EventController.createQueueConnection(EventController.java:---)
  2. ORA-01013
java.sql.SQLTimeoutException: ORA-01013: user requested cancel of current operation

java.sql.SQLException: ORA-01013: user requested cancel of current operation
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:439)
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:395)
        at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:802)
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:436)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:186)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:521)
        at oracle.jdbc.driver.T4CStatement.doOall8(T4CStatement.java:194)
public class EventController implements javax.jms.MessageListener {
	public static int retryConnectionCount = 0, retryAttempts = 5;
	private void createQueueConnection() throws SQLException {
		QueueConnectionFactory qconFactory;
		QueueConnection qcon = null;
		QueueSession qsession;
		Queue queue;
		try {
			log.info("Creating QueueConnection in EventContorller");
			ResourceBundle dataStorageProp = ResourceBundle.getBundle("datastorages");
			int port = Integer.parseInt(dataStorageProp.getString("db1.port"));
			retryAttempts = Integer.parseInt(dataStorageProp.getString("connection.retry.attempts"));
			log.info("EventContorller retryConnectionCount:"+retryConnectionCount+", retryAttempts:"+retryAttempts);
			qconFactory = AQjmsFactory.getQueueConnectionFactory(dataStorageProp.getString("db1.host"),
					dataStorageProp.getString("db1.dbname"), port, "thin");

			// create queue connection
			// EX: AQjmsException: IO Error: The Network Adapter could not establish the connection
			qcon = qconFactory.createQueueConnection(dataStorageProp.getString("db1.techuser"),
					dataStorageProp.getString("db1.techpwd"));
			
			qcon.setExceptionListener(new ExceptionListener() {
				public void onException(JMSException exception) {
					log.info("EvenQueue Connection ExceptionListener triggered: " + exception.getMessage());
					log.error("EvenQueue connection", exception);
					try {
						log.info("restarting JSMConnection...");
						createQueueConnection();
					} catch (Exception e) {
						log.error("Error pausing thread" + e.getMessage());
					}
				}
			});
			
			qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

			queue = ((AQjmsSession) qsession).getQueue(dataStorageProp.getString("db1.sid"), "AQ_EVENT_QUE");
			AQ_EVENT_OBJ _AQ_EVENT_OBJFactory = new AQ_EVENT_OBJ(false);

			log.debug("ClassName::" + queue.getClass());
			log.debug("DbName" + queue.getQueueName());
			qreceiver = ((AQjmsSession) qsession).createReceiver(queue, _AQ_EVENT_OBJFactory);
			qreceiver.setMessageListener(this);
			qcon.start();
		} catch (Exception e) {
			log.error("Error while creating queue connection", e);
			
			if (qcon == null && retryConnectionCount < retryAttempts) {
				retryConnectionCount += 1;
				try {
					Thread.sleep(1000 * 60); // Wait 60 seconds to re-attempt the connection
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				createQueueConnection();
			} else {
				log.info("No of Retry attempts also failed...");
				System.err.println("No of Retry attempts also failed...");
			}
		}
	}

	public void onMessage(Message msg) {
		log.info("Received Message from Event Queue");
		AQ_EVENT_OBJ messageData = null;
		try {
			String msgText;
			if (msg instanceof TextMessage) {
				msgText = ((TextMessage) msg).getText();
			} else if (msg instanceof AQjmsAdtMessage) {
				messageData = (AQ_EVENT_OBJ) ((AQjmsAdtMessage) msg).getAdtPayload();
				msgText = messageData.toString();

				/** Enqueue Starts */
				eventJobStrtTime = new Date();
				fireEvent(messageData);
			} else {
				msgText = msg.toString();
			}
			log.info("Received Message from Event Queue" + msgText);
		} catch (JMSException jmse) {
			log.error("Error while receiving message from queue ", jmse);
		} catch (Throwable t) {
			log.error("Error while receiving message from queue ", t);
		}
	}

	private void fireEvent(AQ_EVENT_OBJ messageData) throws SQLException {
		retryConnectionCount = 0; // For each DownTime retry attempts reset to zero
		log.info("EventController::fireEvent::EventTopic:" + messageData.getEventtopic());
		// ...
	}
}

package com.java.oracle;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;

import javax.jms.*;

import oracle.jdbc.driver.OracleDriver;
import oracle.jdbc.pool.OracleDataSource;
import oracle.jms.*;

// https://docs.oracle.com/cd/B28359_01/server.111/b28420/jm_opers.htm#sthref1609
// https://github.com/Yash-777/oracle-aq-samples/blob/master/java/oracle-aq-jms-api/src/com/sachinhandiekar/oracle/aq/JMSAQTest.java

/**
 * Stand-alone console sample that connects to an Oracle AQ queue over JMS,
 * dequeues messages on demand and prints them. It is meant for manually
 * reproducing session-kill scenarios, hence the blocking reads from stdin.
 *
 * <p>All state is kept in static fields; call {@link #setConnectionParams}
 * before {@link #createQueueConnection()}.
 */
public class OracleQueueConnection {
    static String DBQueueName = "AQ_EVENT_QUE", DBQueueTable = "AQ_EVENT_TBL", DB_Sid = "yash_30",
            DB_Driver = "thin", DB_Protocol = "tcp";
    
    static String DBHost_ServerName, DBPort, DBName, DB_User, DB_Password;
    
    
    /*Kill InActive are inserting the data into table. to reproduce the issue.

    inactive 39310 - KILL
    active   56942 - Event Table added data, INACTIVE after read.

    51798  inactive    JDBC Thin Client
    24825  inactive    JDBC Thin Client
    */
    // Credentials must not be kept in source, not even in comments; the password is redacted.
    //static String username="yash_30", password="<redacted>", url="jdbc:oracle:thin:@sl02441:1524:yash_2Q";
    /*static String connectionString = 
"jdbc:oracle:thin:@(DESCRIPTION=(ENABLE=BROKEN)(ADDRESS=(PROTOCOL=tcp)(PORT=1524)(HOST=sl02441))(CONNECT_DATA=(SID=yash_2Q)))";
*/            
    // JMS Configuration Start
    public static QueueConnectionFactory queueConnFact;
    public static QueueConnection queueConn;
    public static QueueSession queueSession;
    public static QueueReceiver queueReceiver;
    
    /**
     * Loads host, port, database name, user and password for the given
     * environment from {@code DBConnection.connectionUrls}. The stored value is
     * expected to be five fields separated by {@code ~}, in that order.
     *
     * @param env key of the environment, e.g. {@code "CLOUD_DEV"}
     * @throws IllegalArgumentException if the environment is unknown or its
     *         value has fewer than five fields
     */
    public static void setConnectionParams(String env) {
        Map<String, String> connectionUrls = DBConnection.connectionUrls;
        String envParams = connectionUrls.get(env);
        if (envParams == null) {
            throw new IllegalArgumentException("Unknown environment: " + env);
        }
        String[] envParam = envParams.split("~");
        if (envParam.length < 5) {
            throw new IllegalArgumentException(
                    "Expected host~port~dbname~user~password for environment: " + env);
        }
        DBHost_ServerName = envParam[0];
        DBPort = envParam[1];
        DBName = envParam[2];
        DB_User = envParam[3];
        DB_Password = envParam[4];
    }
    
    // https://docs.oracle.com/cd/B28359_01/server.111/b28310/manproc008.htm#ADMIN11193
    /**
     * Connects to the queue and then, five times, waits for console input and
     * dequeues one message. The connection is closed on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            
            // Workbench - Tools - manage Sessions
            // Filter by [OS User: MachineID, Module: JDBC Thin Client]
            setConnectionParams("CLOUD_DEV");
            
            createQueueConnection();
            
            // https://examples.javacodegeeks.com/enterprise-java/jms/jms-client-example/
            // class ConsumerMessageListener implements MessageListener
            // this implements javax.jms.MessageListener @override onMessage(Message msg) - receiveEvents();
            // queueReceiver.setMessageListener(this);
            for (int i = 1; i <= 5; i++) {
                System.out.println("Enter something read the events");
                // Two reads per prompt — presumably the typed character plus the line break.
                System.in.read();
                System.in.read();
                    
                try {
                    receiveEvents();
                    
                } catch (Exception e) {
                    // When we kill active connection. Listener 
                    // oracle.jms.AQjmsException: JMS-120: Dequeue failed
                    // java.sql.SQLRecoverableException: No more data to read from socket
                    e.printStackTrace();
                }
            }
            
            /*System.out.println("Enter something to quit the session...");
            System.in.read();
            System.in.read();*/
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Closing the connection");
            closeConnections();
        }
    }

    /**
     * Registers the Oracle JDBC driver, opens the queue connection and session,
     * installs an exception listener that closes and re-creates the connection,
     * starts the connection and creates the receiver for {@code DBQueueName}.
     * Failures inside the setup are printed, not rethrown.
     *
     * @throws SQLException if registering the JDBC driver fails
     */
    public static void createQueueConnection() throws SQLException {
        OracleDriver driverClassName = new oracle.jdbc.driver.OracleDriver();
        java.sql.DriverManager.registerDriver(driverClassName);
        
        Queue queue;
        try
        {
            int port = Integer.parseInt(DBPort);
            // qc_fact = AQjmsFactory.getQueueConnectionFactory(host, ora_sid, port, driver);
            queueConnFact = AQjmsFactory.getQueueConnectionFactory(DBHost_ServerName, DBName, port, DB_Driver);
            
            queueConn = queueConnFact.createQueueConnection(DB_User, DB_Password);
            
            System.out.println("Queue Connection : "+ queueConn);
            queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // https://docs.oracle.com/cd/E17781_01/server.112/e18804/monitoring.htm#ADMQS246
            // https://docs.oracle.com/javaee/7/api/javax/jms/ExceptionListener.html
            
            // https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#setExceptionListener-javax.jms.ExceptionListener-
            queueConn.setExceptionListener(new ExceptionListener() {
                public void onException(JMSException exception) {
                    System.out.println("EvenQueue ExceptionListener JMSException triggered: " + exception.getMessage());
                    try {
                        // Thread.sleep(5000); // Wait 5 seconds (JMS server restarted?)
                        System.out.println("restartJSMConnection...");
                        try {
                            queueSession.close();
                            System.out.println("queueSession closed success...");
                            queueConn.close();
                            System.out.println("queueConn closed success...");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        createQueueConnection();
                    } catch (Exception e) {
                        System.out.println("Error restarting JMS connection: " + e.getMessage());
                    }
                }
            });
            
            // DB_Sid is passed as the first getQueue argument (the queue owner in the AQ JMS API).
            queue = ((AQjmsSession) queueSession).getQueue(DB_Sid, DBQueueName);
            
            queueConn.start();
            
            System.out.println("ClassName :" + queue.getClass());
            System.out.println("DbName : " + queue.getQueueName());
            
            AQ_EVENT_OBJ dbTable_Mapping_Pojo = new AQ_EVENT_OBJ(false);
            //ORADataFactory xmlData = XMLType.getORADataFactory();
            
            //((AQjmsDestination) queue).start(queueSession, true, true);
            queueReceiver = ((AQjmsSession) queueSession).createReceiver(queue, dbTable_Mapping_Pojo);
            
        } catch (Exception e) {
            System.out.println("Error while creating queue connection"+ e);
        }
    }

    /**
     * Blocks until one message is available, prints it and passes it to
     * {@link #getMessageData}. Does nothing but report when the receiver was
     * never created. Exceptions are printed, not rethrown.
     */
    public static void receiveEvents() {
        //SERVER: queueReceiver.setMessageListener((EventQueueController) obj); // MessageListener.onMessage(Message msg)
        if (queueReceiver == null) {
            // createQueueConnection() swallows its own failures, so the receiver may be missing.
            System.out.println("Queue receiver is not initialised; cannot receive events");
            return;
        }
        try {
            javax.jms.Message message = queueReceiver.receive(); // receiveNoWait
            System.out.println("javax.jms.Message : "+ message);
            getMessageData(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the queue connection if one was opened. Exceptions are printed,
     * not rethrown.
     */
    public static void closeConnections() {
        if (queueConn == null) {
            // Nothing was opened (e.g. the connection attempt failed).
            return;
        }
        try {
            // java.sql.SQLRecoverableException: No more data to read from socket
            // oracle.jms.AQjmsException: No more data to read from socket
            //queueSession.close();
            queueConn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * Waits for console input (so the session can be killed manually in the
     * meantime) and then prints the content of the message according to its type.
     *
     * @param msg the dequeued message, or {@code null} when nothing was received
     * @throws JMSException if reading the message fails
     * @throws SQLException if reading an attribute of the ADT payload fails
     */
    public static void getMessageData(Message msg) throws JMSException, SQLException {
        System.out.println("Stop thread, So that if you wish you can interupt session.");
        try {
            System.in.read();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        if (msg == null) {
            System.out.println("no messages");
        }
        /*else if (msg instanceof oracle.xdb.XMLType) {
            oracle.xdb.XMLType xmlMessage = (oracle.xdb.XMLType) ((AQjmsAdtMessage) msg).getAdtPayload();
            System.out.println("Message Dequeued ==> " + xmlMessage.getStringVal());
        }*/
        else if (msg instanceof TextMessage) {
            String msgText = ((TextMessage) msg).getText();
            System.out.println("TextMessage : " + msgText);
        }
        else if (msg instanceof AQjmsAdtMessage) {
            AQ_EVENT_OBJ messageData = (AQ_EVENT_OBJ) ((AQjmsAdtMessage) msg).getAdtPayload();
            String msgText = messageData.toString();
            
            String eventtopic = messageData.getEventtopic();
            System.out.println("EventTopic: " + eventtopic + ", Message: "+msgText);
        }
        else {
            String msgText = msg.toString();
            System.out.println("Msg : " + msgText);
        }
    }
    
    /**
     * Placeholder overload; currently does nothing.
     *
     * @param obj unused
     * @throws SQLException never thrown by the current empty body
     */
    public static void createQueueConnection(Object obj) throws SQLException {
        
    }

    /**
     * Builds an {@code OracleDataSource} for the configured host, port,
     * database, user and password.
     *
     * @return the configured, not yet connected data source
     * @throws SQLException if the data source cannot be created or configured
     */
    public static OracleDataSource getOracleDataSource() throws SQLException {
        OracleDataSource ds = new OracleDataSource();
        ds.setDriverType( DB_Driver );
        ds.setServerName( DBHost_ServerName );
        ds.setPortNumber( Integer.parseInt(DBPort) );
        // Use DBName, the same value createQueueConnection() hands to the connection
        // factory as the database; DB_Sid is what this class passes as the queue owner.
        ds.setDatabaseName( DBName ); // sid, xe
        ds.setUser( DB_User );
        ds.setPassword( DB_Password );

        return ds;
    }
}
/*
<!-- Java Message Service: https://mvnrepository.com/artifact/javax.jms/jms
https://community.oracle.com/thread/3994924
https://github.com/ceharris/oracle-aq-demo/blob/master/pom.xml
https://docs.oracle.com/cd/B10500_01/appdev.920/a96587/apexampl.htm#59717
https://stackoverflow.com/questions/5576415/jms-topic-vs-queues -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.transaction/javax.transaction-api
https://stackoverflow.com/a/49112022/5081877 -->
<dependency>
    <groupId>javax.transaction</groupId>
    <artifactId>javax.transaction-api</artifactId>
    <version>1.2</version>
</dependency>
*/

AQ Event Class:

/**
 * Java mapping of the SQL object type {@code NEON30.AQ_EVENT_OBJ}, backed by a
 * {@code MutableStruct} with four attributes: event time, event topic, event id
 * and event id datatype. The class is both the data holder ({@code ORAData})
 * and its own factory ({@code ORADataFactory}).
 */
public class AQ_EVENT_OBJ implements ORAData, ORADataFactory
{
  public static final String _SQL_NAME = "NEON30.AQ_EVENT_OBJ";

  protected MutableStruct _struct;

  /* positions of the attributes inside the struct */
  private static final int EVENTTIME_INDEX = 0;
  private static final int EVENTTOPIC_INDEX = 1;
  private static final int EVENTID_INDEX = 2;
  private static final int EVENTID_DATATYPE_INDEX = 3;
  private static final int ATTRIBUTE_COUNT = 4;

  /* JDBC type codes per attribute: DATE (91) read as java.sql.Timestamp, then three VARCHAR (12) */
  private static int[] attributeSqlTypes = {
    java.sql.Types.DATE,
    java.sql.Types.VARCHAR,
    java.sql.Types.VARCHAR,
    java.sql.Types.VARCHAR
  };
  private static ORADataFactory[] attributeFactories = new ORADataFactory[ATTRIBUTE_COUNT];
  protected static final AQ_EVENT_OBJ _AQ_EVENT_OBJFactory = new AQ_EVENT_OBJ(false);

  /** Returns the shared factory instance (an object created without a backing struct). */
  public static ORADataFactory getORADataFactory() {
    return _AQ_EVENT_OBJFactory;
  }

  /**
   * @param init when {@code true} an empty backing struct is allocated;
   *             when {@code false} {@code _struct} stays {@code null}
   */
  public AQ_EVENT_OBJ(boolean init) {
    if (init) {
      _struct = new MutableStruct(new Object[ATTRIBUTE_COUNT], attributeSqlTypes, attributeFactories);
    }
  }

  /** Creates an instance with an empty backing struct. */
  public AQ_EVENT_OBJ() {
    this(true);
  }

  /** Creates an instance and sets all four attributes. */
  public AQ_EVENT_OBJ(java.sql.Timestamp eventtime, String eventtopic, String eventid, String eventidDatatype) throws SQLException {
    this(true);
    setEventtime(eventtime);
    setEventtopic(eventtopic);
    setEventid(eventid);
    setEventidDatatype(eventidDatatype);
  }

  /** ORAData: converts the backing struct to a Datum of type {@code _SQL_NAME}. */
  public Datum toDatum(Connection c) throws SQLException {
    return _struct.toDatum(c, _SQL_NAME);
  }

  /** ORADataFactory: builds a new instance from the given Datum. */
  public ORAData create(Datum d, int sqlType) throws SQLException {
    return create(null, d, sqlType);
  }

  /**
   * Fills {@code o} (or a fresh instance when {@code o} is {@code null}) from
   * the given Datum.
   *
   * @return the populated object, or {@code null} when {@code d} is {@code null}
   */
  protected ORAData create(AQ_EVENT_OBJ o, Datum d, int sqlType) throws SQLException {
    if (d == null) {
      return null;
    }
    AQ_EVENT_OBJ target = (o != null) ? o : new AQ_EVENT_OBJ(false);
    target._struct = new MutableStruct((STRUCT) d, attributeSqlTypes, attributeFactories);
    return target;
  }

  /* accessor methods */

  public java.sql.Timestamp getEventtime() throws SQLException {
    Object value = _struct.getAttribute(EVENTTIME_INDEX);
    return (java.sql.Timestamp) value;
  }

  public void setEventtime(java.sql.Timestamp eventtime) throws SQLException {
    _struct.setAttribute(EVENTTIME_INDEX, eventtime);
  }

  public String getEventtopic() throws SQLException {
    Object value = _struct.getAttribute(EVENTTOPIC_INDEX);
    return (String) value;
  }

  public void setEventtopic(String eventtopic) throws SQLException {
    _struct.setAttribute(EVENTTOPIC_INDEX, eventtopic);
  }

  public String getEventid() throws SQLException {
    Object value = _struct.getAttribute(EVENTID_INDEX);
    return (String) value;
  }

  public void setEventid(String eventid) throws SQLException {
    _struct.setAttribute(EVENTID_INDEX, eventid);
  }

  public String getEventidDatatype() throws SQLException {
    Object value = _struct.getAttribute(EVENTID_DATATYPE_INDEX);
    return (String) value;
  }

  public void setEventidDatatype(String eventidDatatype) throws SQLException {
    _struct.setAttribute(EVENTID_DATATYPE_INDEX, eventidDatatype);
  }
}
⚠️ **GitHub.com Fallback** ⚠️