JBoss.orgCommunity Documentation

Mobicents Stream Library User Guide


This manual uses several conventions to highlight certain words and phrases and draw attention to specific pieces of information.

In PDF and paper editions, this manual uses typefaces drawn from the Liberation Fonts set. The Liberation Fonts set is also used in HTML editions if the set is installed on your system. If not, alternative but equivalent typefaces are displayed. Note: Red Hat Enterprise Linux 5 and later includes the Liberation Fonts set by default.

Four typographic conventions are used to call attention to specific words and phrases. These conventions, and the circumstances they apply to, are as follows.

Mono-spaced Bold

Used to highlight system input, including shell commands, file names and paths. Also used to highlight key caps and key-combinations. For example:

The above includes a file name, a shell command and a key cap, all presented in Mono-spaced Bold and all distinguishable thanks to context.

Key-combinations can be distinguished from key caps by the hyphen connecting each part of a key-combination. For example:

The first sentence highlights the particular key cap to press. The second highlights two sets of three key caps, each set pressed simultaneously.

If source code is discussed, class names, methods, functions, variable names and returned values mentioned within a paragraph will be presented as above, in Mono-spaced Bold. For example:

Proportional Bold

This denotes words or phrases encountered on a system, including application names; dialogue box text; labelled buttons; check-box and radio button labels; menu titles and sub-menu titles. For example:

The above text includes application names; system-wide menu names and items; application-specific menu names; and buttons and text found within a GUI interface, all presented in Proportional Bold and all distinguishable by context.

Note the > shorthand used to indicate traversal through a menu and its sub-menus. This is to avoid the difficult-to-follow 'Select Mouse from the Preferences sub-menu in the System menu of the main menu bar' approach.

Mono-spaced Bold Italic or Proportional Bold Italic

Whether Mono-spaced Bold or Proportional Bold, the addition of Italics indicates replaceable or variable text. Italics denotes text you do not input literally or displayed text that changes depending on circumstance. For example:

Note the words in bold italics above username, domain.name, file-system, package, version and release. Each word is a placeholder, either for text you enter when issuing a command or for text displayed by the system.

Aside from standard usage for presenting the title of a work, italics denotes the first use of a new and important term. For example:

If you find a typographical error in this manual, or if you have thought of a way to make this manual better, we would love to hear from you! Please submit a report in the the Issue Tracker, against the product Mobicents Stream Library , or contact the authors.

When submitting a bug report, be sure to mention the manual's identifier: StreamLibrary_User_Guide

If you have a suggestion for improving the documentation, try to be as specific as possible when describing it. If you have found an error, please include the section number and some of the surrounding text so we can find it easily.

  1. Downloading the source code

    Use SVN to checkout a specific release source, the base URL is http://mobicents.googlecode.com/svn/tags/protocols/stream, then add the specific release version, lets consider 1.0.0.BETA3.

    [usr]$ svn co http://mobicents.googlecode.com/svn/tags/protocols/stream/1.0.0.BETA3 stream-1.0.0.BETA3
  2. Building the source code

    Important

    Maven 2.0.9 (or higher) is used to build the release. Instructions for using Maven2, including install, can be found at http://maven.apache.org

    Use Maven to build the binaries.

    				    [usr]$ cd stream-1.0.0.BETA3
    				    [usr]$ mvn install
    				    

    Once the process finishes you should have the binary jar file in the target directory.

Similar process as for Section 2.1.1, “Release Source Code Building”, the only change is the SVN source code URL, which is http://mobicents.googlecode.com/svn/trunk/protocols/stream.

As mentioned previously, the stream module is centered on three main interfaces:

org.mobicents.protocols.stream.api.Stream

This class declares sets of methods to perform read and write operations:



public interface Stream {
    /**
     * Registers this stream with the given selector, returning a selection key.
     * This method first verifies that this channel is open and that the given initial 
     * interest set is valid.
     * 
     * If this stream is already registered with the given selector then the selection key 
     * representing that registration is returned after setting its interest set to the 
     * given value.
     * 
     * @param selector 
     * @param op The selector with which this channel is to be registered
     * @return
     */
    public SelectorKey register(StreamSelector selector) throws IOException;
    public int read(byte[] b) throws IOException;
    public int write(byte[] d) throws IOException;
    /**
     * Closes this streamer implementation. After closing stream its selectors are invalidated!
     */
    public void close();
    /**
     * Returns the provider that created this stream.
     * 
     * @return
     */
    public SelectorProvider provider();
}
          
org.mobicents.protocols.stream.api.StreamSelector

This interface defines methods that are used to interrogate registered stream for IO readiness.



