Most CEP use cases have to deal with streams of events. The streams can be provided to the application in various forms, from JMS queues to flat text files, from database tables to raw sockets or even through web service calls. In any case, the streams share a common set of characteristics:
events in the stream are ordered by a timestamp. The timestamp may have different semantics for different streams, but events within a single stream are always ordered by it.
volumes of events are usually high.
atomic events are rarely useful by themselves. Usually meaning is extracted from the correlation between multiple events from the stream and also from other sources.
streams may be homogeneous, i.e. contain a single type of events, or heterogeneous, i.e. contain multiple types of events.
Drools generalizes the concept of a stream as an "entry point" into the engine. For Drools, an entry point is a gate through which facts enter the engine. These facts may be regular facts or special facts such as events.
In Drools, facts from one entry point (stream) may join with facts from any other entry point, or even with facts from the working memory. However, they never mix, i.e., they never lose the reference to the entry point through which they entered the engine. This is important because the same type of fact may come into the engine through several entry points, but a fact that is inserted into the engine through entry point A will never match a pattern bound to entry point B, for example.
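The scoping behaviour described above can be illustrated with a deliberately small model in plain Java. This is only a sketch of the idea and says nothing about how Drools is implemented internally: `ScopedFact`, `ToyEngine`, and their methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only, NOT the Drools implementation: a fact stays tagged with
// the entry point through which it was inserted.
final class ScopedFact {
    final String entryPoint;
    final Object fact;

    ScopedFact(String entryPoint, Object fact) {
        this.entryPoint = entryPoint;
        this.fact = fact;
    }
}

// Hypothetical stand-in for an engine that scopes facts by entry point.
final class ToyEngine {
    private final List<ScopedFact> facts = new ArrayList<>();

    // Remember both the fact and the entry point it came in through.
    void insert(String entryPoint, Object fact) {
        facts.add(new ScopedFact(entryPoint, fact));
    }

    // A "pattern" here is a type plus the entry point it is bound to.
    // Facts of the right type from another entry point are ignored.
    <T> List<T> match(Class<T> type, String entryPoint) {
        List<T> result = new ArrayList<>();
        for (ScopedFact sf : facts) {
            if (sf.entryPoint.equals(entryPoint) && type.isInstance(sf.fact)) {
                result.add(type.cast(sf.fact));
            }
        }
        return result;
    }
}
```

In this model, two facts of the same type inserted through "A" and "B" are both stored, but a pattern bound to "A" only ever sees the first one.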
Entry points are declared implicitly in Drools by using them directly in rules. That is, referencing an entry point in a rule makes the engine identify it at compile time and create the proper internal structures to support that entry point.
So, for instance, let's imagine a banking application where transactions are fed into the system from streams. One of the streams contains all the transactions executed at ATMs. So, if one of the rules says that a withdrawal is authorized if and only if the account balance is over the requested withdrawal amount, the rule would look like:
Example 2.8. Example of Stream Usage
    rule "authorize withdraw"
    when
        WithdrawRequest( $ai : accountId, $am : amount ) from entry-point "ATM Stream"
        CheckingAccount( accountId == $ai, balance > $am )
    then
        // authorize withdraw
    end
In the previous example, the engine compiler will identify that
the pattern is tied to the entry point "ATM Stream" and will both create all
the necessary structures for the rulebase to support the "ATM Stream" and
will only match WithdrawRequests coming from the "ATM Stream". In the
previous example, the rule is also joining the event from the stream with a
fact from the main working memory (CheckingAccount).
Now, let's imagine a second rule that states that a fee of $2 must be applied to any account for which a withdrawal request is placed at a bank branch:
Example 2.9. Using a different Stream
    rule "apply fee on withdraws on branches"
    when
        WithdrawRequest( $ai : accountId, processed == true ) from entry-point "Branch Stream"
        CheckingAccount( accountId == $ai )
    then
        // apply a $2 fee on the account
    end
The previous rule will match events of the exact same type as
the first rule (WithdrawRequest), but from a different stream. An
event inserted into the "ATM Stream" will never be evaluated against the
pattern in the second rule, because that rule states that it is only
interested in events coming from the "Branch Stream".
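Both rules constrain properties of WithdrawRequest (accountId, amount, processed), which Drools reads through JavaBean-style getters. The class itself is not shown in this section, so the following is only a minimal sketch of what such a fact class might look like: the field types, the constructor, and the setter are assumptions made for illustration.

```java
// Hypothetical fact class: property names follow the rules above, while the
// types and constructor are assumed. It is package-private only to keep the
// sketch in a single file; a real fact class would normally be public.
final class WithdrawRequest {
    private final String accountId;
    private final long amount;      // assumed: whole currency units
    private boolean processed;      // assumed: set once the request is handled

    WithdrawRequest(String accountId, long amount) {
        this.accountId = accountId;
        this.amount = amount;
        this.processed = false;
    }

    // Read by the pattern constraint "$ai : accountId"
    public String getAccountId() {
        return accountId;
    }

    // Read by the pattern constraint "$am : amount"
    public long getAmount() {
        return amount;
    }

    // Read by the pattern constraint "processed == true"
    public boolean isProcessed() {
        return processed;
    }

    public void setProcessed(boolean processed) {
        this.processed = processed;
    }
}
```

The same class can be inserted through either stream; which rule sees a given instance depends only on the entry point used for the insert.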
So, entry points, besides being a proper abstraction for streams, are also a way to scope facts in the working memory, and a valuable tool for reducing cross-product explosions. But that is a subject for another time.
Inserting events into an entry point is equally simple. Instead of inserting events directly into the working memory, insert them into the entry point as shown in the example below:
Example 2.10. Inserting facts into an entry point
    // create your rulebase and your session as usual
    StatefulKnowledgeSession session = ...

    // get a reference to the entry point
    WorkingMemoryEntryPoint atmStream = session.getWorkingMemoryEntryPoint( "ATM Stream" );

    // and start inserting your facts into the entry point
    atmStream.insert( aWithdrawRequest );
The previous example shows how to manually insert facts into a given entry point. Usually, however, the application will use one of the many adapters to plug a stream end point, like a JMS queue, directly into the engine entry point, without coding the inserts manually. The Drools pipeline API has several adapters and helpers for this, as well as examples of how to use them.
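To give a rough idea of what such an adapter does, the sketch below forwards events from an in-memory queue to a sink. It is not the Drools pipeline API: `QueueToEntryPointAdapter` and its `drain` method are hypothetical names, and a `java.util.concurrent.BlockingQueue` stands in for a real stream end point such as a JMS queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper, not part of Drools: forwards queued events to a sink.
// The sink is a plain Consumer so the sketch compiles without Drools.
final class QueueToEntryPointAdapter {
    private final BlockingQueue<Object> source;
    private final Consumer<Object> sink;

    QueueToEntryPointAdapter(BlockingQueue<Object> source, Consumer<Object> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Forwards every event currently in the queue, in arrival order,
    // and returns how many events were forwarded.
    int drain() {
        int forwarded = 0;
        Object event;
        while ((event = source.poll()) != null) {
            sink.accept(event);
            forwarded++;
        }
        return forwarded;
    }
}
```

With a real session, the sink could presumably be a method reference such as `atmStream::insert`, where `atmStream` is the entry point obtained in Example 2.10, so that the application code no longer calls insert for each event itself.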