Apache MINA has a wonderful concept, the ProtocolDecoder, for decoding protocol-specific messages. XML is one of the most widely used formats for EDA. Let's see how we can implement an XML protocol decoder for Apache MINA.
Algorithm
The picture below describes the basic algorithm that we need to use to construct an XML message from bytes.

The logic is simple: keep reading bytes until the XML message is balanced. Balanced here means that the end of the root element has been reached. In other words, whatever the document's root element is named, we have to keep reading bytes until we have received its closing tag.
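To make "balanced" concrete, here is a minimal, MINA-independent sketch. Note that it uses a different technique from the decoder in this post: it counts element depth instead of comparing the closing tag against the root name. The class and method names are hypothetical, and it deliberately ignores CDATA sections, DOCTYPE internal subsets and '>' characters inside attribute values.

```java
final class BalanceCheck {

    /** Returns true once the first element opened in xml has been closed again. */
    static boolean isBalanced(String xml) {
        int depth = 0;              // number of currently open elements
        boolean sawElement = false; // becomes true at the first start tag
        int i = 0;
        while (i < xml.length()) {
            if (xml.charAt(i) != '<') {
                i++;
                continue;
            }
            // Comments may contain '>' so they are skipped as a whole.
            if (xml.startsWith("<!--", i)) {
                int end = xml.indexOf("-->", i + 4);
                if (end < 0) {
                    return false; // comment not complete yet
                }
                i = end + 3;
                continue;
            }
            int close = xml.indexOf('>', i);
            if (close < 0) {
                return false; // tag not complete yet
            }
            char next = xml.charAt(i + 1);
            if (next == '?' || next == '!') {
                // Processing instruction or declaration: no effect on depth.
            } else if (next == '/') {
                depth--; // end tag
            } else {
                sawElement = true;
                if (xml.charAt(close - 1) != '/') {
                    depth++; // start tag that is not self-closing
                }
            }
            if (sawElement && depth == 0) {
                return true;
            }
            i = close + 1;
        }
        return false;
    }
}
```

Calling isBalanced on a truncated document returns false, which is exactly the situation in which a decoder has to wait for more bytes.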
It is important to note that large XML messages sent over TCP may get fragmented, in which case we receive a separate read event for each fragment when using Apache MINA's low-level APIs.
Situations like this, where we need to wait for the data to arrive completely, call for the use of CumulativeProtocolDecoder. As the name suggests, the decoder keeps accumulating data until we have balanced XML. Once the balanced XML is found, we write the parsed object to the output to be processed further.
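As a plain-Java analogy of this accumulation (not MINA's actual implementation), the sketch below buffers fragments until a complete message is available. It assumes the root element name is known in advance and never appears nested inside itself; FragmentAccumulator and feed are hypothetical names, and fragments are passed as strings only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

final class FragmentAccumulator {

    private final StringBuilder pending = new StringBuilder();
    private final String closingTag;

    FragmentAccumulator(String rootName) {
        this.closingTag = "</" + rootName + ">";
    }

    /** Appends one fragment and returns every message completed by it. */
    List<String> feed(String fragment) {
        pending.append(fragment);
        List<String> complete = new ArrayList<>();
        int end;
        // Emit one message per closing tag found; keep the rest for later.
        while ((end = pending.indexOf(closingTag)) >= 0) {
            int cut = end + closingTag.length();
            complete.add(pending.substring(0, cut));
            pending.delete(0, cut);
        }
        return complete;
    }
}
```

Feeding "<order><id>4", then "2</id></ord", then "er>" yields nothing for the first two calls and the whole message on the third. With CumulativeProtocolDecoder this bookkeeping is done by the framework, and doDecode() only has to decide whether the data seen so far is complete.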
Let's look at the code.
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class XMLDecoder extends CumulativeProtocolDecoder {

    /*
     * As per XML specification 1.0, http://www.w3.org/TR/REC-xml
     */
    private static final char XML_START_TAG = '<';
    private static final char XML_END_TAG = '>';
    private static final char XML_PI_TAG = '?';
    private static final char XML_COMMENT_TAG = '!';

    protected enum ParseState { ELEMENT_START, ELEMENT_END, COMMENTS, ENDELEMENT, PI, UNDEFINED }

    protected static final int ELEMENT_START = 1;
    protected static final int ELEMENT_END = 2;

    private static final Logger logger = LoggerFactory.getLogger(XMLDecoder.class);

    @Override
    protected boolean doDecode(IoSession session, IoBuffer ioBuffer,
            ProtocolDecoderOutput decoderOutput) throws Exception {
        // Remember where we started so we can rewind if the XML is incomplete
        int startPosition = ioBuffer.position();
        if (!ioBuffer.hasRemaining()) {
            logger.debug("No bytes to read, keep waiting...");
            return false;
        }

        // Continue to read the bytes and keep parsing
        char currentChar = '0', previousChar = '0';
        boolean rootElementStarted = false;
        boolean rootElementPresent = false;
        boolean isBalanced = false;
        int rootStartPosition, rootEndPosition;
        ParseState parsingState = ParseState.UNDEFINED;
        logger.debug("Let's start decoding the XML");
        String root = null;
        boolean markedForEndElement = false;

        while (ioBuffer.hasRemaining()) {
            previousChar = currentChar;
            // Each byte is treated as one character (single-byte encodings only)
            currentChar = (char) ioBuffer.get();
            switch (parsingState) {
            case ELEMENT_START:
                // We are inside a tag, just after '<'
                if (currentChar == XML_PI_TAG) {
                    logger.debug("Got PI Element");
                    parsingState = ParseState.PI;
                } else if (currentChar == XML_COMMENT_TAG) {
                    logger.debug("Got Comment Element");
                    parsingState = ParseState.COMMENTS;
                } else if ((currentChar == ' ' || currentChar == XML_END_TAG)
                        && rootElementStarted && !rootElementPresent) {
                    // The root element name ends at the first space or '>'
                    rootEndPosition = ioBuffer.position();
                    rootElementPresent = true;
                    // Copy the root element name, from the mark up to the delimiter
                    int cPos = ioBuffer.position();
                    int mPos = ioBuffer.markValue();
                    char[] rootChar = new char[cPos - mPos];
                    for (int i = mPos - 1, j = 0; i < cPos - 1; i++) {
                        rootChar[j++] = (char) ioBuffer.get(i);
                    }
                    root = new String(rootChar);
                    logger.debug("Root Element = " + root);
                    parsingState = ParseState.ELEMENT_END;
                    logger.debug("Root Element detection completed " + rootEndPosition);
                } else if (currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;
                } else if (!rootElementStarted && !rootElementPresent) {
                    // First character of the root element name: mark it
                    rootStartPosition = ioBuffer.position();
                    ioBuffer.mark();
                    rootElementStarted = true;
                    logger.debug("Got the root element at " + rootStartPosition);
                } else if (currentChar == '/') {
                    // "</" starts an end tag: change state
                    if (previousChar == XML_START_TAG) {
                        parsingState = ParseState.ENDELEMENT;
                    }
                }
                break;
            case ENDELEMENT:
                if (currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;
                    // Copy the end tag name, from the mark up to '>'
                    int cPos = ioBuffer.position();
                    int mPos = ioBuffer.markValue();
                    char[] el = new char[cPos - mPos];
                    for (int i = mPos - 1, j = 0; i < cPos - 1; i++) {
                        el[j++] = (char) ioBuffer.get(i);
                    }
                    markedForEndElement = false;
                    // XML names are case-sensitive, so compare exactly
                    if (root != null && root.equals(new String(el))) {
                        logger.debug("XML is balanced." + root);
                        isBalanced = true;
                    }
                    break;
                } else if (currentChar == ' ') {
                    continue;
                } else {
                    // Mark the position; we need to compare the name to see
                    // whether it is the end of the root element
                    if (!markedForEndElement) {
                        ioBuffer.mark();
                        markedForEndElement = true;
                    }
                }
                break;
            case ELEMENT_END:
                if (currentChar == XML_START_TAG) {
                    parsingState = ParseState.ELEMENT_START;
                }
                break;
            case UNDEFINED:
                if (currentChar == XML_START_TAG) {
                    parsingState = ParseState.ELEMENT_START;
                }
                break;
            case COMMENTS:
                // A comment ends with "->"
                if (currentChar == '-') {
                    previousChar = currentChar;
                } else if (previousChar == '-' && currentChar == '>') {
                    parsingState = ParseState.ELEMENT_END;
                }
                break;
            case PI:
                // A processing instruction ends with "?>"
                if (currentChar == '?') {
                    previousChar = currentChar;
                } else if (previousChar == '?' && currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;
                }
                break;
            default:
                break;
            }
        }

        if (isBalanced) {
            // Note: the loop above has consumed the buffer, so its position is
            // now at the end of the data when parserXML() is called
            decoderOutput.write(parserXML(ioBuffer));
        }
        if (isBalanced && !ioBuffer.hasRemaining()) {
            logger.debug("No more bytes to process");
            return true;
        }
        // Not balanced yet: rewind so the next call parses from the start again
        ioBuffer.position(startPosition);
        return false;
    }

    /**
     * Extending classes can implement their custom XML parsing to create objects
     * from XML and use them appropriately in the handler.
     *
     * @param xmlBuffer the buffer holding the balanced XML document
     * @return the object to pass on to the handler
     */
    public abstract Object parserXML(IoBuffer xmlBuffer);
}
The implementation is pretty straightforward. We take each character in turn and match it against the markup characters defined in the XML specification.
Some key points about the implementation:
1. The decode function just collects bytes until we have a balanced XML document.
2. Once we have the balanced XML document, we call the abstract function parserXML(). It has been kept abstract so that it is easy to implement custom parsing with the XML library of your choice, such as JAXB or JiBX.
3. We have to return true from doDecode() the moment we have balanced XML. Returning true tells the framework that we are not waiting for any more data for this message. Returning false makes the framework keep accumulating data until we write a message to the output. Now it should be clear why it is called a cumulative decoder.
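As an illustration of point 2, here is a minimal sketch of the kind of helper a subclass might call from its parsing method. It uses the JDK's built-in DOM parser rather than JAXB or JiBX, and it assumes the message bytes have already been copied out of the IoBuffer into a byte array. DomParsingSketch and parse are hypothetical names, not part of MINA.

```java
import java.io.ByteArrayInputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

final class DomParsingSketch {

    /** Parses one complete XML message into a DOM tree. */
    static Document parse(byte[] xmlBytes) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Messages come from the network, so refuse DOCTYPE declarations
            // to avoid external-entity attacks.
            factory.setFeature(
                    "http://apache.org/xml/features/disallow-doctype-decl", true);
            return factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xmlBytes));
        } catch (Exception e) {
            // Wrap checked parser exceptions so callers can stay simple.
            throw new IllegalArgumentException("Not a well-formed XML message", e);
        }
    }
}
```

The returned Document (or any domain object built from it) is what would be written to the decoder output and later handed to the handler.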
Still have questions? Please leave a comment and I will get back to you.