Implementing XML Decoder for Apache MINA


Apache MINA has a wonderful concept, the ProtocolDecoder, for decoding protocol-specific messages. XML is one of the most widely used formats for EDA. Let's see how we can implement a protocol decoder for XML on Apache MINA.

Algorithm

The picture below describes the basic algorithm that we need to use to construct an XML message from bytes.

 

The logic is simple: keep reading bytes until the XML message is balanced. Balanced here means that the end of the root element has been reached. For example, whatever the root element of the XML document is, we have to keep reading bytes until we have received its matching closing tag.

It is important to note that large XML packets sent over TCP may get fragmented, and we will receive a corresponding number of read events when using Apache MINA's low-level APIs.

Situations like this, where we need to wait for the data to arrive completely, call for the use of CumulativeProtocolDecoder. As the name signifies, the decoder keeps accumulating data until we have balanced XML. Once the balanced XML is found, we write the parsed object to the output, to be processed further.
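The accumulate-until-balanced idea can be sketched in isolation, without any MINA classes. The snippet below is only an illustration under stated assumptions: `BalancedXmlAccumulator` and `endOfRoot` are hypothetical names, the input is treated as text, and balance is detected with a simple depth counter rather than by comparing element names as the decoder in this post does. It also assumes that `>` never appears inside attribute values.

```java
/** Hypothetical helper (not part of MINA): buffers fragments until the root element closes. */
final class BalancedXmlAccumulator {
    private final StringBuilder pending = new StringBuilder();

    /** Appends a fragment; returns the complete document once balanced, otherwise null. */
    String offer(String fragment) {
        pending.append(fragment);
        int end = endOfRoot(pending.toString());
        if (end < 0) {
            return null; // keep waiting, comparable to returning false from doDecode()
        }
        String document = pending.substring(0, end);
        pending.delete(0, end); // keep any bytes that belong to the next document
        return document;
    }

    /** Returns the index just past the root's closing tag, or -1 if not balanced yet. */
    static int endOfRoot(String xml) {
        int depth = 0;
        int i = xml.indexOf('<');
        while (i >= 0) {
            int skip;          // length of the opening delimiter
            String terminator; // text that ends this piece of markup
            boolean isTag = false;
            if (xml.startsWith("<?", i)) { skip = 2; terminator = "?>"; }
            else if (xml.startsWith("<!--", i)) { skip = 4; terminator = "-->"; }
            else if (xml.startsWith("<![CDATA[", i)) { skip = 9; terminator = "]]>"; }
            else if (xml.startsWith("<!", i)) { skip = 2; terminator = ">"; }
            else { skip = 1; terminator = ">"; isTag = true; }

            int at = xml.indexOf(terminator, i + skip);
            if (at < 0) {
                return -1; // the markup itself has not fully arrived
            }
            int end = at + terminator.length();
            if (isTag) {
                if (xml.charAt(i + 1) == '/') {          // closing tag
                    if (--depth == 0) {
                        return end;
                    }
                } else if (xml.charAt(end - 2) == '/') { // self-closing tag
                    if (depth == 0) {
                        return end;
                    }
                } else {                                  // opening tag
                    depth++;
                }
            }
            i = xml.indexOf('<', end);
        }
        return -1;
    }
}
```

Feeding the accumulator one fragment at a time mimics what happens when a large document arrives as several read events: every call returns null until the fragment containing the end of the root element shows up.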

Let's see the code.

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class XMLDecoder extends CumulativeProtocolDecoder {

    /*
     * As per XML specification 1.0, http://www.w3.org/TR/REC-xml
     */
    private static final char XML_START_TAG = '<';
    private static final char XML_END_TAG = '>';
    private static final char XML_PI_TAG = '?';
    private static final char XML_COMMENT_TAG = '!';

    protected static enum ParseState {ELEMENT_START, ELEMENT_END, COMMENTS, ENDELEMENT, PI, UNDEFINED}

    protected static final int ELEMENT_START = 1;
    protected static final int ELEMENT_END = 2;

    private static Logger logger = LoggerFactory.getLogger(XMLDecoder.class);

    @Override
    protected boolean doDecode(IoSession session, IoBuffer ioBuffer,
            ProtocolDecoderOutput decoderOutput) throws Exception {

        // Remember where we started so we can rewind if the XML is incomplete
        int startPosition = ioBuffer.position();

        if (!ioBuffer.hasRemaining()) {
            logger.debug("NO bytes to read keep waiting...");
            return false;
        }

        // Continue to read the bytes and keep parsing
        char currentChar = '0', previousChar = '0';

        boolean rootElementStarted = false;
        boolean rootElementPresent = false;
        boolean isBalanced = false;

        int rootStartPosition, rootEndPosition;

        ParseState parsingState = ParseState.UNDEFINED;
        logger.debug("Lets start decoding the XML");

        String root = null;

        boolean markedForEndElement = false;

        while (ioBuffer.hasRemaining()) {
            previousChar = currentChar;
            currentChar = (char) ioBuffer.get();

            switch (parsingState) {
            case ELEMENT_START:
                // We are inside a tag that was opened with '<'
                if (currentChar == XML_PI_TAG) {
                    logger.debug("Got PI Element");
                    parsingState = ParseState.PI;
                } else if (currentChar == XML_COMMENT_TAG) {
                    logger.debug("Got Comment Element");
                    parsingState = ParseState.COMMENTS;
                } else if ((currentChar == ' ' || currentChar == XML_END_TAG)
                        && rootElementStarted && !rootElementPresent) {
                    // The root element name ends at the first space or '>'
                    rootEndPosition = ioBuffer.position();
                    rootElementPresent = true;

                    // Copy the Root Element
                    int cPos = ioBuffer.position();
                    int mPos = ioBuffer.markValue();

                    char[] rootChar = new char[cPos - mPos];
                    for (int i = mPos - 1, j = 0; i < cPos - 1; i++) {
                        rootChar[j++] = (char) ioBuffer.get(i);
                    }

                    root = new String(rootChar);
                    logger.debug("Root Element = " + root);
                    parsingState = ParseState.ELEMENT_END;
                    logger.debug("Root Element detection completed " + rootEndPosition);
                } else if (currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;
                } else if (!rootElementStarted && !rootElementPresent) {
                    // First character of the first real element: this is the root
                    rootStartPosition = ioBuffer.position();
                    ioBuffer.mark();
                    rootElementStarted = true;
                    logger.debug("Got the root element at " + rootStartPosition);
                } else if (currentChar == '/') {
                    // Change state: "</" starts a closing tag
                    if (previousChar == XML_START_TAG) {
                        parsingState = ParseState.ENDELEMENT;
                    }
                }
                break;

            case ENDELEMENT:
                if (currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;

                    // Copy the name of the closing tag
                    int cPos = ioBuffer.position();
                    int mPos = ioBuffer.markValue();

                    char[] el = new char[cPos - mPos];
                    for (int i = mPos - 1, j = 0; i < cPos - 1; i++) {
                        el[j++] = (char) ioBuffer.get(i);
                    }
                    markedForEndElement = false;

                    // The document is balanced once the root's closing tag is seen
                    if (root != null && root.equalsIgnoreCase(new String(el))) {
                        logger.debug("XML is balanced." + root);
                        isBalanced = true;
                    }
                } else if (currentChar == ' ') {
                    // Skip whitespace inside the closing tag
                    continue;
                } else {
                    // Mark the position; we need to compare it later to see if it is the end element
                    if (!markedForEndElement) {
                        ioBuffer.mark();
                        markedForEndElement = true;
                    }
                }
                break;

            case ELEMENT_END:
                if (currentChar == XML_START_TAG) {
                    parsingState = ParseState.ELEMENT_START;
                }
                break;

            case UNDEFINED:
                if (currentChar == XML_START_TAG) {
                    parsingState = ParseState.ELEMENT_START;
                }
                break;

            case COMMENTS:
                if (currentChar == '-') {
                    previousChar = currentChar;
                } else if (previousChar == '-' && currentChar == '>') {
                    parsingState = ParseState.ELEMENT_END;
                }
                break;

            case PI:
                if (currentChar == '?') {
                    previousChar = currentChar;
                } else if (previousChar == '?' && currentChar == XML_END_TAG) {
                    parsingState = ParseState.ELEMENT_END;
                }
                break;

            default:
                break;
            }
        }

        if (isBalanced) {
            // Note: at this point the buffer's position is at the end of the data read so far
            decoderOutput.write(parserXML(ioBuffer));
        }

        if (isBalanced && !ioBuffer.hasRemaining()) {
            logger.debug("No more bytes to process");
            return true;
        }

        // Not balanced yet: rewind so the next call re-parses from the start
        ioBuffer.position(startPosition);
        return false;
    }

    /**
     * Extending classes can implement their custom XML parsing to create Objects
     * from XML and use them appropriately in Handler
     *
     * @param xmlBuffer the buffer holding the balanced XML document
     * @return the object created from the XML
     */
    public abstract Object parserXML(IoBuffer xmlBuffer);
}

The implementation is pretty straightforward. We take each character and try to match it against the markup characters specified in the XML specification.

Some key things in the implementation:
1. The doDecode() function just collects the bytes until we get a balanced XML document.
2. Once we get the balanced XML document, we call the abstract function parserXML(). The function has been kept abstract so that it is easy to implement custom parsing using the desired XML library, such as JAXB or JiBX.
3. We return true from doDecode() the moment we have balanced XML. Returning true tells the framework that a complete message has been decoded and we are not waiting for any more data for it. Returning false makes the framework keep accumulating data until we write a message to the output. Now it should be clear why it is called a cumulative decoder.

Still have queries? Please leave a comment and I shall get back to you.
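As an illustration of point 2, here is a minimal sketch of the kind of parsing a parserXML() implementation might delegate to. It uses the JDK's built-in DOM parser rather than JAXB or JiBX, and `DomXmlParsing` is a hypothetical helper name. It works on a plain byte array, so it assumes the subclass has already copied the document's bytes out of the IoBuffer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

/** Hypothetical helper: turns the bytes of one balanced XML document into a DOM tree. */
final class DomXmlParsing {

    static Document parse(byte[] xmlBytes) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // The data comes from the network, so enable the parser's secure processing limits
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            return builder.parse(new ByteArrayInputStream(xmlBytes));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }
}
```

The returned Document (or any object built from it) is what would then be written to the decoder output and picked up by the handler.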

19 thoughts on “Implementing XML Decoder for Apache MINA”

  1. Hello !

    Very interesting blog !

    I have tried to implement a protocol using the CumulativeProtocolDecoder. Then I wanted to test it using JUnit, but when I simulate a two-step decoding (a fragmented packet) I lose the data that came from the first call to doDecode… Is the ioBuffer cleared after returning false?

  2. Nope, ioBuffer is not cleared after returning false. Returning false is an indicator to keep buffering until we receive the complete payload. I tested the code above with 4 fragmented packets and it worked fine. If you can send the code, I can give it a try.

  3. My unit test might be wrong…
    Can you post the unit test you use to test this decoder?
    I think that it could be very useful 🙂

    Thanks

  4. I tested it in an almost real environment. I had a TCP client that sent the request. The XML file was pom.xml from the MINA project.

    Here is the code

    public void connect(String host, int port) {
        NioSocketConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast("Logger", new LoggingFilter());
        connector.setHandler(new XMLHandler());

        ConnectFuture future = connector.connect(new InetSocketAddress(
                host, port));

        // Store the session once the connection has been established
        future.addListener(new IoFutureListener<ConnectFuture>() {

            @Override
            public void operationComplete(ConnectFuture connectFuture) {
                if (connectFuture.isConnected()) {
                    session = connectFuture.getSession();
                }
            }
        });
    }

    public void send(String xmlData, String ip, int port) throws IOException {
        if (xmlData == null) {
            logger.error("Data is null");
            return;
        }

        // Connect
        connect(ip, port);

        byte[] data = xmlData.getBytes();

        IoBuffer buffer = IoBuffer.allocate(data.length);
        buffer.put(data);
        buffer.flip();

        // write to the session
        logger.debug("Writing to Session");
        // Wait until the connect listener has set the session
        while (session == null) {
            continue;
        }
        session.write(buffer);
        session.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        XMLClient client = new XMLClient();

        try {
            File f = new File("pom.xml");

            BufferedReader reader = new BufferedReader(new FileReader(f));

            StringBuffer buffer = new StringBuffer();

            String line;

            // Read the whole file into one string
            while ((line = reader.readLine()) != null) {
                buffer.append(line);
            }

            client.send(buffer.toString(), "localhost", 8081);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    Hope it helps 🙂

  5. Thank you! It helped me a lot!

    I was wondering about your decoder: if someone connects to the server and sends data like “foobarfoobarfoobar…..” etc., will the byte buffer “explode” (OutOfMemoryError)?

  6. Yup, this is a problem and unfortunately I don’t have a solution as of now. My thinking on handling this is to keep a tab on the XML size and disconnect violating clients. Also, since we know the type of XML we are handling, it should be easy. There should be a way around this. Let me check the async web implementation for this. It would be a good idea to post this query in the MINA forum as well.
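A minimal sketch of the "keep a tab on XML size" idea from the reply above, assuming a fixed per-connection limit. `XmlSizeGuard` is a hypothetical helper, not a MINA class. A decoder could call accept() with the number of bytes seen in each read and, when it returns false, close the session instead of buffering further.

```java
/** Hypothetical helper: tracks how many bytes of an unfinished document have been buffered. */
final class XmlSizeGuard {
    private final long maxBytes;
    private long pendingBytes;

    XmlSizeGuard(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Records newly received bytes; returns false once the limit is exceeded. */
    boolean accept(int newBytes) {
        pendingBytes += newBytes;
        return pendingBytes <= maxBytes;
    }

    /** Call after a balanced document has been written to the output. */
    void reset() {
        pendingBytes = 0;
    }
}
```

The limit itself is an assumption that depends on the kind of XML being exchanged. The point is only that the check happens before more data is accumulated.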

  7. Ashish,

    First of all thanks a lot for this example.

    I am one who started the thread “http://www.nabble.com/MINA—design-guidelines-td20077627.html#a20095703”

    Do you have any updates on how to handle clients sending very long XML documents? Also, any thoughts on how to process multiple XML documents that arrive as part of a single read?

    What are your thoughts on the following ideas?
    1) Keep reading the data until all the xml documents are read and constructed and then process the xml documents one at a time and write the result back to client session.

    2) Create a new thread as soon as an xml document is read and process the constructed xml document using the new thread.

    Could you please post the complete code (server and client implementation, including the encoders and decoders) for testing the XMLDecoder program in the blog?

    Any suggestions would be greatly appreciated.

  8. Well, I did try multiple XML documents and then realized it is one of the exceptional scenarios, hence I haven’t worked on it much.

    On point 1, I would say that as soon as the XML document is complete, process it and write the response back onto the session.

    As for threading, you can use Executors.

    Let me refine my program before I post it here. It will be a while before I can do it.

  9. Once I get a complete XML document (could be the first one out of many that the client wrote to the socket in a single write operation) my question is:

    If I return true from doDecode and process the first assembled XML document, will I be able to read the subsequent XML documents from that session again?

    It would be a great help if you could refine and post your example here as early as possible. Once again, I truly appreciate you taking the time to help new programmers.

  10. Nope, you don’t have to return true; just write the XML to the ProtocolDecoderOutput and continue to build the XML structure. Though I am not sure about the memory implications of this.

  11. Hi,
    I want to use a TCP MINA endpoint to read an XML file. I have written a custom XML codec by implementing the ProtocolCodecFactory interface. It has two unimplemented methods, returning a ProtocolDecoder and a ProtocolEncoder, and I just implemented these interfaces. When I try to get the entire XML, I am not able to. It gets broken into 4 to 5 pieces, and I receive it at the file endpoint as 4 to 5 files. What could be the problem?

      • Thanks, Avinash. But I don’t have any decoding logic in the decode method of ProtocolDecoder. My requirement is to receive the complete XML file as one file at the file endpoint. My decode method keeps getting called until the buffer is empty, but I want the decode method to be called only once. Can you send me sample code to do that?

        • decode() will be called each time a message is received. It is up to the logic in the decoder to decide whether the XML is complete or not. The code is in the post itself.

  12. Thanks, Ashish, for the quick reply to beginners.
    In the post the decoder extends CumulativeProtocolDecoder, but I am implementing ProtocolDecoder in my case.
    Here is the complete code. Please help me out: how can I get the full XML file at the file endpoint?

    Groovy class:
    class SampleRouteBuilder extends SpringRouteBuilder {

        void configure() {
            from("mina:tcp://localhost:6000sync=true&codec=#XMLCodec")
                .to("file:data/outbox")
        }
    }

    public class XMLCodec implements ProtocolCodecFactory {

        @Override
        public ProtocolDecoder getDecoder() throws Exception {
            System.out.println("inside getDecoder");
            return new XMLDecoder();
        }

        @Override
        public ProtocolEncoder getEncoder() throws Exception {
            System.out.println("Inside getencoder");
            return new XMLEncoder();
        }
    }

    XMLDecoder:

    public class XMLDecoder implements ProtocolDecoder {

        public void dispose(IoSession arg0) throws Exception {

        }

        public void finishDecode(IoSession session, ProtocolDecoderOutput output)
                throws Exception {

        }

        public void decode(IoSession session, ByteBuffer buffer,
                ProtocolDecoderOutput output) throws Exception {
            String buf = null;

            if (buffer.remaining() > 0) {

                buf = IOUtils.toString(MinaConverter.toInputStream(buffer));
                System.out.println("Buffer" + buf);
            }
            output.write(buf);
        }
    }

    XMLEncoder:

    public class XMLEncoder implements ProtocolEncoder {

        public void dispose(IoSession arg0) throws Exception {

        }

        public void encode(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
                throws Exception {

        }
    }

    The decode method of XMLDecoder is called 4 to 5 times. In the outbox folder, the uploaded XML file is split into 4 or 5 files.
    Please tell me how to receive the uploaded file in the outbox folder as a single file, and where I am making a mistake.

  13. Thank you for posting this example. It was very helpful — even several years later. I also came across your StaxXmlDecoder in the mina email archives (2009).

    I’m sure you know this by now, but for those who find that email (or only care about the string in the above), the extra characters after the end element are due to ioBuffer.array() returning its contents between the position marked by limit() and the end (i.e., capacity()).

    I dealt with this by extracting the substring of interest from the resulting string. I’m new to mina (and NIO) so I don’t know that this is the best approach. Since I’ve one XML doc per connection, it should be ok for me.

    String encoding = xpp.getInputEncoding() != null
            ? xpp.getInputEncoding() : "ISO-8859-1";
    String xml = new String(ioBuffer.array(), encoding)
            .substring(0, ioBuffer.position());
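The effect described in this comment can be reproduced with a plain java.nio.ByteBuffer, used here as a stand-in for MINA's IoBuffer on the assumption that array(), arrayOffset() and position() behave the same way for a heap buffer. `BufferText` is a hypothetical helper name. The sketch decodes only the bytes written so far instead of decoding the whole backing array and trimming afterwards. Trimming the decoded string by position() is only safe for single-byte encodings such as ISO-8859-1, because in a multi-byte encoding a character index is not a byte index.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/** Hypothetical helper: decodes only the part of a heap buffer that has been written. */
final class BufferText {

    static String writtenPortion(ByteBuffer buffer, Charset charset) {
        // array() exposes the whole backing array (up to capacity), so restrict
        // the decode to the bytes between the start and the current position.
        return new String(buffer.array(), buffer.arrayOffset(), buffer.position(), charset);
    }
}
```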
