Hi! Welcome...

I am Ashish. I am Member of Apache MINA PMC, ASF Committer and avid Code hacker. Contact me at paliwalashish at gmail dot com. I work for Terracotta [http://www.terracotta.org] as Solution Architect. I occasionally contribute to jsmpp [http://code.google.com/p/jsmpp/] .

21 November 2008 ~ 0 Comments

Apache MINA – Blacklist Filter Explained


Blacklist filter blocks connections from the blacklisted remote Addresses. The filter is very useful, for dropping the connection originating from addresses, not of interest.

Useful API’s

setBlacklist(InetAddress[] addresses) – Sets a list of IP Addresses that needs to be blocked. This API call clears all the previously added Addresses

setSubnetBlacklist(Subnet[] subnets) – Sets a List of Subnets to be blocked. This API call clears all the previously added Addresses

setBlacklist(Iterable<InetAddress> addresses) – Sets a list of IP Addresses that needs to be blocked. This API call clears all the previously added Addresses

setSubnetBlacklist(Iterable<Subnet> subnets) - Sets a List of Subnets to be blocked. This API call clears all the previously added Addresses

block(InetAddress address) - Adds a remote address to the blacklist

block(Subnet subnet) – Adds a Subnet to be blocked

unblock(InetAddress address) – Removes remote address from the blacklist

unblock(Subnet subnet) – Removes a Subnet from the blacklist

Under the Hood

For each of the events, the Filter checks for the remote address, in the blacklisted Address. If found, the session is closed. The isBlocked() API, checks for the remote address in the configured list and returns true if found.

 

Blacklist Filter

Blacklist Filter

How to Use Blacklist Filter

To use a blacklist filter, we need to follow some simple steps listed below

  • Create an instance of Blacklist filter
  • Add IP Addresses/Subnets to be blacklisted
  • Add the Backlist filter to the Filter chain

Code Snippet

public BlacklistFilter createBlacklistFilter() throws UnknownHostException {
        BlacklistFilter filter = new BlacklistFilter();
 
        // Get the IP Addresses to be blocked
        // from a config file or from DB, whatever
        InetAddress[] blockedAddresses = null;
 
        // populate the list
        // Ensure that the list if populated correctly as per requirement
        blockedAddresses = new InetAddress[1];
 
        // block all connection from Localhost
        blockedAddresses[0] = InetAddress.getByName("127.0.0.1");
        filter.setBlacklist(blockedAddresses);
 
        return filter;
    }

The code is fairly straight forward. We create an instance of Blacklist Filter and add the list of Addresses to be blocked. Then we need to add the Filter instance in the Filter chain of our implementation.

Usage Scenario’s

  • Conditionally allow traffic from only selected sources

Think Differently :-)

  • The default MINA implementation stores the blacklisted Addresses in a List, Custom data structure can be used to speed up search
  • Can extend the Filter’s functionality to allow traffic only from selected sources.

08 November 2008 ~ 0 Comments

Apache MINA – CumulativeProtocolDecoder Explained


Apache MINA uses the concept of Protocol Decoder to decoder read bytes into High Level Message objects. There are conditions in which a single packet may not contain the complete bytes to convert raw message into High Level Objects. For such situations, CumulativeProtocolDecoder can be used. Basically, encoder makes the framework buffer the data, till the complete data is received, to convert the byte[] into a high level object.

So, how do we use this Decoder

Extend from the class CumulativeProtocolDecoder and implement the doDecode().

Conditions to take care of

  1. read the IoBuffer and decode as per yur logic
  2. If the data is sufficient to create your message, return true, else return false
Under the Hood
The magic lies in the asbtract class CumulativeProtocolDecoder, which internally buffer the data and keep on calling our doDecoder(). Based on the return type from doDecode(), it either keeps buffering or allows the framework to call next filter in the chain.
Implementation of CumulativeProtocolDecoder can be seen in detail in the following post

01 November 2008 ~ 7 Comments

Integrating Apache MINA with Spring


So far we have seen Apache MINA code samples in standalone form. Apache MINA can be nicely integrated into DI frameworks like Spring. Lets see how to integrate a simple MINA application with Spring

This is how our application is structure. To see details of this application, please refer to the post Implementing Trap Receiver in 30 minutes using Apache MINA

To integrate with Spring, we need to do following

  • One Handler
  • Two Filter – Logging Filter and a ProtocolCodec Filter
  • NioDatagram Socket

This is how our code looks like for the standalone application

public void initialize() throws IOException {
 
	// Create an Acceptor
	NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
 
	// Add Handler
	acceptor.setHandler(new ServerHandler());
 
	acceptor.getFilterChain().addLast("logging",
				new LoggingFilter());
	acceptor.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new SNMPCodecFactory()));
 
	// Create Session Configuration
	DatagramSessionConfig dcfg = acceptor.getSessionConfig();
        dcfg.setReuseAddress(true);
        logger.debug("Starting Server......");
        // Bind and be ready to listen
        acceptor.bind(new InetSocketAddress(DEFAULT_PORT));
        logger.debug("Server listening on "+DEFAULT_PORT);
}

