The Open Anzo Project

Semantic Application Middleware

Connecting the Web Client to the JMS Communication Bus using Cometd and Bayeux

Overview

The main method of communication with the Anzo services is via the CommunicationBus (a.k.a. combus). One can think of the communication bus as essentially a cloud: you publish a request into it, an appropriate service handles it, and a response eventually comes out of the cloud. Operations are inherently asynchronous. A response is matched to its request by a unique correlation ID carried in the messages. The main mechanism for participating in the CommunicationBus is JMS. Because web browsers can't communicate via JMS, we need a mechanism for participating in the combus via HTTP.
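As a rough illustration of correlation-ID matching, here is a minimal Java sketch of a table of outstanding requests. It assumes a correlationId built from the clientId and an increasing counter (the scheme used in the publishing step below) joined with a colon, and it adds the per-request timeout described in the request/response step. PendingRequests and its methods are hypothetical names, not part of Anzo.JS, whose client is written in JavaScript.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical client-side table of outstanding requests, keyed by correlationId.
class PendingRequests {
    // Callback receives (responseBody, null) on success or (null, error) on timeout.
    private record Pending(BiConsumer<String, Exception> callback, ScheduledFuture<?> timeout) {}

    private final String clientId;
    private long counter = 0;
    private final Map<String, Pending> pending = new HashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "request-timeouts");
        t.setDaemon(true); // don't keep the JVM alive just for timers
        return t;
    });

    PendingRequests(String clientId) { this.clientId = clientId; }

    // Registers a callback and returns a correlationId built from clientId plus an increasing counter.
    synchronized String register(BiConsumer<String, Exception> callback, long timeoutMillis) {
        String correlationId = clientId + ":" + (++counter);
        ScheduledFuture<?> timeout =
                timer.schedule(() -> expire(correlationId), timeoutMillis, TimeUnit.MILLISECONDS);
        pending.put(correlationId, new Pending(callback, timeout));
        return correlationId;
    }

    // Called for each incoming response; returns false for unknown (or already expired) ids.
    boolean complete(String correlationId, String body) {
        Pending p;
        synchronized (this) { p = pending.remove(correlationId); }
        if (p == null) return false;
        p.timeout().cancel(false);
        p.callback().accept(body, null);
        return true;
    }

    // Timer task: removing the entry keeps the table from growing without bound.
    private void expire(String correlationId) {
        Pending p;
        synchronized (this) { p = pending.remove(correlationId); }
        if (p != null) p.callback().accept(null, new TimeoutException("no response for " + correlationId));
    }

    synchronized int size() { return pending.size(); }
}
```

A response that arrives after its timeout finds no entry, so `complete` returns false and the late message can be logged or dropped.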

Messages can come to clients on the combus at any moment. Some are notification messages rather than responses to a particular request. HTTP does not normally let the server initiate messages to the client, so we need special techniques to simulate that. Comet (or reverse AJAX) is a broad name for such techniques. Anzo.JS uses the Cometd framework to implement Comet. Cometd is essentially an implementation of the Bayeux protocol on top of HTTP.

Cometd allows different mechanisms for simulating incoming messages via HTTP. The one we will mainly be using is referred to as long polling. The rough idea is that the client makes an XMLHttpRequest call to the server to ask for any messages pending for the client. If there are any messages pending, they are sent back immediately in the HTTP response. If there aren't any, the server keeps the connection open, waiting for a message to arrive for about 30 seconds. If no message has arrived by then, the server completes the request with an empty response, and the client waits a bit and issues a new request. The technique is essentially basic polling, but with longer-lasting connections that reduce the number of times a connection is started and stopped. This optimization allows closer to real-time message arrival and reduces the initial connection overhead, typically the most intensive part of the HTTP conversation.
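The server side of long polling can be sketched in Java with a per-client queue: a poll returns at once if messages are waiting, and otherwise parks until a message arrives or the timeout expires. This is a simplified, hypothetical model (LongPollMailbox is not a Cometd class); a real Cometd server suspends the HTTP request with Jetty continuations instead of blocking a thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical per-client mailbox illustrating one long-poll cycle.
class LongPollMailbox {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Called on the server when a message for this client arrives.
    void deliver(String message) { queue.add(message); }

    // Handles one long-poll request; an empty list means "timed out, poll again".
    List<String> poll(long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) return batch; // messages already pending: respond immediately
        try {
            // Park the request until a message arrives or the timeout expires.
            String first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch); // include anything that arrived in the meantime
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

A client would call `poll` in a loop with a timeout of roughly 30 seconds, issuing a new request after each response whether or not it carried messages.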

Extending ActiveMQ's JMS to the web client

The main idea is that we use Bayeux to expose basic JMS functionality to the Anzo.JS client. So essentially we are bridging JMS over Bayeux.

The client API we provide will expose the following basic functionality:

  • connect and disconnect
    • This handles the whole process of connecting to Bayeux and the combus. It performs the Bayeux handshake and creates a temporary JMS topic to which messages intended for this particular client are sent.
  • publish
    • This is the mechanism for sending a message to a service when a response is expected, sometimes called the request/response mechanism.
    • Internally it generates a unique correlation ID for the request. When the response arrives, it matches the correlation ID and calls the appropriate callback.
  • topicSubscribe/topicUnsubscribe
    • One key service of the combus is the ability to receive notifications whenever a particular named graph changes. These methods allow the client to register a listener for such notifications. Named graph changes are simply published on a JMS topic, so this mechanism lets web clients subscribe to those topics as well as to other topics, such as the transaction event topic.
  • messageListeners
    • Other systems on the combus, such as the real-time update notification system, may send a client messages at any time. The client can register listeners for any message that is neither a graph update message nor a response to a request.
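The listener rules above amount to a three-way routing decision for every incoming message: response callback, topic listener, or general message listener. A minimal sketch, written in Java for illustration even though the actual client is JavaScript; IncomingMessage and ClientDispatcher are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical view of a message after the bridge has forwarded it to the client.
record IncomingMessage(String correlationId, String topic, String body) {}

// Routes each incoming message to exactly one kind of handler.
class ClientDispatcher {
    private final Map<String, Consumer<IncomingMessage>> pendingRequests = new HashMap<>();
    private final Map<String, List<Consumer<IncomingMessage>>> topicListeners = new HashMap<>();
    private final List<Consumer<IncomingMessage>> messageListeners = new ArrayList<>();

    void expectResponse(String correlationId, Consumer<IncomingMessage> callback) {
        pendingRequests.put(correlationId, callback);
    }

    void topicSubscribe(String topic, Consumer<IncomingMessage> listener) {
        topicListeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    void topicUnsubscribe(String topic) { topicListeners.remove(topic); }

    void addMessageListener(Consumer<IncomingMessage> listener) { messageListeners.add(listener); }

    void dispatch(IncomingMessage msg) {
        // 1. Response to a request we published: one-shot callback.
        if (msg.correlationId() != null) {
            Consumer<IncomingMessage> cb = pendingRequests.remove(msg.correlationId());
            if (cb != null) { cb.accept(msg); return; }
        }
        // 2. Notification on a subscribed topic (e.g. a named-graph update).
        List<Consumer<IncomingMessage>> listeners =
                msg.topic() == null ? null : topicListeners.get(msg.topic());
        if (listeners != null) { listeners.forEach(l -> l.accept(msg)); return; }
        // 3. Anything else goes to the general message listeners.
        messageListeners.forEach(l -> l.accept(msg));
    }
}
```

In this sketch a late response whose correlationId was already consumed falls through to the general listeners; a real client might instead drop or log it.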

Mapping JMS onto Bayeux

We use the Cometd framework included with Jetty and Dojo as the base transport between web clients and the server. Cometd is an implementation of the Bayeux protocol, a pub/sub protocol between web clients, brokered in the web server. Unlike the ActiveMQ-Jetty bridge, which enables web clients to participate in pub/sub via a centralized MQ broker, the Cometd/Bayeux model hosts a broker in the web server itself, managing pub/sub between web clients only. However, because Cometd provides a simple extension point for listening to Bayeux channels and publishing messages from the web server to web clients, we can easily use it to build our bridge between Bayeux and JMS. This functionality is implemented as a special listener, BayeuxJMSBridge.
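Because the bridge is a single listener attached to Bayeux channels, its entry point is essentially a dispatch on the channel name and, for control messages, on a message-type field. The sketch below uses the channel names from the protocol description that follows (/anzo/control, /anzo/bridge) and the control message types it mentions. The BayeuxMessage record, the field names type, destination and topic, and the action strings are hypothetical; the real hook is whatever listener interface the Cometd version in use provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical shape of a Bayeux message reaching the bridge.
record BayeuxMessage(String channel, String clientId, Map<String, String> data) {}

// Sketch of the bridge entry point: route by channel, then by control-message type.
class BridgeRouter {
    final List<String> actions = new ArrayList<>(); // records what would be done, for illustration

    void onMessage(BayeuxMessage msg) {
        switch (msg.channel()) {
            case "/anzo/control" -> handleControl(msg);
            case "/anzo/bridge" -> actions.add("forward to JMS destination " + msg.data().get("destination"));
            default -> actions.add("ignore message on " + msg.channel());
        }
    }

    private void handleControl(BayeuxMessage msg) {
        String type = msg.data().getOrDefault("type", "");
        switch (type) {
            case "connect" -> actions.add("create temporary topic for " + msg.clientId());
            case "disconnect" -> actions.add("release JMS resources of " + msg.clientId());
            case "topicSubscribe" -> actions.add("subscribe " + msg.clientId() + " to " + msg.data().get("topic"));
            case "topicUnsubscribe" -> actions.add("unsubscribe " + msg.clientId() + " from " + msg.data().get("topic"));
            default -> actions.add("reject unknown control type '" + type + "'");
        }
    }
}
```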

Our end goal is to have a general JMS client in JavaScript that hides the underlying Bayeux transport and Cometd mechanism. However, we include only the parts of JMS that we need to implement our core Anzo.JS services, and we bend the abstraction slightly to perform Anzo-specific handshaking at the JMS level. For example, JMS request/response operations assume the existence of a special temporary JMS destination that the server sets up on behalf of the user. What follows is a detailed description of the protocol:

  1. Connecting
    1. Bayeux/Cometd handshake: Under the covers, Cometd does a handshake with the Cometd servlet. The important bit about the handshaking that we'll use later on is that the server assigns the client a unique (to the server) clientId.
    2. JMSConnect
      1. Set up messaging to the client: Even though Bayeux supports multiple channels of communication, for simplicity all messages in our system reach the client over a single Bayeux channel of the form /anzo/user/<username>/<clientId>. The clientId is required because a single user may connect from multiple web clients. Thus, the first stage of connection is the Cometd client subscribing to that single channel. This subscription is a first-order operation in the Cometd Dojo library, and is handled automatically by the Cometd server.
      2. Set up a temporary JMS destination: The Bayeux channel is only a transport-layer mechanism for us, and should not be confused with the JMS destination that entities outside the web world use to send messages to the web client. To create such a destination, the JMSClient sends a connect message over the Bayeux channel /anzo/control. The BayeuxJMSBridge is subscribed to the control channel, and thus gets called when a message arrives. The message type (in this case, connect) is a concept we invented to multiplex control messages. In response to the connect message, the server uses the Java JMS client to create a temporary topic, and associates that topic (destination is the generic JMS term for a queue or topic) with the clientId, for example in a Map. It also creates a JMS MessageConsumer that subscribes to the destination on behalf of the user. It then sends a response message to the connecting client. We deviate slightly from a standard JMS subscription here by not sending the temporary destination ID to the client: the web client need not know it, only that a way now exists for messages to be delivered to it. The BayeuxJMSBridge keeps track of the mapping between clients and temporary JMS destinations. A quick discussion point here: every connect message from the client contains a correlationId that we place in the response, so the client can match request/response pairs. The Bayeux protocol has a messageId field that could perhaps have been used for this purpose. We don't use it because a Bayeux messageId identifies a single message from client to server or vice versa, whereas a correlationId is a longer-lived ID that identifies a transaction.
  2. Publishing a message. In the case of Anzo.JS, nearly all messages sent from our JavaScript JMSClient are requests in request/response pairs. To send a message, we create a simple JSON packet including the body text, body format, the JMS destination, and a correlationId derived from our clientId and an increasing integer counter. We then send this message over Bayeux to /anzo/bridge, a channel designed to forward messages from the web client to the ActiveMQ broker. When the bridge receives a message on this channel, it pulls the various fields out of the Bayeux message and creates an analogous JMS message. An important step is to look up the temporary JMS destination for the clientId and set it as the reply destination of the JMS message. Finally, the message is sent out over JMS.
  3. Receiving a message. Receiving a message works very much like publishing, only in reverse. The JMS message listener running in the bridge receives a message over JMS. We take the destination from the message and use it to look up the clientId and username associated with that temporary destination. We then derive the proper Bayeux channel and forward the message along.
  4. Request/response: The JMSClient maintains a table of correlationId/callback pairs for outstanding requests. When a message comes in with a known correlationId, the corresponding callback is invoked. When a request is issued, the JMSClient also starts a timeout timer, so that if no response is received, the callback is called with an error. In this way, we handle timeouts and also make sure the request/response table does not grow without bound.
  5. Topic (un)subscriptions: To subscribe to updates about a particular named graph and other pub/sub topics, the client submits a topicSubscribe message to /anzo/control containing the name of the topic. The BayeuxJMSBridge then subscribes to the topic to receive the notifications on behalf of the client. The issue here is that the bridge subscribes as a superuser, since all clients share its single JMS connection. Topics (for example, those used to send graph updates) have their access control handled by the JMS subsystem; for example, only users that can read a graph are allowed to subscribe to its topic. So the bridge must perform access control checks itself before allowing a client to subscribe to updates. All clients subscribed to the same topic share the bridge's single subscription to that topic, so the bridge keeps its own list of subscribed users. When a topic's access control changes, the bridge must also unsubscribe any clients that no longer have access to that topic. It does this by listening to events via the IAuthorizationEventListener interface: when it sees a topic's read privileges removed, it goes through each subscribed client and kicks out any that no longer have read access. Update messages reach the client via the same channel as all other messages (/anzo/user/<username>/<clientId>).
  6. Disconnecting: To review, a logical JMS connection between the web client and the server is really a set of Java JMS resources that the server has allocated on behalf of the client. So far, we have set up a temporary JMS topic and a corresponding MessageConsumer to receive messages on that topic. These must be torn down once it has been determined that the user has disconnected. Additional similar resources will be related to message selectors used in selector trackers for notification (see below). Now we must determine what is meant by disconnect. Ideally, the client invokes our disconnect routine by publishing a disconnect message on /anzo/control; in that case we simply close all the resources we have allocated. The difficult case is when the client disconnects passively. Fortunately, the underlying Cometd/Bayeux protocol involves a periodic reconnect message each time the parked long-poll request times out. We can listen for these reconnects, and if we do not hear one within a particular amount of time, we can force a disconnect of the user. If the client later attempts to send a message, we can send an error indicating that the client must reconnect.
  7. Security:
    1. Authentication: Authentication of web clients is done using the built-in mechanisms of Jetty or another webapp server. Once the user has been authenticated against the Anzo Authentication Service, we set the authenticated userId on a thread-local variable so that it may be accessed inside our Bayeux extension classes for authorization. We are a bit concerned about using a thread local because of Jetty continuations, but the Jetty/Cometd developer advised me to do it that way, and in initial tests it seems to work.
    2. Authorization: All users are authenticated at the HTTP basic auth level, so non-authenticated users can cause only limited mischief in the system. Authenticated users must be prevented from receiving messages not intended for them, and must not be able to send messages into the JMS cloud purporting to be from a different user. The latter breach is prevented inherently by the system design, because the BayeuxJMSBridge sets the replyTo address and the runAsUser property on the JMS message before sending it out. It also ensures that only these server-side values are used, removing any incorrect or malicious values added by the client. Again we must break our abstraction slightly by setting the runAsUser value on the server: this is a higher-level concept that we are dealing with at the JMS level. However, for security, the server, logged in as the system user, must vouch for the identity of the web client. The former concern (messages reaching only the correct web client) requires a special mechanism. Recall that web clients receive messages via special channels that they subscribe to, and any user subscribed to such a channel receives its messages. The broker cannot check at delivery time that all subscribed users are authorized recipients. Therefore, we must allow only the authorized user to subscribe to the channel created for them. Without such authorization, consider the following attack: a malicious user sniffs the wire for Bayeux traffic, sees the channel of another user, and subscribes to it to receive all of that user's messages. All we have to do is compare the authenticated username with the channel the client is subscribing to. Fortunately, the Cometd/Bayeux framework lets us register a SecurityPolicy that does exactly that. Note that graph update messages and arbitrary topics use a slightly different authorization technique, described above.
  8. Performance and logging: For scalability and logging purposes, we must include a couple of pieces of information in the HTTP headers.
    1. BridgeId: In distributed deployments, all Bayeux messages must contain an identifier of the application server holding the state for the client. A network dispatcher or sprayer uses it to make sure that once a client has been assigned to a particular bridge, all of its messages for the duration of the Bayeux connection are sprayed there.
    2. CorrelationId: Because we may have to examine HTTP logs to debug problems on the server, the correlationId, which also contains the clientId, must be readily accessible to software that searches and displays HTTP log files.
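The core bridge bookkeeping from the connect, publish and receive steps can be sketched as below. This is a minimal in-memory model under stated assumptions: temporary destinations are plain generated names, the client packet is a map with hypothetical keys (destination, body, bodyFormat, correlationId), and OutgoingJmsMessage stands in for a real JMS message. In an actual bridge the corresponding javax.jms calls would be along the lines of Session.createTemporaryTopic(), Message.setJMSReplyTo(), Message.setJMSCorrelationID() and Message.setStringProperty(). Note that publish ignores any replyTo or runAsUser value supplied by the client, as the authorization discussion requires.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the JMS message the bridge would build and send.
record OutgoingJmsMessage(String destination, String replyTo, String correlationId,
                          Map<String, String> properties, String body) {}

// Hypothetical bridge state: clientId <-> temporary JMS destination, in both directions.
class BridgeCore {
    private record Client(String username, String clientId) {}

    private final Map<String, String> tempDestByClient = new HashMap<>();
    private final Map<String, Client> clientByTempDest = new HashMap<>();
    private int tempCounter = 0;

    // Connect: allocate a temporary destination for the client and remember the mapping.
    synchronized void connect(String username, String clientId) {
        // A real bridge would create a JMS temporary topic and attach a MessageConsumer;
        // here the destination is just a generated name.
        String tempDest = "temp-topic-" + (++tempCounter);
        tempDestByClient.put(clientId, tempDest);
        clientByTempDest.put(tempDest, new Client(username, clientId));
    }

    // Publish: replyTo and runAsUser come from server-side state, never from the client's packet.
    synchronized OutgoingJmsMessage publish(String authenticatedUser, String clientId,
                                            Map<String, String> packet) {
        String replyTo = tempDestByClient.get(clientId);
        if (replyTo == null) {
            throw new IllegalStateException("unknown client, must reconnect: " + clientId);
        }
        Map<String, String> props = new HashMap<>();
        String format = packet.get("bodyFormat");
        if (format != null) props.put("bodyFormat", format);
        props.put("runAsUser", authenticatedUser); // the bridge vouches for the user's identity
        return new OutgoingJmsMessage(packet.get("destination"), replyTo,
                packet.get("correlationId"), props, packet.get("body"));
    }

    // Receive: map the temporary destination a message arrived on to the client's Bayeux channel.
    synchronized String channelFor(String tempDest) {
        Client c = clientByTempDest.get(tempDest);
        return c == null ? null : "/anzo/user/" + c.username() + "/" + c.clientId();
    }

    // Disconnect: forget the mapping (a real bridge would also close the consumer and delete the topic).
    synchronized void disconnect(String clientId) {
        String tempDest = tempDestByClient.remove(clientId);
        if (tempDest != null) clientByTempDest.remove(tempDest);
    }
}
```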
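The shared-subscription bookkeeping of the topic (un)subscription step might look like the following sketch. The ACL check is passed in as a hypothetical canRead predicate, and onReadPrivilegeRemoved stands in for whatever callback the IAuthorizationEventListener interface actually defines; its real method names are not given in this document.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical sketch: one bridge-level subscription per topic, shared by all subscribed web clients.
class TopicSubscriptions {
    private record Subscriber(String username, String clientId) {}

    private final BiPredicate<String, String> canRead; // (username, topic) -> has read access
    private final Map<String, Set<Subscriber>> subscribers = new HashMap<>();

    TopicSubscriptions(BiPredicate<String, String> canRead) { this.canRead = canRead; }

    // Returns false if the user may not read the topic; the bridge checks ACLs itself.
    synchronized boolean subscribe(String username, String clientId, String topic) {
        if (!canRead.test(username, topic)) return false;
        // The first subscriber would trigger the bridge's own JMS subscription to the topic.
        subscribers.computeIfAbsent(topic, t -> new HashSet<>()).add(new Subscriber(username, clientId));
        return true;
    }

    synchronized void unsubscribe(String clientId, String topic) {
        Set<Subscriber> subs = subscribers.get(topic);
        if (subs == null) return;
        subs.removeIf(s -> s.clientId().equals(clientId));
        if (subs.isEmpty()) subscribers.remove(topic); // last one out: bridge can drop its JMS subscription
    }

    // Fan-out: the Bayeux channels that should receive a message published on the topic.
    synchronized Set<String> channelsFor(String topic) {
        Set<String> channels = new HashSet<>();
        for (Subscriber s : subscribers.getOrDefault(topic, Set.of())) {
            channels.add("/anzo/user/" + s.username() + "/" + s.clientId());
        }
        return channels;
    }

    // Called when read privileges on a topic are removed: evict clients that lost access.
    synchronized void onReadPrivilegeRemoved(String topic) {
        Set<Subscriber> subs = subscribers.get(topic);
        if (subs == null) return;
        subs.removeIf(s -> !canRead.test(s.username(), topic));
        if (subs.isEmpty()) subscribers.remove(topic);
    }
}
```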
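Passive disconnect detection can be modelled as a table of last-reconnect times that is swept periodically. In this hypothetical sketch (ClientReaper is not an Anzo or Cometd class), time is passed in explicitly to keep it deterministic, and the silence threshold is an assumption; it should comfortably exceed the roughly 30-second long-poll timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical liveness tracker: each Bayeux reconnect refreshes a client's timestamp.
class ClientReaper {
    private final long maxSilenceMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    ClientReaper(long maxSilenceMillis) { this.maxSilenceMillis = maxSilenceMillis; }

    synchronized void onReconnect(String clientId, long nowMillis) { lastSeen.put(clientId, nowMillis); }

    // Explicit disconnect message on /anzo/control.
    synchronized void onDisconnect(String clientId) { lastSeen.remove(clientId); }

    // Used to reject sends from reaped clients with a "must reconnect" error.
    synchronized boolean isConnected(String clientId) { return lastSeen.containsKey(clientId); }

    // Run periodically; returns clients whose JMS resources (temporary topic, consumer) should be released.
    synchronized List<String> reap(long nowMillis) {
        List<String> expired = new ArrayList<>();
        lastSeen.entrySet().removeIf(e -> {
            boolean stale = nowMillis - e.getValue() > maxSilenceMillis;
            if (stale) expired.add(e.getKey());
            return stale;
        });
        return expired;
    }
}
```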
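The channel check behind the SecurityPolicy could reduce to a few string comparisons. This standalone sketch does not implement Cometd's SecurityPolicy interface, whose exact signature depends on the Cometd version. UserChannelPolicy is a hypothetical name; the sketch also requires the clientId segment to match, which is slightly stricter than the username comparison described above, and it assumes web clients never need to subscribe to any channel other than their own.

```java
// Hypothetical check a SecurityPolicy could delegate to when a client asks to subscribe.
final class UserChannelPolicy {
    private static final String PREFIX = "/anzo/user/";

    private UserChannelPolicy() {}

    // Allow /anzo/user/<username>/<clientId> only for the caller's own username and clientId.
    static boolean canSubscribe(String authenticatedUser, String clientId, String channel) {
        if (!channel.startsWith(PREFIX)) return false; // clients only ever subscribe to their own channel
        String[] parts = channel.substring(PREFIX.length()).split("/", -1);
        return parts.length == 2
                && parts[0].equals(authenticatedUser)
                && parts[1].equals(clientId);
    }
}
```

Splitting with a limit of -1 keeps trailing empty segments, so variants such as a trailing slash or extra path segments are rejected rather than silently matched.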

Alternative BayeuxJMSBridge Design

The design above truly treats the BayeuxJMSBridge as a JMS client on the combus. However, the bridge need not be a true JMS client. An alternative design is to treat Bayeux as a separate protocol over which service container components may expose their services. The main advantage of this design would be efficiency when other services reside in the same service container: the current design still sends the messages over JMS, whereas the communication could have happened through direct method calls.
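One way to realize this alternative is to hide the transport behind an interface and serve co-located services with direct calls, falling back to JMS for everything else. RequestTransport and LocalFirstTransport are hypothetical names; the futures keep the asynchronous request/response style of the combus.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical transport abstraction for one asynchronous request/response exchange.
interface RequestTransport {
    CompletableFuture<String> request(String destination, String body);
}

// Serves co-located services with direct method calls; everything else goes to the fallback (e.g. JMS).
class LocalFirstTransport implements RequestTransport {
    private final Map<String, Function<String, String>> localServices = new HashMap<>();
    private final RequestTransport fallback;

    LocalFirstTransport(RequestTransport fallback) { this.fallback = fallback; }

    void registerLocal(String destination, Function<String, String> service) {
        localServices.put(destination, service);
    }

    @Override
    public CompletableFuture<String> request(String destination, String body) {
        Function<String, String> local = localServices.get(destination);
        if (local == null) return fallback.request(destination, body);
        // Same-container service: a direct call, with no broker round trip.
        return CompletableFuture.supplyAsync(() -> local.apply(body));
    }
}
```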

Copyright © 2007 - 2008 OpenAnzo.org