JBoss.orgCommunity Documentation

Chapter 4. Source overview & Example

4.1. Stream
4.2. Datalink

As mentioned previously, the stream module is centered on three main interfaces:

org.mobicents.protocols.stream.api.Stream

This class declares sets of methods to perform read and write operations:



public interface Stream {
    /**
     * Registers this stream with the given selector, returning a selection key.
     * This method first verifies that this channel is open and that the given initial 
     * interest set is valid.
     * 
     * If this stream is already registered with the given selector then the selection key 
     * representing that registration is returned after setting its interest set to the 
     * given value.
     * 
     * @param selector 
     * @param op The selector with which this channel is to be registered
     * @return
     */
    public SelectorKey register(StreamSelector selector) throws IOException;
    public int read(byte[] b) throws IOException;
    public int write(byte[] d) throws IOException;
    /**
     * Closes this streamer implementation. After closing stream its selectors are invalidated!
     */
    public void close();
    /**
     * Returns the provider that created this stream.
     * 
     * @return
     */
    public SelectorProvider provider();
}
          
org.mobicents.protocols.stream.api.StreamSelector

This interface defines methods that are used to interrogate registered stream for IO readiness.



public interface StreamSelector {
    public static final int OP_READ = 0x1;
    public static final int OP_WRITE = 0x2;
    /**
     * Performs query of registeres stream. Returns set of keys pointing to streams ready to perform IO.
     * @param operation - operation which streams are queried. Value is equal to on of OP_X.
     * @param timeout
     * @return
     * @throws IOException
     */
    public Collection<SelectorKey> selectNow(int operation, int timeout) throws IOException;
    /**
     * Checks if selector has been closed.
     * @return
     */
    public boolean isClosed();
    /**
     * closeses selector, removes all stream from internal register.
     */
    public void close();
    /**
     * Returns registered streams.
     * @return
     */
    public Collection<Stream> getRegisteredStreams();
}
          
org.mobicents.protocols.stream.api.SelectorKey

This interface declares the contract for the object representing the stream in selector:



public interface SelectorKey {
    /**
     * Attach application specific object to this key. When underlying stream is
     * ready for IO and key is returned, this attachment will be accessible.
     * 
     * @param obj
     */
    public void attach(Object obj);
    /**
     * Gets attachemnt.
     * 
     * @return
     */
    public Object attachment();
    /**
     * Returns validity indicator.
     * 
     * @return
     */
    public boolean isValid();
    /**
     * Indicates if underlying stream is ready to read.
     * 
     * @return
     */
    public boolean isReadable();
    /**
     * Indicates if underlying stream is ready to write.
     * 
     * @return
     */
    public boolean isWriteable();
    /**
     * Returns stream associated with this key
     * 
     * @return
     */
    public Stream getStream();
    /**
     * Get selector for this key.
     * 
     * @return
     */
    public StreamSelector getStreamSelector();
    /**
     * Cancels this key. Equals deregistration of stream
     */
    public void cancel(); // Oleg verify this.
}
          

Below is an example use of this API:



Stream s = ....
StreamSelector selector = ...
s.register(selector);
        
        while(true)
        {
            byte[] buff = new byte[....];
            Collection<SelectorKey> selected = selector.selectNow(selector.OP_READ,0); //0, immediate check
            for(SelectorKey key : selected)
            {
                int read = ket.getStream().read(buff);
                System.err.println("Read: "+read);
            
            }
            selected.clear();
        }
        
        

Datalink is basically a small extension of the async stream. The example below shows the difference and use case:



import org.mobicents.protocols.link.DataLink;
import org.mobicents.protocols.link.LinkState;
import org.mobicents.protocols.link.LinkStateListener;
import org.mobicents.protocols.stream.api.SelectorKey;
import org.mobicents.protocols.stream.api.SelectorProvider;
import org.mobicents.protocols.stream.api.StreamSelector;
class XServer implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private int rxCount, txCount;
    private InetSocketAddress address, remote;
    
    public XServer(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
        System.out.println("rx=" + rxCount);
        System.out.println("tx=" + txCount);
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
                    rxCount++;
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                txBuffer[txCount%txBuffer]++;
                for (SelectorKey key : keys) {
                    key.getStream().write(txBuffer);
                    txCount++;
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}
class XClient implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private InetSocketAddress address, remote;
    
    public XClient(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        //byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
               
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                
                for (SelectorKey key : keys) {
                    key.getStream().write(rxBuffer);
         
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}