send::
msg snakk.text (url ="/diesel/rest/testdiesel/stream"...)
Create a named stream. It will be owned by this flow...
val total:Number=0=0
send::
msg diesel.stream.new (stream="s1")
Start a generator in same flow
send::
msg testdiesel.streams.startGenerator (stream="s1", howMany:Number=3)
send::
msg diesel.stream.consume (stream="s1", xtimeout:Number=1000)
expect:: (total is 15)
send::
msg diesel.stream.new (stream="s2", batch:Boolean, batchSize:Number=2)
Start a generator in same flow
send::
msg testdiesel.streams.startGenerator (stream="s2", howMany:Number=3)
val total:Array=[0]=[0]
send::
msg diesel.stream.consume (stream="s2", xtimeout:Number=1000)
expect:: (total is [0,1,2,3,4,5])
Creating a simple AIP for batch processing of JSON records.
Create a stream from this flow,
send::
msg diesel.stream.new (stream="s3")
send::
msg testdiesel.stream.consume (stream="s3", xtimeout:Number=1000)
send::
msg ctx.sleep (duration:Number=1000)
Spawn a separate flow to put items in the stream "s3" :
send::
msg snakk.text (url ="/diesel/rest/putAllInS3"..., verb="POST", body :Array)
// $send diesel.stream.new(stream="s4") // Start a generator in same flow // $send testdiesel.streams.startGenerator(stream="s4", howMany=3) // $send diesel.stream.consume(stream="s1", xtimeout=1000) // $send ctx.log(x=total) // $expect (total is 15)
send::
msg diesel.stream.new (stream="s2", batch:Boolean, batchSize:Number=2)
Start a generator in same flow
send::
msg testdiesel.streams.startGenerator (stream="s2", howMany:Number=3)
val total:Array=[0]=[0]
send::
msg diesel.stream.consume (stream="s2", xtimeout:Number=1000)
expect:: (total is [0,1,2,3,4,5])
send::
msg diesel.stream.new (stream="stest85", timeoutMillis:Number=10000)
//$val stest85 = payload
// $send ctx.sleep(duration=1000)
// $send diesel.stream.put(stream="stest85", data=123)
send::
msg diesel.stream.consume (stream="stest85")
After creating the stream, get the ref not the name, if you want to use it distributed. Then send this ref to sub-flows. These sub-flows running on other nodes, can then send to this stream.
send::
msg diesel.stream.new (stream="stest95", timeoutMillis:Number=100000)
val stest95=dieselStream
val stest95ref=dieselStreamRef
// $send diesel.stream.put(stream="stest85", data=123)
send::
msg testdiesel.please.streamremoteputdone (streamRef)
send::
msg diesel.stream.consume (stream="stest95")
val ref:JSON={
"node": "127.0.0.1:9002",
"cat": "DieselStream",
"id": "stest95"
}
send::
msg diesel.stream.put (streamRef, data:Number=1)
You need to log in to post a comment!