JBoss.orgCommunity Documentation
As mentioned stream module is centered on three main interfaces:
org.mobicents.protocols.stream.api.Stream - this class declares sets of methods to perform read and write operations, it looks as follows:
public interface Stream {
/**
* Registers this stream with the given selector, returning a selection key.
* This method first verifies that this channel is open and that the given initial
* interest set is valid.
*
* If this stream is already registered with the given selector then the selection key
* representing that registration is returned after setting its interest set to the
* given value.
*
* @param selector
* @param op The selector with which this channel is to be registered
* @return
*/
public SelectorKey register(StreamSelector selector) throws IOException;
public int read(byte[] b) throws IOException;
public int write(byte[] d) throws IOException;
/**
* Closes this streamer implementation. After closing stream its selectors are invalidated!
*/
public void close();
/**
* Returns the provider that created this stream.
*
* @return
*/
public SelectorProvider provider();
}
org.mobicents.protocols.stream.api.StreamSelector - this interface defines methods which are used to interrogate registered stream for IO readiness.
public interface StreamSelector {
public static final int OP_READ = 0x1;
public static final int OP_WRITE = 0x2;
/**
* Performs query of registeres stream. Returns set of keys pointing to streams ready to perform IO.
* @param operation - operation which streams are queried. Value is equal to on of OP_X.
* @param timeout
* @return
* @throws IOException
*/
public Collection<SelectorKey> selectNow(int operation, int timeout) throws IOException;
/**
* Checks if selector has been closed.
* @return
*/
public boolean isClosed();
/**
* closeses selector, removes all stream from internal register.
*/
public void close();
/**
* Returns registered streams.
* @return
*/
public Collection<Stream> getRegisteredStreams();
}
org.mobicents.protocols.stream.api.SelectorKey - this interface declares contarct for object representing stream in selector:
public interface SelectorKey {
/**
* Attach application specific object to this key. When underlying stream is
* ready for IO and key is returned, this attachment will be accessible.
*
* @param obj
*/
public void attach(Object obj);
/**
* Gets attachemnt.
*
* @return
*/
public Object attachment();
/**
* Returns validity indicator.
*
* @return
*/
public boolean isValid();
/**
* Indicates if underlying stream is ready to read.
*
* @return
*/
public boolean isReadable();
/**
* Indicates if underlying stream is ready to write.
*
* @return
*/
public boolean isWriteable();
/**
* Returns stream associated with this key
*
* @return
*/
public Stream getStream();
/**
* Get selector for this key.
*
* @return
*/
public StreamSelector getStreamSelector();
/**
* Cancels this key. Equals deregistration of stream
*/
public void cancel(); // Oleg verify this.
}
Example use of this API looks as follows:
Stream s = ....
StreamSelector selector = ...
s.register(selector);
while(true)
{
byte[] buff = new byte[....];
Collection<SelectorKey> selected = selector.selectNow(selector.OP_READ,0); //0, immediate check
for(SelectorKey key : selected)
{
int read = ket.getStream().read(buff);
System.err.println("Read: "+read);
}
selected.clear();
}
Datalink is basicaly small extension to async stream. Below example classes depict difference and use case:
import org.mobicents.protocols.link.DataLink;
import org.mobicents.protocols.link.LinkState;
import org.mobicents.protocols.link.LinkStateListener;
import org.mobicents.protocols.stream.api.SelectorKey;
import org.mobicents.protocols.stream.api.SelectorProvider;
import org.mobicents.protocols.stream.api.StreamSelector;
class XServer implements LinkStateListener
{
private DataLink link;
private volatile boolean started = false;
private StreamSelector selector;
private int rxCount, txCount;
private InetSocketAddress address, remote;
public XServer(InetSocketAddress address, InetSocketAddress remote) throws Exception {
link = DataLink.open(address, remote);
link.setListener(this);
selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
link.register(selector);
}
public void start() {
started = true;
new Thread(this).start();
link.activate();
}
public void stop() {
started = false;
link.close();
System.out.println("rx=" + rxCount);
System.out.println("tx=" + txCount);
}
public void run() {
byte[] rxBuffer = new byte[172];
byte[] txBuffer = new byte[172];
while (started) {
try {
Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
for (SelectorKey key : keys) {
int len = key.getStream().read(rxBuffer);
rxCount++;
System.out.println("Read " + len +" bytes: "+Arrays.toString(rxBuffer));
}
keys.clear();
keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
txBuffer[txCount%txBuffer]++;
for (SelectorKey key : keys) {
key.getStream().write(txBuffer);
txCount++;
}
Thread.currentThread().sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void onStateChange(LinkState state) {
System.err.println("DatalinkState: "+state);
}
}
class XClient implements LinkStateListener
{
private DataLink link;
private volatile boolean started = false;
private StreamSelector selector;
private InetSocketAddress address, remote;
public XClient(InetSocketAddress address, InetSocketAddress remote) throws Exception {
link = DataLink.open(address, remote);
link.setListener(this);
selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
link.register(selector);
}
public void start() {
started = true;
new Thread(this).start();
link.activate();
}
public void stop() {
started = false;
link.close();
}
public void run() {
byte[] rxBuffer = new byte[172];
//byte[] txBuffer = new byte[172];
while (started) {
try {
Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
for (SelectorKey key : keys) {
int len = key.getStream().read(rxBuffer);
System.out.println("Read " + len +" bytes: "+Arrays.toString(rxBuffer));
}
keys.clear();
keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
for (SelectorKey key : keys) {
key.getStream().write(rxBuffer);
}
Thread.currentThread().sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void onStateChange(LinkState state) {
System.err.println("DatalinkState: "+state);
}
}