DataCollector
in a functional way
#1089
Replies: 2 comments
-
Currently There is some work going on to add a simple runtime to I don't have enough time to make your above code functional, but you can always capture something like the above code within an static class Foo<RT>
where RT: struct, HasCancel<RT>
{
public static Aff<RT, Unit> MainAsync(IEnumerable<string> ips) =>
ips.SequenceParallel(Loop)
.Map(_ => unit);
static Aff<RT, Unit> Loop(string ip) =>
Aff<RT, Unit>(rt =>
{
using (var local = CancellationTokenSource.CreateLinkedTokenSource(rt.CancellationToken))
{
_ = await Connect(ip, local.Token, async stream =>
{
// for health check
var task = Task.Run(async () =>
{
while (!local.IsCancellationRequested)
{
await stream.WriteAsync(Encoding.ASCII.GetBytes("||>GET DEVICE.NAME\r\n"), local.Token);
await Task.Delay(TimeSpan.FromSeconds(5), local.Token);
}
}, local.Token);
// call external APIs
await foreach (var reading in ReadingsAsync(stream, timeout: TimeSpan.FromSeconds(10),
ct: local.Token))
{
var timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
switch (reading)
{
case "||[105]":
goto Disconnect;
case { } s when s.StartsWith("DM"):
await PostConnected(timestamp, ip);
break;
case {Length: > 0 and <= 200}:
await PostScanData(reading, timestamp, ip);
break;
}
}
Disconnect:
local.Cancel();
try
{
await task;
}
catch (TaskCanceledException)
{
}
return unit;
});
}
await PostDisconnected(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), ip);
await Task.Delay(TimeSpan.FromSeconds(5), ct);
return unit;
});
} Obviously that means you spend all your time writing boilerplate like The |
Beta Was this translation helpful? Give feedback.
-
Thanks for the help. When I try to access to
@louthy could you help me figuring out, what could be the problem? public static class DataCollector<RT>
where RT : struct, HasCancel<RT>, HasConsole<RT>
{
public static Aff<RT, Unit> Main(IEnumerable<string> ips) =>
ips.SequenceParallel(Loop)
.Map(_ => unit);
static Aff<RT, Unit> Loop(string ip) =>
use(Eff(() => new TcpClient()), client =>
from ct in cancelToken<RT>()
from _ in client.ConnectAsync(ip, 23, ct).ToUnit().ToAff()
let stream = client.GetStream()
from cancel in fork(Health(stream))
from __ in TcpConnect(stream) | readLine | writeLine
select unit)
| @catch(error => Console<RT>.writeLine(error.Message).ToAff());
static Aff<RT, Unit> Health(Stream stream) =>
from ct in cancelToken<RT>()
let cmd = Encoding.ASCII.GetBytes("||>GET DEVICE.NAME\r\n")
from _ in Aff(() => stream.WriteAsync(cmd, ct).ToUnit()).Repeat(Schedule.spaced(5 * second) | Schedule.RepeatForever)
select unit;
static Producer<RT, TextReader, Unit> TcpConnect(Stream stream) =>
from reader in use<RT, StreamReader>(SuccessEff(new StreamReader(stream)))
from _ in Producer.yield<RT, TextReader>(reader)
select unit;
static Pipe<RT, TextReader, string, Unit> readLine
{
get
{
return from tr in awaiting<TextReader>()
from ct in cancelToken<RT>()
from ln in enumerate2(go(tr))
from __ in yield(ln)
select unit;
static async IAsyncEnumerable<string> go(TextReader reader)
{
while (true)
{
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if(line == null) yield break;
yield return line;
}
}
}
}
static Consumer<RT, string, Unit> writeLine =>
from l in awaiting<string>()
from _ in Console<RT>.writeLine(l)
select unit;
} UpdateUsing static Pipe<RT, TextReader, string, Unit> readLine
{
get
{
return from tr in awaiting<TextReader>()
from ct in cancelToken<RT>()
from ln in enumerate(go(tr))
// from _ in yield(ln)
select unit;
static async IAsyncEnumerable<string> go(TextReader reader)
{
while (true)
{
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line == null) yield break;
yield return line;
}
}
}
} |
Beta Was this translation helpful? Give feedback.
-
Dear @louthy,
I would like to discuss on the snippet below.
I would like to rewrite it using functional approach.
My main difficulties that I don't know what is the up-to-date information with the cancellation.
I have bumped into PR where
Eff
also got some kind of cancellation support, how it is possible at all, that a sync process gets cancelled? - Or am I wrong?I know it is much, but would it be possible to show us what would be your approach?
Thank you for all the effort bringing functional programming closer to C#.
Beta Was this translation helpful? Give feedback.
All reactions