streams-story Pub   Share

Streaming REST response

send::  msg snakk.text  (url ="/diesel/rest/testdiesel/stream"...)

Stream tests

Create a named stream. It will be owned by this flow...

val total:Number=0=0 send::  msg diesel.stream.new  (stream="s1")

Start a generator in same flow send::  msg testdiesel.streams.startGenerator  (stream="s1", howMany:Number=3)

send::  msg diesel.stream.consume  (stream="s1", xtimeout:Number=1000)

expect::  (total is 15)

Batch processing: putAll and onDataSlice

send::  msg diesel.stream.new  (stream="s2", batch:Boolean, batchSize:Number=2)

Start a generator in same flow

send::  msg testdiesel.streams.startGenerator  (stream="s2", howMany:Number=3)

val total:Array=[0]=[0] send::  msg diesel.stream.consume  (stream="s2", xtimeout:Number=1000)

expect::  (total is [0,1,2,3,4,5])

Batch stream API

Creating a simple AIP for batch processing of JSON records.

Create a stream from this flow,

  • start a consumer in another flow
  • and then call the API with a separate flow to put elements in it.

send::  msg diesel.stream.new  (stream="s3")

send::  msg testdiesel.stream.consume  (stream="s3", xtimeout:Number=1000)

send::  msg ctx.sleep  (duration:Number=1000)

Spawn a separate flow to put items in the stream "s3" :

send::  msg snakk.text  (url ="/diesel/rest/putAllInS3"..., verb="POST", body :Array)

Timer generator example, for pollers

TODO timer generators

// $send diesel.stream.new(stream="s4") // Start a generator in same flow // $send testdiesel.streams.startGenerator(stream="s4", howMany=3) // $send diesel.stream.consume(stream="s1", xtimeout=1000) // $send ctx.log(x=total) // $expect (total is 15)

Batch processing: putAll and onDataSlice

send::  msg diesel.stream.new  (stream="s2", batch:Boolean, batchSize:Number=2)

Start a generator in same flow

send::  msg testdiesel.streams.startGenerator  (stream="s2", howMany:Number=3)

val total:Array=[0]=[0] send::  msg diesel.stream.consume  (stream="s2", xtimeout:Number=1000)

expect::  (total is [0,1,2,3,4,5])

Timeout

send::  msg diesel.stream.new  (stream="stest85", timeoutMillis:Number=10000)

//$val stest85 = payload // $send ctx.sleep(duration=1000) // $send diesel.stream.put(stream="stest85", data=123)

send::  msg diesel.stream.consume  (stream="stest85")

Distributed - remote put and done

After creating the stream, get the ref not the name, if you want to use it distributed. Then send this ref to sub-flows. These sub-flows running on other nodes, can then send to this stream.

send::  msg diesel.stream.new  (stream="stest95", timeoutMillis:Number=100000)

val stest95=dieselStream val stest95ref=dieselStreamRef // $send diesel.stream.put(stream="stest85", data=123)

send::  msg testdiesel.please.streamremoteputdone  (streamRef)

send::  msg diesel.stream.consume  (stream="stest95")

val ref:JSON={ "node": "127.0.0.1:9002", "cat": "DieselStream", "id": "stest95" }

send::  msg diesel.stream.put  (streamRef, data:Number=1)


Was this useful?    

By: Razie | 2020-10-04 .. 2021-01-30 | Tags: story , dsl , streams


Viewed 919 times ( | History | Print ) this page.

You need to log in to post a comment!

© Copyright DieselApps, 2012-2025, all rights reserved.