public interface StreamSelector {
    public static final int OP_READ = 0x1;
    public static final int OP_WRITE = 0x2;
    /**
     * Performs query of registeres stream. Returns set of keys pointing to streams ready to perform IO.
     * @param operation - operation which streams are queried. Value is equal to on of OP_X.
     * @param timeout
     * @return
     * @throws IOException
     */
    public Collection<SelectorKey> selectNow(int operation, int timeout) throws IOException;
    /**
     * Checks if selector has been closed.
     * @return
     */
    public boolean isClosed();
    /**
     * closeses selector, removes all stream from internal register.
     */
    public void close();
    /**
     * Returns registered streams.
     * @return
     */
    public Collection<Stream> getRegisteredStreams();
}
          
org.mobicents.protocols.stream.api.SelectorKey

This interface declares the contract for the object representing the stream in selector:



public interface SelectorKey {
    /**
     * Attach application specific object to this key. When underlying stream is
     * ready for IO and key is returned, this attachment will be accessible.
     * 
     * @param obj
     */
    public void attach(Object obj);
    /**
     * Gets attachemnt.
     * 
     * @return
     */
    public Object attachment();
    /**
     * Returns validity indicator.
     * 
     * @return
     */
    public boolean isValid();
    /**
     * Indicates if underlying stream is ready to read.
     * 
     * @return
     */
    public boolean isReadable();
    /**
     * Indicates if underlying stream is ready to write.
     * 
     * @return
     */
    public boolean isWriteable();
    /**
     * Returns stream associated with this key
     * 
     * @return
     */
    public Stream getStream();
    /**
     * Get selector for this key.
     * 
     * @return
     */
    public StreamSelector getStreamSelector();
    /**
     * Cancels this key. Equals deregistration of stream
     */
    public void cancel(); // Oleg verify this.
}
          

Below is an example use of this API:



Stream s = ....
StreamSelector selector = ...
s.register(selector);
        
        while(true)
        {
            byte[] buff = new byte[....];
            Collection<SelectorKey> selected = selector.selectNow(selector.OP_READ,0); //0, immediate check
            for(SelectorKey key : selected)
            {
                int read = ket.getStream().read(buff);
                System.err.println("Read: "+read);
            
            }
            selected.clear();
        }
        
        

Datalink is basically a small extension of the async stream. The example below shows the difference and use case:



import org.mobicents.protocols.link.DataLink;
import org.mobicents.protocols.link.LinkState;
import org.mobicents.protocols.link.LinkStateListener;
import org.mobicents.protocols.stream.api.SelectorKey;
import org.mobicents.protocols.stream.api.SelectorProvider;
import org.mobicents.protocols.stream.api.StreamSelector;
class XServer implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private int rxCount, txCount;
    private InetSocketAddress address, remote;
    
    public XServer(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
        System.out.println("rx=" + rxCount);
        System.out.println("tx=" + txCount);
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
                    rxCount++;
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                txBuffer[txCount%txBuffer]++;
                for (SelectorKey key : keys) {
                    key.getStream().write(txBuffer);
                    txCount++;
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}
class XClient implements LinkStateListener 
{
    private DataLink link;
    
    private volatile boolean started = false;
    private StreamSelector selector;
    private InetSocketAddress address, remote;
    
    public XClient(InetSocketAddress address, InetSocketAddress remote) throws Exception {
        link = DataLink.open(address, remote);
        link.setListener(this);
        selector = SelectorProvider.getSelector("org.mobicents.protocols.link.SelectorImpl");
        link.register(selector);
    }
    public void start() {
        started = true;
        new Thread(this).start();
        link.activate();
    }
    public void stop() {
        started = false;
        link.close();
        
    }
    
    public void run() {
        byte[] rxBuffer = new byte[172];
        //byte[] txBuffer = new byte[172];
        
        while (started) {
            try {
                
                Collection<SelectorKey> keys = selector.selectNow(StreamSelector.OP_READ, 20);
                for (SelectorKey key : keys) {
                    int len = key.getStream().read(rxBuffer);
               
                    System.out.println("Read " + len  +" bytes: "+Arrays.toString(rxBuffer));
                }
                
                keys.clear();
                keys = selector.selectNow(StreamSelector.OP_WRITE, 20);
                
                for (SelectorKey key : keys) {
                    key.getStream().write(rxBuffer);
         
                }
                
                Thread.currentThread().sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void onStateChange(LinkState state) {
        System.err.println("DatalinkState: "+state);
    }
}
        
        

Revision History
Revision 1.0Wed June 2 2010Bartosz Baranowski
Creation of the Mobicents Stream Library User Guide.