To integrate with Spring, we need to do following

  1. Set the IO handler
  2. Create the Filters and add to the chain
  3. Create the Socket and set Socket Parameters

NOTE: The latest MINA releases doesn’t have the package specific to Spring, like its earlier versions. The package is now named Integration Beans, to make the implementation work for all DI frameworks.

Lets see the Spring xml file. Please see that I have removed generic part from xml and have put only the specific things needed to pull up the implementation. This example has been derived from Chat example shipped with MINA release.

Now lets pull things together

  1. Lets set the IO Handler

  2. Lets create the Filter chain
  3. Here, we create instance of our IoFilter. See that for the ProtocolCodec factory, we have used Constructor injection. Logging Filter creation is straight forward. Once we have defined the beans for the filters to be used, we now create the Filter Chain to be used for the implementation. We define a bean with id “FilterChainBuidler” and add the defined filters to it. We are almost ready, and we just need to create the Socket and call bind

  4. Lets complete the last part of creating the Socket and completing the chain

Now we create our ioAcceptor, set IO handler and Filter Chain. Now we have to write a function to read this file using Spring and start our application. Here’s the code

public void initializeViaSpring() throws Exception {
	new ClassPathXmlApplicationContext("trapReceiverContext.xml");
}

We just call this method from main, and this shall initialize our MINA application. Will try to write a post using some other DI framework like Google Guice.

Reference:

31 October 2008 ~ 11 Comments

Implementing XML Decoder for Apache MINA


Will continues to post articles about MINA, to be updated, Subscribe in a reader

Apache MINA has wonderful concept of ProtocolDecoder to process Decoding protocol specific messages. XML is one of the most widely used format for EDA. Lets see how can we implement a Protocol Decoder for Apache MINA.

Algorithm

The picture below describes the basic algorithm that we need to use to construct an XML message from bytes.

 

The logic is simple, keep reading the bytes till the XML message is balanced. Balanced here means, that end of the root element has been achieved. For eg. If the xml document has root element as , we have to read the bytes till we received .

Its very particular to note that large XML packets when sent over TCP, may get fragmented and we shall received the same amount of read events while using Apache MINA low level API’s.

This type of situations where, we need to wait to data to completely arrive, calls for the use of CumulativeProtocolDecoder. As the name signifies, the decoder waits till, we get the balanced xml. Once the balanced XML is found, we write the parsed object to the output, to be processed further.

