Quick start - fanhubgt/StreamEPS GitHub Wiki

***Quick Start

A quick introduction to the StreamEPS engine processing.

public class EngineTest extends TestCase {

    public EngineTest(String testName) {
        super(testName);
    }

    public void testEngine() {

        //1: initialize the engine, decider, reciever and producer.
        IEPSDecider<IContextPartition<ISegmentContext>> decider = new SegmentDecider();
        IEPSReceiver<IContextPartition<ISegmentContext>, TestEvent> receiver = new SegmentReceiver();
        IEPSEngine<IContextPartition<ISegmentContext>, TestEvent> engine = new SegmentEngine();
        IEPSProducer producer = new EPSProducer();

        //2: set the engine, decider and receiver properties.
        EngineBuilder engineBuilder = new EngineBuilder(decider, engine, receiver);
        engineBuilder.setProducer(producer);

        //set the properties: sequence size, asychronous flag, queue flag.
        engineBuilder.buildProperties(20, false, true);

        //3: set up a pattern detector for the decider.
        PatternBuilder patternBuilder = new PatternBuilder(new HighestSubsetPE<TestEvent>());
        patternBuilder.buildParameter("value", 18);/*No comparison operator needed.*/
         //.buildPatternMatchListener(new TestPatternMatchListener())
         //.buildPatternUnMatchListener(new TestUnPatternMatchListener());

        //add the pattern 1 detector built to the engine/decider.
        engineBuilder.buildPattern(patternBuilder.getBasePattern());

        //pattern 2: repeated process.
        patternBuilder=new PatternBuilder(new TrendPatternPE(new DecreasingAssertion()));
        patternBuilder.buildParameter("value");
       // .buildPatternMatchListener(new TestPatternMatchListener())
       // .buildPatternUnMatchListener(new TestUnPatternMatchListener());

        engineBuilder.buildPattern(patternBuilder.getBasePattern());

        //pattern 3: repeated pattern detector process.
        patternBuilder=new PatternBuilder( new HighestSubsetPE<TestEvent>());
        patternBuilder.buildParameter("value", 10);
        //.buildPatternMatchListener(new TestPatternMatchListener())
        //.buildPatternUnMatchListener(new TestUnPatternMatchListener());

        engineBuilder.buildPattern(patternBuilder.getBasePattern());
           //new TestPatternMatchListener(),
           // new TestUnPatternMatchListener());
        
        //4: create the receiver context for the segment partitioning test.
        ReceiverContextBuilder contextBuilder = new ReceiverContextBuilder(new ReceiverContext(), new SegmentParam());

        contextBuilder.buildIdentifier(IDUtil.getUniqueID(new Date().toString()))
                .buildContextDetail(IDUtil.getUniqueID(new Date().toString()), ContextDimType.SEGMENT_ORIENTED)
                .buildPredicateTerm("value", PredicateOperator.EQUAL, 2.677)
                .buildSegmentParamAttribute("value")
                .buildSegmentParamAttribute("name")
                .buildContextParameter("Test Event", contextBuilder.getSegmentParam());

        receiver.setReceiverContext(contextBuilder.getContext());

        //create the filter context for a filtering test process.
        FilterContextBuilder filterContextBuilder = new FilterContextBuilder(new FilterContext());
        filterContextBuilder.buildPredicateTerm("value", PredicateOperator.GREATER_THAN_OR_EQUAL, 19)
                .buildContextEntry("TestEvent", new ComparisonContentEval())
                .buildEvaluatorContext(FilterType.COMPARISON)
                .buildEPSFilter().buildFilterContext();


        IEPSFilter filter = filterContextBuilder.getFilter();
        assertNotNull("Filter is not functional", filter);

        producer.setFilterContext(filterContextBuilder.getFilterContext());

        //5: build and retrieve the modified engine and shoot some events.
        engine = engineBuilder.getEngine();

        assertNotNull(engine);

        Random rand = new Random(50);
        for (int i = 0; i < 40; i++) {
            TestEvent event = new TestEvent("E" + i, ((double) rand.nextDouble())+ 29-(2*i) );
            //TestEvent event = new TestEvent("E" + i,(double)i);
            engine.sendEvent(event, false);
        }

         System.out.println();
        System.out.println("==============Decider Context============");

        //6: check for the decider context
        IDeciderContext<IMatchedEventSet<TestEvent>> context = ((SegmentDecider) engine.getDecider()).getMatchDeciderContext();

        assertNotNull("Decider context is not properly set.",context);

        for (TestEvent event : context.getDeciderValue()) {
            System.out.println("Name:" + event.getName() + "=====Value:" + event.getValue());
        }

        //7: check for the filter context
        IFilterContext<IFilterValueSet<ISortedAccumulator<TestEvent>>> fcontext = producer.getFilterContext();

        assertNotNull("Filter contxt is not set properly", fcontext);

        ISortedAccumulator<TestEvent> accumulator = fcontext.getFilteredValue().getValueSet().getWindow();

        System.out.println();
        System.out.println("============Filter Context=============");

        for (Object key : accumulator.getMap().keySet()) {
            for (TestEvent event : accumulator.getAccumulatedByKey(key)) {
                System.out.println("Name:" + event.getName() + "=====Value:" + event.getValue());
            }
        }
    }
}
⚠️ **GitHub.com Fallback** ⚠️