Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

boborjan
I have the following project:

There is ActimeMQ JMS topic where GPS devices (unique ID is IMEI) sends (indirect) his current status (GPS position and other IO values) messages real-time (pro 8-12 secs). I have to show this data per devices (devices is associated with customers) real-time in a map in the browser for a user. (JMS subscriber can filter messages with IMEI).

I have an AppFuse 3.0 framework project:

- JSF Mojarra 2.2.10 (PrimeFaces 5.1)
- OmniFaces 1.8.1
- PrettyFaces 3.3.3
- Hibernate 4.1
- Spring 4.1.2
- Spring Security 3.2,
- EL 2.2
- MySQL 5.x
- (etc)

in a normal servlet (3.x) container like Tomcat 7.x.

No EJB, no CDI(?).

I (want) use the PrimeFaces Push (WebSocket) to sends JMS messages to browser.

I'am very confused how to design the architecture and implement to properly open/close JMS connection/session, who is careing about opened JMS sessions to avoid memory eating, closing JMS to avoid receive message to non authenticated user, etc

Situations where problem can occur

- HttpSession expired
- any ajax/non ajax request exception
- user close browser
- root user can view other customer devices (switch via dropdown), I have to change filtering on JMS messages real-time
- ?

I've made some tests with a @SessionScoped @ManagedBean implements JMS MessageListener, but for example @PreDestroy never invoked...JMS is still opened after HttpSession is expired.

    @Component
    @ManagedBean
    @SessionScoped
    public class HomePage extends BasePage implements Serializable, MessageListener { // SessionAwareMessageListener
   
    private static final long serialVersionUID = 1L;
   
    /*
    * Channel name (url) for client push request  
    */
    private final static String CHANNEL = "/notify";
   
    @Autowired
    private CarManager carManager;
   
    @Autowired
    private TopicConnectionFactory connectionFactory;
   
    @Autowired
    private Topic jmsTopic;
   
    private List<Car> carsOnMap;
   
    private Map<String, CarDTO> carByIMEI;
   
    private TopicConnection connection;
    private TopicSession session;
    private TopicSubscriber subscriber;
   
    private ObjectMapper jsonMapper = new ObjectMapper();
   
    public HomePage() {
    super();
    }
   
    public void startJMSTopicSubscriber(String msgSelector) {
    try {
    if (connection == null) {
    LOG.info("JMS connection not started yet, init and start now");
    connection = connectionFactory.createTopicConnection();
    connection.start();
    } else {
    LOG.info("JMS connection already started, close subscriber");
    if (subscriber != null) {
    subscriber.close();
    }
    // if (session != null) { session.close(); }
    // connection.stop();
    subscriber = null;
    // session = null;
    }
    if (session == null) {
    LOG.info("JMS connection session not created yet, create it");
    session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    }
    LOG.info("create JMS subscriber with selector: {}", msgSelector);
    subscriber = session.createSubscriber(jmsTopic, msgSelector, false);
    subscriber.setMessageListener(this);
   
    } catch (Exception e) {
    LOG.error("Error in JMS Topic start", e);
    }
    }
   
    public void stopJMSTopicSubscriber() {
    try {
    if (subscriber != null) {
    subscriber.close();
    LOG.debug("JMS subscriber closed on leaving HomePage view");
    }
    } catch (JMSException e) {
    LOG.error("Error closing JMS subscriber", e);
    }
    try {
    if (session != null) {
    session.close();
    LOG.debug("JMS session closed on leaving HomePage view");
    }
    } catch (JMSException e) {
    LOG.error("Error closing JMS session", e);
    }
    try {
    if (connection != null) {
    connection.stop();
    connection.close();
    LOG.debug("JMS connection stopped/closed on leaving HomePage view");
    }
    } catch (JMSException e) {
    LOG.error("Error stopping/closing JMS connection", e);
    }
   
    }
   
    @PostConstruct
    public void init() {
    reloadCars();
    }
   
    @PreDestroy
    public void destroy() {
    LOG.debug("PreDestroy called, try stoppping JMS");
    stopJMSTopicSubscriber();
    }
   
    public void reloadCars() {
    LOG.debug("Reloading cars from DB");
    if (managedPartner == null) {
    managedPartner = getLoggedInUser().getPartner();
    }
    if (managedPartner != null) {
    carsOnMap = getCarManager().getCarsByPartner(managedPartner);
    } else {
    carsOnMap = new ArrayList<Car>();
    }
   
    LOG.debug("Generate IMEI filter for JMS subscriber; build CarByIME map");
    String imeis = "";
    StringBuilder sb = new StringBuilder("imei IN (");
    carByIMEI = new HashMap<String, CarDTO>();
    int i = 0;
    for (Car car : carsOnMap) {
    if (car.getDevice() != null && StringUtils.isNotBlank(car.getDevice().getMobileDevice())) {
    CarDTO carDTO = new CarDTO(car);
    carByIMEI.put(car.getDevice().getMobileDevice(), carDTO);
    if (i > 0) {
    sb.append(',');
    }
    sb.append('\'');
    sb.append(car.getDevice().getMobileDevice());
    sb.append('\'');
    }
    i++;
    }
    sb.append(')');
    imeis = sb.toString();
   
    startJMSTopicSubscriber(imeis);
   
    RequestContext reqCtx = RequestContext.getCurrentInstance();
    try {
    reqCtx.addCallbackParam("carsIMEI", jsonMapper.writeValueAsString(carByIMEI));
    } catch (Exception e) {
    LOG.error("Error in mapping car imeis to JSON", e);
    }
    } // reloadCars
   
    @Override
    public void onMessage(Message msg) {
    try {
    if (msg instanceof ActiveMQBytesMessage) {
    ActiveMQBytesMessage byteMsg = (ActiveMQBytesMessage) msg;
    WMessage wmsg = null;
    String invalidMsg = "";
    try {
    wmsg = new WMessage(correctbuf);
    } catch (Exception e1) {
    invalidMsg = processDeviceJMSMsgData(buf);
    }
    if (wmsg != null) {
    EventBus eventBus = EventBusFactory.getDefault().eventBus();
    eventBus.publish(CHANNEL, wmsg);
    } else {
    LOG.warn("INVALID Msg: {}", invalidMsg);
    }
    } else {
    LOG.trace(String.format("* * * JMS onMessage, type: %s, msg:%s", msg.getClass().getName(), msg));
    }
    } catch (Exception jmsex) {
    LOG.error("device JMS onMessage error:", jmsex);
    }
    } // onMessage
   
    }


    @PushEndpoint("/notify")
    @Singleton
    public class NotifyResource {
   
    //protected final Log log = LogFactory.getLog(getClass());
    private static final Logger LOG = LoggerFactory.getLogger(NotifyResource.class);
   
   
    @OnOpen
    public void onOpen(RemoteEndpoint r, EventBus e) {
    LOG.debug("push onOpen");
    }
   
    @OnClose
    public void onClose(RemoteEndpoint r, EventBus e) {
    LOG.debug("push onClose");
    }
   
    @OnMessage(encoders = { JSONEncoder.class })
    public WMessage onMessage(WMessage message) {
    //log.debug(message);
    LOG.debug(message.toString());
    return message;
    }
    }