Lets see the code. My apologies for the unformatted code :-(

public abstract class XMLDecoder extends CumulativeProtocolDecoder  {
 
/*
* As per XML specification 1.0, http://www.w3.org/TR/REC-xml
*/
private static final char XML_START_TAG = '&lt;';
private static final char XML_END_TAG = '&gt;';
private static final char XML_PI_TAG = '?';
private static final char XML_COMMENT_TAG = '!';
 
protected static enum ParseState {ELEMENT_START, ELEMENT_END, COMMENTS, ENDELEMENT, PI, UNDEFINED};
 
protected static final int ELEMENT_START = 1;
protected static final int ELEMENT_END = 2;
 
private static Logger logger = LoggerFactory.getLogger(XMLDecoder.class);
 
@Override
protected boolean doDecode(IoSession session, IoBuffer ioBuffer,
ProtocolDecoderOutput decoderOutput) throws Exception {
 
int startPosition = ioBuffer.position();
 
if(!ioBuffer.hasRemaining()) {
logger.debug("NO bytes to read keep waiting...");
return false;
}
 
// Continue to read the bytes and keep parsing
char currentChar = '0', previousChar = '0';
 
boolean rootElementStarted = false;
boolean rootElementPresent = false;
boolean isBalanced = false;
 
int rootStartPosition, rootEndPosition;
 
ParseState parsingState = ParseState.UNDEFINED;
logger.debug("Lets start decoding the XML");
 
String root = null;
 
boolean markedForEndElement = false;
 
while(ioBuffer.hasRemaining()) {
previousChar = currentChar;
currentChar = (char)ioBuffer.get();
 
switch (parsingState) {
case ELEMENT_START:
if(currentChar == XML_PI_TAG){
logger.debug("Got PI Element");
parsingState = ParseState.PI;
} else if(currentChar == XML_COMMENT_TAG) {
logger.debug("Got Comment Element");
parsingState = ParseState.COMMENTS;
} else if((currentChar == ' ' || currentChar == XML_END_TAG)
&amp;&amp; rootElementStarted &amp;&amp; !rootElementPresent) {
rootEndPosition = ioBuffer.position();
rootElementPresent = true;
 
// Copy the Root Element
int cPos = ioBuffer.position();
int mPos = ioBuffer.markValue();
 
char[] rootChar = new char[cPos - mPos];
for(int i = mPos - 1, j =0; i &lt; cPos - 1; i++) {
rootChar[j++] = (char)ioBuffer.get(i);
}
 
root = new String(rootChar);
logger.debug("Root Element = "+ root);
parsingState = ParseState.ELEMENT_END;
logger.debug("Root Element detection completed "+rootEndPosition);
} else if(currentChar == XML_END_TAG) {
parsingState = ParseState.ELEMENT_END;
} else if(!rootElementStarted &amp;&amp; !rootElementPresent) {
rootStartPosition = ioBuffer.position();
ioBuffer.mark();
rootElementStarted = true;
logger.debug("Got the root element at "+rootStartPosition);
} else if (currentChar == '/') {
// Change state
if(previousChar == XML_START_TAG) {
parsingState = ParseState.ENDELEMENT;
}
}
break;
 
case ENDELEMENT:
if(currentChar == XML_END_TAG) {
parsingState = ParseState.ELEMENT_END;
 
int cPos = ioBuffer.position();
int mPos = ioBuffer.markValue();
 
char[] el = new char[cPos - mPos];
for(int i = mPos - 1, j =0; i &lt; cPos - 1; i++) {
el[j++] = (char)ioBuffer.get(i);
}
markedForEndElement = false;
if(root.equalsIgnoreCase(new String(el))) {
logger.debug("XML is balanced."+root);
isBalanced = true;
}
 
break;
} else if (currentChar == ' ') {
continue;
} else {
 
// mark the position, we need to compare the it to see that if its the end element
if(!markedForEndElement) {
ioBuffer.mark();
markedForEndElement = true;
}
}
break;
 
case ELEMENT_END:
if(currentChar == XML_START_TAG) {
parsingState = ParseState.ELEMENT_START;
}
break;
 
case UNDEFINED:
if(currentChar == XML_START_TAG) {
parsingState = ParseState.ELEMENT_START;
}
break;
 
case COMMENTS:
if (currentChar == '-') {
previousChar = currentChar;
} else if (previousChar == '-' &amp;&amp; currentChar == '&gt;') {
parsingState = ParseState.ELEMENT_END;
}
break;
 
case PI:
if (currentChar == '?') {
previousChar = currentChar;
} else if (previousChar == '?' &amp;&amp; currentChar == XML_END_TAG) {
parsingState = ParseState.ELEMENT_END;
}
break;
 
default:
break;
}
}
 
if(isBalanced) {
decoderOutput.write(parserXML(ioBuffer));
}
 
if(isBalanced &amp;&amp; !ioBuffer.hasRemaining()) {
logger.debug("No more bytes to process");
return true;
}
 
ioBuffer.position(startPosition);
return false;
}
 
/**
* Extending classes can implement their custom XML parsing to create Objects
* from XML and use them appropriately in Handler
*
* @param xmlBuffer
* @return
*/
public abstract Object parserXML(IoBuffer xmlBuffer);
}

The implementation is pretty straight forward. We take each character and try to match the characters as specified in XML specification.

Some keys things in the implementation:
1. The Decode function just collects the bytes till we get the balanced XML document
2. Once we get the balanced XML document, we shall call the abstract function parseXML(). The function has been kept abstract, so that its easy to implement custom parsing using desired XML library like JAXB, JIBX etc
3. We have to return true from doDecode(), the moment we have balanced XML. Return type true indicates to the framework that we are not waiting for any more data. A false, forces the framework to keep accumulating the data, till we write it to the output. Now it must be clear why, its called Cumulative decoder.

Still have Queries, please leave a comment and I shall revert back to you.

28 October 2008 ~ 1 Comment

Implementing Trap Sender using SNMP4J


In this post, we shall implement a Trap Sender using SNMP4J. We may choose to use Apache MINA for sending Traps or can resort to using DatagramSocket class directly.

This shall be the logical flow of the implementation

  • Get the encoded Trap Data
  • Send the Trap 
Lets look at the first component, on getting the encoded Trap data

 

The code snippet above shows a simple way of creating and encoding a Trap PDU. Essentially, we create an instance of PDU class and sets the type as Trap. This is important, else SNMP4J shall throw an exception. Thereafter, we can set the trap parameters. Here, we have hardcoded the parameters, there can be custom implementations that can take these from config files or from UI. After setting the parameters, we just call the encode function passing the Output stream and collect the byte array to be sent.

Sending part is even simpler     


The send code is preety straight forward. Here we have used Datagram Socket, we can use Apache MINA UDP Client implementation to send the trap as well.

References