Skip to content

Commit

Permalink
Fix SendEmailSubscription timer
Browse files Browse the repository at this point in the history
  • Loading branch information
sistracia committed May 31, 2024
1 parent bbc832f commit 591d931
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 66 deletions.
19 changes: 3 additions & 16 deletions web/src/Server/DataAccess.fs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ let getRSSUrls (connectionString: string) (userId: string) : string list =
|> Sql.parameters [ "@user_id", Sql.string userId ]
|> Sql.execute (fun read -> read.text "url")

let insertUrlsQuery (userId: string) (urls: string array) (sql: Sql.SqlProps) =
sql
let insertUrls (connectionString: string) (userId: string) (urls: string array) =
connectionString
|> Sql.connect
|> Sql.executeTransaction
[ "INSERT INTO rss_urls (id, url, user_id) VALUES (@id, @url, @user_id)",
[ yield!
Expand All @@ -51,20 +52,6 @@ let insertUrlsQuery (userId: string) (urls: string array) (sql: Sql.SqlProps) =
[ "@id", Sql.text (Guid.NewGuid().ToString())
"@url", Sql.text url
"@user_id", Sql.text userId ]) ] ]

let insertUrls (connectionString: string) (userId: string) (urls: string array) =
connectionString |> Sql.connect |> insertUrlsQuery userId urls |> ignore

let insertUrlsWithCancellation
(cancellationToken: CancellationToken)
(connectionString: string)
(userId: string)
(urls: string array)
=
connectionString
|> Sql.connect
|> Sql.cancellationToken cancellationToken
|> insertUrlsQuery userId urls
|> ignore

let deleteUrls (connectionString: string) (userId: string) (urls: string array) : unit =
Expand Down
28 changes: 8 additions & 20 deletions web/src/Server/RSSWorker.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ module RSSWorker
open System
open System.IO
open System.Threading
open System.Threading.Tasks

open Shared
open Types

type IRSSProcessingService =
abstract member DoWork: stoppingToken: CancellationToken -> Task
abstract member DoWork: stoppingToken: CancellationToken -> unit Async

type RSSProcessingService(connectionString: string, publicHost: string, mailService: Mail.IMailService) =

Expand Down Expand Up @@ -91,7 +90,7 @@ type RSSProcessingService(connectionString: string, publicHost: string, mailServ

mailService.SendMail mailData

member private this.ProceedSubscriber(rssAggregate: RSSEmailsAggregate) : Async<(string * RSS seq) option> =
member private this.ProceedSubscriber(rssAggregate: RSSEmailsAggregate) : unit Async =
async {
let email: string = rssAggregate.Email
let rssHistories: string array = rssAggregate.Urls
Expand All @@ -110,26 +109,15 @@ type RSSProcessingService(connectionString: string, publicHost: string, mailServ
// Ignore if there is an error when sending email because invalid email or etc
with _ ->
()

return Some(rssAggregate.UserId, (this.LatestNewRSS newRSS))
else
return None
}

interface IRSSProcessingService with

member this.DoWork(stoppingToken: CancellationToken) =
task {
let! (newRSSList: (string * RSS seq) option array) =
DataAccess.aggreateRssEmails stoppingToken connectionString
|> List.map (this.ProceedSubscriber)
|> Async.Parallel

newRSSList
|> Array.choose id
|> Array.iter (fun (newRSS: (string * RSS seq)) ->
snd newRSS
|> Seq.map _.Origin
|> Seq.toArray
|> DataAccess.insertUrlsWithCancellation stoppingToken connectionString (fst newRSS))
async {
DataAccess.aggreateRssEmails stoppingToken connectionString
|> List.map (this.ProceedSubscriber)
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
}
4 changes: 1 addition & 3 deletions web/src/Server/Server.fs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ let app: IHostBuilder =
let rssProcessingService: RSSWorker.RSSProcessingService =
RSSWorker.RSSProcessingService(connectionString, publicHost, mailService)

let minutesInMS: int = 1000 * 60

new Worker.SendEmailSubscription(minutesInMS, rssProcessingService, logger))
new Worker.SendEmailSubscription(rssProcessingService, logger))

use_router Router.defaultView
memory_cache
Expand Down
51 changes: 24 additions & 27 deletions web/src/Server/Worker.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,42 @@ module Worker

open System
open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging

type SendEmailSubscription(delay: int, rssProcessingService: RSSWorker.IRSSProcessingService, logger: ILogger<unit>) =
/// Ref: https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-8.0&tabs=visual-studio
type SendEmailSubscription(rssProcessingService: RSSWorker.IRSSProcessingService, logger: ILogger<unit>) =
inherit BackgroundService()

/// Called when the background service needs to run.
override this.ExecuteAsync(stoppingToken: CancellationToken) =
override __.ExecuteAsync(stoppingToken: CancellationToken) =
task {

Check warning on line 14 in web/src/Server/Worker.fs

View workflow job for this annotation

GitHub Actions / test

This state machine is not statically compilable. A 'let rec' occured in the resumable code specification. An alternative dynamic implementation will be used, which may be slower. Consider adjusting your code to ensure this state machine is statically compilable, or else suppress this warning.
logger.LogInformation "Background service start."
this.DoWork(stoppingToken) |> Async.AwaitTask |> ignore
}
logger.LogInformation "Timed Hosted Service running."

member private _.DoWork(stoppingToken: CancellationToken) : Task =
task {
let timer: PeriodicTimer = new PeriodicTimer(TimeSpan.FromSeconds(1.0))
let hourSend: int = 21
let mutable isSend: bool = DateTime.Now.Hour = hourSend

// Ref: https://stackoverflow.com/questions/73806802/how-to-use-while-loop-in-f-async-expression
try
logger.LogInformation "Background service running."

let mutable isSend: bool = DateTime.Now.Hour = 0
let rec loop () =
async {
logger.LogInformation "Timed Hosted Service is working"
let! (delayTask: bool) = timer.WaitForNextTickAsync(stoppingToken).AsTask() |> Async.AwaitTask

while true do
logger.LogInformation "Background service run."
if delayTask then
// Process RSS subscription every within range 12 midnight once
if DateTime.Now.Hour = hourSend && not isSend then
do! rssProcessingService.DoWork stoppingToken
isSend <- true
else if DateTime.Now.Hour <> hourSend then
isSend <- false

// Process RSS subscription every within range 12 midnight once
if DateTime.Now.Hour = 0 && not isSend then
do! rssProcessingService.DoWork stoppingToken
isSend <- true
else if DateTime.Now.Hour <> 0 then
isSend <- false
return! loop ()
}

do! Task.Delay(delay, stoppingToken)
do! Async.StartAsTask(loop ())
with (ex: exn) ->
logger.LogInformation $"error SendEmailSubscription.DoWork: {ex.Message}"
}

/// Called when a background service needs to gracefully shut down.
override this.StopAsync(stoppingToken: CancellationToken) =
task {
logger.LogInformation "Background service shutting down."
this.StopAsync stoppingToken |> Async.AwaitTask |> ignore
logger.LogInformation $"Timed Hosted Service is stopping: {ex.Message}"
}

0 comments on commit 591d931

Please sign in to comment.