send::
msg snakk.text (url ="/diesel/rest/testdiesel/stream"
...)
Create a named stream. It will be owned by this flow...
val total:Number=0
send::
msg diesel.stream.new (stream="s1"
)
Start a generator in the same flow:
send::
msg testdiesel.streams.startGenerator (stream="s1"
, howMany:Number=3
)
send::
msg diesel.stream.consume (stream="s1"
, xtimeout:Number=1000
)
expect:: (total is 15
)
send::
msg diesel.stream.new (stream="s2"
, batch:Boolean, batchSize:Number=2
)
Start a generator in the same flow:
send::
msg testdiesel.streams.startGenerator (stream="s2"
, howMany:Number=3
)
val total:Array=[0]
send::
msg diesel.stream.consume (stream="s2"
, xtimeout:Number=1000
)
expect:: (total is [0,1,2,3,4,5]
)
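The batch test above can be pictured with a small model. This is a rough sketch in Python, not Diesel code. It assumes the generator emitted the values 1 through 5 (its real output is not shown on this page), that a batched stream hands the consumer lists of at most `batchSize` items, and that a final partial batch is still delivered. The `batches` helper and the append-to-`total` consumer are hypothetical.

```python
from typing import Iterable, Iterator, List


def batches(items: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Group items into lists of at most batch_size, preserving order."""
    batch: List[int] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # assumption: a final, shorter batch is still delivered
        yield batch


# Assumption: the generator emitted 1..5; the real generator is not shown here.
generated = [1, 2, 3, 4, 5]

total = [0]       # mirrors `val total:Array=[0]`
delivered = []    # the batches as the consumer would see them
for batch in batches(generated, batch_size=2):
    delivered.append(batch)
    total = total + batch  # hypothetical consumer: append each batch to total

print(delivered)
print(total)
```

Under these assumptions the consumer sees three batches and `total` ends up matching the expectation `[0,1,2,3,4,5]`.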
Creating a simple API for batch processing of JSON records.
Create a stream from this flow:
send::
msg diesel.stream.new (stream="s3"
)
send::
msg testdiesel.stream.consume (stream="s3"
, xtimeout:Number=1000
)
send::
msg ctx.sleep (duration:Number=1000
)
Spawn a separate flow to put items in the stream "s3":
send::
msg snakk.text (url ="/diesel/rest/putAllInS3"
..., verb="POST"
, body :Array)
// $send diesel.stream.new(stream="s4")
// Start a generator in same flow
// $send testdiesel.streams.startGenerator(stream="s4", howMany=3)
// $send diesel.stream.consume(stream="s1", xtimeout=1000)
// $send ctx.log(x=total)
// $expect (total is 15)
send::
msg diesel.stream.new (stream="stest85"
, timeoutMillis:Number=10000
)
//$val stest85 = payload
// $send ctx.sleep(duration=1000)
// $send diesel.stream.put(stream="stest85", data=123)
send::
msg diesel.stream.consume (stream="stest85"
)
After creating the stream, get its ref, not just its name, if you want to use it in a distributed setup. Then send this ref to sub-flows. Sub-flows running on other nodes can then send to this stream.
send::
msg diesel.stream.new (stream="stest95"
, timeoutMillis:Number=100000
)
val stest95=dieselStream
val stest95ref=dieselStreamRef
// $send diesel.stream.put(stream="stest85", data=123)
send::
msg testdiesel.please.streamremoteputdone (streamRef)
send::
msg diesel.stream.consume (stream="stest95"
)
val ref:JSON={
"node": "127.0.0.1:9002",
"cat": "DieselStream",
"id": "stest95"
}
send::
msg diesel.stream.put (streamRef, data:Number=1
)
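The difference between a stream name and a stream ref can be sketched as a small conceptual model. It is written in Python and is not Diesel's implementation. It assumes a bare name is only meaningful on the node that owns the stream, while a ref with the `node`, `cat` and `id` fields shown above tells any node where the stream lives. `StreamRef`, `is_local` and the node address `127.0.0.1:9003` are hypothetical.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class StreamRef:
    """Conceptual model of the ref JSON: owning node, category, stream id."""
    node: str
    cat: str
    id: str

    @staticmethod
    def from_dict(d: dict) -> "StreamRef":
        return StreamRef(node=d["node"], cat=d["cat"], id=d["id"])


def is_local(target: Union[str, StreamRef], this_node: str) -> bool:
    """Decide whether a put can be handled on this node.

    Assumption: a bare name is always looked up locally, so it only works on
    the owning node; a ref is local only if its node matches this node.
    """
    if isinstance(target, str):
        return True
    return target.node == this_node


ref = StreamRef.from_dict(
    {"node": "127.0.0.1:9002", "cat": "DieselStream", "id": "stest95"}
)

# On a hypothetical other node, the ref points back to the owning node.
print(is_local(ref, "127.0.0.1:9003"))
# On the owning node, the same ref resolves locally.
print(is_local(ref, "127.0.0.1:9002"))
```

In this model, a sub-flow on another node that only had the name `"stest95"` would look for a local stream that does not exist there, which is why the ref is the thing to pass along.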