I implemented a Spring HttpSessionEventPublisher to catch session destroy but I don't know how to get "httpsession active" JMS connection to close. Should I manage the JMS connections pro user at my own?!?

    public class JMSContainerShutDownHook extends HttpSessionEventPublisher implements ServletContextListener {

        private static final Log log = LogFactory.getLog(JMSContainerShutDownHook.class);

        private void shutDownJMSContainer(ServletContext servletContext) {
                log.info("Destroyed called");
                ...
                log.info("Exiting.");
        }
       
        @Override
        public void contextInitialized(ServletContextEvent servletContextEvent) {
                log.info("contextInitialized called");
        }

        @Override
        public void contextDestroyed(ServletContextEvent servletContextEvent) {
                log.info("contextDestroyed called");
                ServletContext servletContext = servletContextEvent.getServletContext();
                shutDownJMSContainer(servletContext);
        }

        @Override
        public void sessionCreated(HttpSessionEvent event) {
                log.info("HTTP sessionCreated called");
                ServletContext servletContext = event.getSession().getServletContext();
                super.sessionCreated(event);
        }

        @Override
        public void sessionDestroyed(HttpSessionEvent event) {
                log.info("HTTP sessionDestroyed called");
                ServletContext servletContext = event.getSession().getServletContext();
                shutDownJMSContainer(servletContext);
                super.sessionDestroyed(event);
        }

}

Any kind of help is welcome

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

