JBoss.orgCommunity Documentation

Chapter 4. Source overview & Example

4.1. Stream
4.2. Datalink

As mentioned stream module is centered on three main interfaces:

Example use of this API looks as follows:



Stream s = ....
StreamSelector selector = ...
s.register(selector);
        
        while(true)
        {
            byte[] buff = new byte[....];
            Collection<SelectorKey> selected = selector.selectNow(selector.OP_READ,0); //0, immediate check
            for(SelectorKey key : selected)
            {
                int read = ket.getStream().read(buff);
                System.err.println("Read: "+read);
            
            }
            selected.clear();
        }
        
        

Datalink is basicaly small extension to async stream. Below example classes depict difference and use case:



import org.mobicents.protocols.link.DataLink;
import org.mobicents.protocols.link.LinkState;
import org.mobicents.protocols.link.LinkStateListener;
import org.mobicents.protocols.stream.api.SelectorKey;
import org.mobicents.protocols.stream.api.SelectorProvider;
import org.mobicents.protocols.stream.api.StreamSelector;
class XServer implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private int rxCount, txCount;
    private InetSocketAddress address, remote;
    
    public XServer(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
        System.out.println("rx=" + rxCount);
        System.out.println("tx=" + txCount);
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
                    rxCount++;
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                txBuffer[txCount%txBuffer]++;
                for (SelectorKey key : keys) {
                    key.getStream().write(txBuffer);
                    txCount++;
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}
class XClient implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private InetSocketAddress address, remote;
    
    public XClient(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        //byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
               
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                
                for (SelectorKey key : keys) {
                    key.getStream().write(rxBuffer);
         
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}