Patrizio De Michele
Hi,
I didn't know PrimePush, but i know that Atmosphere (PrimePush is based
on Atmosphere FrameWork) supports JMS as technology for broadcasting  
messages.
So you have "only" to instruct Atmosphere with the message topic and with
the jms connection (with jndi's), after you can use the atmosphere
multi room chat sample as starting point,
instead of the username you have the IMEI
to distinguish between sockets and to broadcast messages.
In this scenario you don't have to handle jms (connection factory and  
topics)
everything is done by Atmosphere.
Accordingly to this page (http://blog.primefaces.org/?p=3066), PrimePush
is very similar to Atmosphere so maybe you can use atmosphere-jms artifact
to re-implement the chat sample.
You need a customized path (look here  
https://github.com/Atmosphere/atmosphere/wiki/Using-URI-path-templates-with-annotated-classes)  
so instead of '/notify/' you need something
like '/notify/{imei: [0-9]*}

Regarding your doubts:

- HttpSession expired --> close the user active socket, retrieving it with  
the IMEI
- user close browser --> probably the @OnClose annotated method is called
- root user can view other customer devices-> when root change device you  
close
the socket and you open a new one acting as the new device (you pass the  
new IMEI).

Bye, Patrizio

> I have the following project:
>
> There is ActimeMQ JMS topic where GPS devices (unique ID is IMEI) sends
> (indirect) his current status (GPS position and other IO values) messages
> real-time (pro 8-12 secs). I have to show this data per devices (devices  
> is
> associated with customers) real-time in a map in the browser for a user.
> (JMS subscriber can filter messages with IMEI).
>
> I have an AppFuse 3.0 framework project:
>
> - JSF Mojarra 2.2.10 (PrimeFaces 5.1)
> - OmniFaces 1.8.1
> - PrettyFaces 3.3.3
> - Hibernate 4.1
> - Spring 4.1.2
> - Spring Security 3.2,
> - EL 2.2
> - MySQL 5.x
> - (etc)
>
> in a normal servlet (3.x) container like Tomcat 7.x.
>
> No EJB, no CDI(?).
>
> I (want) use the PrimeFaces Push (WebSocket) to sends JMS messages to
> browser.
>
> *I'am very confused how to design the architecture and implement to  
> properly
> open/close JMS connection/session*, who is careing about opened JMS  
> sessions
> to avoid memory eating, closing JMS to avoid receive message to non
> authenticated user, etc
>
> Situations where problem can occur
>
> - HttpSession expired
> - any ajax/non ajax request exception
> - user close browser
> - root user can view other customer devices (switch via dropdown), I  
> have to
> change filtering on JMS messages real-time
> - ?
>
> I've made some tests with a @SessionScoped @ManagedBean implements JMS
> MessageListener, but for example @PreDestroy never invoked...JMS is still
> opened after HttpSession is expired.
>
>     @Component
>     @ManagedBean
>     @SessionScoped
>     public class HomePage extends BasePage implements Serializable,
> MessageListener { // SessionAwareMessageListener
>     private static final long serialVersionUID = 1L;
>     /*
>     * Channel name (url) for client push request
>     */
>     private final static String CHANNEL = "/notify";
>     @Autowired
>     private CarManager carManager;
>     @Autowired
>     private TopicConnectionFactory connectionFactory;
>     @Autowired
>     private Topic jmsTopic;
>     private List<Car> carsOnMap;
>     private Map<String, CarDTO> carByIMEI;
>     private TopicConnection connection;
>     private TopicSession session;
>     private TopicSubscriber subscriber;
>     private ObjectMapper jsonMapper = new ObjectMapper();
>     public HomePage() {
>     super();
>     }
>     public void startJMSTopicSubscriber(String msgSelector) {
>     try {
>     if (connection == null) {
>     LOG.info("JMS connection not started yet, init and start now");
>     connection = connectionFactory.createTopicConnection();
>     connection.start();
>     } else {
>     LOG.info("JMS connection already started, close subscriber");
>     if (subscriber != null) {
>     subscriber.close();
>     }
>     // if (session != null) { session.close(); }
>     // connection.stop();
>     subscriber = null;
>     // session = null;
>     }
>     if (session == null) {
>     LOG.info("JMS connection session not created yet, create it");
>     session = connection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
>     }
>     LOG.info("create JMS subscriber with selector: {}", msgSelector);
>     subscriber = session.createSubscriber(jmsTopic, msgSelector,  
> false);
>     subscriber.setMessageListener(this);
>     } catch (Exception e) {
>     LOG.error("Error in JMS Topic start", e);
>     }
>     }
>     public void stopJMSTopicSubscriber() {
>     try {
>     if (subscriber != null) {
>     subscriber.close();
>     LOG.debug("JMS subscriber closed on leaving HomePage view");
>     }
>     } catch (JMSException e) {
>     LOG.error("Error closing JMS subscriber", e);
>     }
>     try {
>     if (session != null) {
>     session.close();
>     LOG.debug("JMS session closed on leaving HomePage view");
>     }
>     } catch (JMSException e) {
>     LOG.error("Error closing JMS session", e);
>     }
>     try {
>     if (connection != null) {
>     connection.stop();
>     connection.close();
>     LOG.debug("JMS connection stopped/closed on leaving HomePage  
> view");
>     }
>     } catch (JMSException e) {
>     LOG.error("Error stopping/closing JMS connection", e);
>     }
>    
>     }
>    
>     @PostConstruct
>     public void init() {
>     reloadCars();
>     }
>     @PreDestroy
>     public void destroy() {
>     LOG.debug("PreDestroy called, try stoppping JMS");
>     stopJMSTopicSubscriber();
>     }
>     public void reloadCars() {
>     LOG.debug("Reloading cars from DB");
>     if (managedPartner == null) {
>     managedPartner = getLoggedInUser().getPartner();
>     }
>     if (managedPartner != null) {
>     carsOnMap = getCarManager().getCarsByPartner(managedPartner);
>     } else {
>     carsOnMap = new ArrayList<Car>();
>     }
>     LOG.debug("Generate IMEI filter for JMS subscriber; build CarByIME
> map");
>     String imeis = "";
>     StringBuilder sb = new StringBuilder("imei IN (");
>     carByIMEI = new HashMap<String, CarDTO>();
>     int i = 0;
>     for (Car car : carsOnMap) {
>     if (car.getDevice() != null &&
> StringUtils.isNotBlank(car.getDevice().getMobileDevice())) {
>     CarDTO carDTO = new CarDTO(car);
>     carByIMEI.put(car.getDevice().getMobileDevice(), carDTO);
>     if (i > 0) {
>     sb.append(',');
>     }
>     sb.append('\'');
>     sb.append(car.getDevice().getMobileDevice());
>     sb.append('\'');
>     }
>     i++;
>     }
>     sb.append(')');
>     imeis = sb.toString();
>     startJMSTopicSubscriber(imeis);
>     RequestContext reqCtx = RequestContext.getCurrentInstance();
>     try {
>     reqCtx.addCallbackParam("carsIMEI",
> jsonMapper.writeValueAsString(carByIMEI));
>     } catch (Exception e) {
>     LOG.error("Error in mapping car imeis to JSON", e);
>     }
>     } // reloadCars
>     @Override
>     public void onMessage(Message msg) {
>     try {
>     if (msg instanceof ActiveMQBytesMessage) {
>     ActiveMQBytesMessage byteMsg = (ActiveMQBytesMessage) msg;
>     WMessage wmsg = null;
>     String invalidMsg = "";
>     try {
>     wmsg = new WMessage(correctbuf);
>     } catch (Exception e1) {
>     invalidMsg = processDeviceJMSMsgData(buf);
>     }
>     if (wmsg != null) {
>     EventBus eventBus = EventBusFactory.getDefault().eventBus();
>     eventBus.publish(CHANNEL, wmsg);
>     } else {
>     LOG.warn("INVALID Msg: {}", invalidMsg);
>     }
>     } else {
>     LOG.trace(String.format("* * * JMS onMessage, type: %s, msg:%s",
> msg.getClass().getName(), msg));
>     }
>     } catch (Exception jmsex) {
>     LOG.error("device JMS onMessage error:", jmsex);
>     }
>     } // onMessage
>    
>     }
>
>
>     @PushEndpoint("/notify")
>     @Singleton
>     public class NotifyResource {
>    
>     //protected final Log log = LogFactory.getLog(getClass());
>     private static final Logger LOG =
> LoggerFactory.getLogger(NotifyResource.class);
>    
>     @OnOpen
>     public void onOpen(RemoteEndpoint r, EventBus e) {
>     LOG.debug("push onOpen");
>     }
>    
>     @OnClose
>     public void onClose(RemoteEndpoint r, EventBus e) {
>     LOG.debug("push onClose");
>     }
>    
>     @OnMessage(encoders = { JSONEncoder.class })
>     public WMessage onMessage(WMessage message) {
>     //log.debug(message);
>     LOG.debug(message.toString());
>     return message;
>     }
>     }
>
> I implemented a Spring HttpSessionEventPublisher to catch session destroy
> but I don't know how to get "httpsession active" JMS connection to close.
> Should I manage the JMS connections pro user at my own?!?
>
>     public class JMSContainerShutDownHook extends  
> HttpSessionEventPublisher
> implements ServletContextListener {
>
> private static final Log log =
> LogFactory.getLog(JMSContainerShutDownHook.class);
>
> private void shutDownJMSContainer(ServletContext servletContext) {
> log.info("Destroyed called");
> ...
> log.info("Exiting.");
> }
>
> @Override
> public void contextInitialized(ServletContextEvent servletContextEvent)  
> {
> log.info("contextInitialized called");
> }
>
> @Override
> public void contextDestroyed(ServletContextEvent servletContextEvent) {
> log.info("contextDestroyed called");
> ServletContext servletContext =  
> servletContextEvent.getServletContext();
> shutDownJMSContainer(servletContext);
> }
>
> @Override
> public void sessionCreated(HttpSessionEvent event) {
> log.info("HTTP sessionCreated called");
> ServletContext servletContext = event.getSession().getServletContext();
> super.sessionCreated(event);
> }
>
> @Override
> public void sessionDestroyed(HttpSessionEvent event) {
> log.info("HTTP sessionDestroyed called");
> ServletContext servletContext = event.getSession().getServletContext();
> shutDownJMSContainer(servletContext);
> super.sessionDestroyed(event);
> }
>
> }
>
> Any kind of help is welcome
>
>
>
>
>
> --
> View this message in context:  
> http://appfuse.547863.n4.nabble.com/Architect-design-JMS-Topic-messages-to-JSF-2-2-PrimeFaces-client-tp4657729.html
> Sent from the AppFuse - User mailing list archive at Nabble.com.
>


--
[Insurance Consulting Group ICG SpA]



Patrizio De Michele
       

Via G. March, 14 scala B

57121 Livorno (Livorno)



  Tel. 0185-364600 Int. 402



Email
Skype

: [hidden email]
: pdemichele


Numero Verde: 800-900-424
www.icgnet.it



Ai sensi del Decreto Leigislativo 196/2003 si precisa che le informazioni  
contenute in questo messaggio sono riservate ad uso esclusivo del
destinatario. Qualora il messaggio in parola Le fosse pervenuto per  
errore, La preghiamo di eliminarlo senza copiarlo e di non inoltrarlo a  
terzi,
dandone gentilmente comunicazione. Grazie.

Pursuant to Legislative Decree n° 196/2003, you are hereby informed that  
this message contains confidential information indended only for the use  
of the addressee. If you are not the addressee, and have received this  
message by mistake, please delete it and immediately notify us. You may  
not copy or disseminate this message to anyone. Thank you.





Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

boborjan
Hi Patrizio De Michele,

thank you for thinking about my doubts.
Sorry but I really need some code samples

Perhaps I don't need to broadcasting JMS messages per IMEI, because a customer (partner) has a unique code and want to see all of his vehicles movement "at once" on the map.

I don't see the multchat example, where is it? And I don't see any example with JMS.

Thanks Bye

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

Patrizio De Michele
Follow the link http://blog.primefaces.org/?p=3066, at the end talks about
a chat.
If you want to group vehicles per partner you have to use a key
(call it "partnerId") to distinguish between partners
and inside the message coming from the client you'll have a field
for the 'imei'.

There are many chat examples in the standard samples of the atmosphere
distribution.
Here you can find all the examples:

https://github.com/Atmosphere/atmosphere-samples/tree/master/samples

Maybe you can ask for help on the atmosphere google group or
to primefaces users.

Bye, Patrizio

> Hi Patrizio De Michele,
>
> thank you for thinking about my doubts.
> Sorry but I really need some code samples
>
> Perhaps I don't need to broadcasting JMS messages per IMEI, because a
> customer (partner) has a unique code and want to see all of his vehicles
> movement "at once" on the map.
>
> I don't see the multchat example, where is it? And I don't see any  
> example
> with JMS.
>
> Thanks Bye
>
>
>
>
>
> --
> View this message in context:  
> http://appfuse.547863.n4.nabble.com/Architect-design-JMS-Topic-messages-to-JSF-2-2-PrimeFaces-client-tp4657729p4657731.html
> Sent from the AppFuse - User mailing list archive at Nabble.com.
>


--
[Insurance Consulting Group ICG SpA]



Patrizio De Michele
       

Via G. March, 14 scala B

57121 Livorno (Livorno)



    Tel. 0185-364600 Int. 402



Email
Skype

: [hidden email]
: pdemichele


Numero Verde: 800-900-424
www.icgnet.it



Ai sensi del Decreto Leigislativo 196/2003 si precisa che le informazioni
contenute in questo messaggio sono riservate ad uso esclusivo del
destinatario. Qualora il messaggio in parola Le fosse pervenuto per
errore, La preghiamo di eliminarlo senza copiarlo e di non inoltrarlo a
terzi,
dandone gentilmente comunicazione. Grazie.

Pursuant to Legislative Decree n° 196/2003, you are hereby informed that
this message contains confidential information indended only for the use
of the addressee. If you are not the addressee, and have received this
message by mistake, please delete it and immediately notify us. You may
not copy or disseminate this message to anyone. Thank you.
Loading...