Friday, August 05, 2016

A Persistent Queue with C#

This is just a bit of programming trivia.

I wanted to run a queue of tasks each of which could take a while to complete. Given that the program could be terminated before all tasks had been run then I needed to store any remaining tasks until a later opportunity to process them. Also I wanted to run the task queue on a background thread and be able to add to the queue of tasks from the UI thread.

Fortunately .NET has an inbuilt ConcurrentQueue<T> in System.Collections.Concurrent that can be accessed from multiple threads without issue. All that needed to be added was some code to persist uncompleted queue tasks. In the code below I am using the .NET port of SQLite but a flat file would work fine as would any other database.

As I was using a database I wanted to differentiate between queued tasks already in the relevant table and any added from the UI thread. [Not of course forgetting to delete tasks in the database that are completed.] So a little class to represent a task – the task itself being just a string in this instance.

class QueuedTask {     private string task;     private bool fromDatabase = false;     public QueuedTask(string task)     {         this.task = task;     }     public QueuedTask(string task, bool fromDatabase)     {         this.task = task;         this.fromDatabase = fromDatabase;     }     public bool InDatabase     {         get { return fromDatabase; }     }     public string Task     {         get { return task; }     } }
Creating an instance of my PersistentQueue Class starts a timer that loads any saved tasks into the queue on another thread so as not to block the UI thread.. The main program form FormClosing() event calls the class SaveTasks() method that saves any remaining tasks in the queue to the database.

class PersistentQueue {     public ConcurrentQueue<QueuedTask> TaskQueue = new ConcurrentQueue<QueuedTask>();     private string dbPath = "";     private Timer readTimer = new Timer();     private const int startDelay = 10000;     public PersistentQueue(string dbPath)     {         this.dbPath = dbPath;         readTimer.Interval = startDelay;         readTimer.Elapsed += (sender, e) => loadSavedTasks();         readTimer.Start();     }     public void SaveTasks()     {         readTimer.Close();         if (!TaskQueue.IsEmpty)         {             try             {                 SQLiteConnection newCon = new SQLiteConnection(dbPath);                 newCon.Open();                 SQLiteCommand mCommand = new SQLiteCommand("Insert Into Tasks (Task) Values(@Task)", newCon);                 mCommand.Parameters.Add(new SQLiteParameter("@Task"));                 QueuedTask mTask;                 while (TaskQueue.TryDequeue(out mTask))                 {                     if (!mTask.InDatabase)                     {                         mCommand.Parameters["@Task"].Value = mTask.Task;                         mCommand.ExecuteNonQuery();                     }                 }                 mCommand.Dispose();                 newCon.Close();             }             catch (Exception ex)             {             }         }     }     private async void loadSavedTasks()     {         readTimer.Stop();         await Task.Run(() => loadTasks());         // this is where you might start a new Timer to execute any tasks sitting         // in the queue - again using await Task.Run() and to periodically check the         // queue for new entries after the current queue has been emptied     }     private void loadTasks()     {         try         {             SQLiteConnection newCon = new SQLiteConnection(dbPath);             newCon.Open();             SQLiteCommand mCommand = new SQLiteCommand("Select Task From Tasks", newCon);             SQLiteDataReader mReader = mCommand.ExecuteReader();             while (mReader.Read())             {                 TaskQueue.Enqueue(new QueuedTask(Convert.ToString(mReader[0]), true));             }             mReader.Close();             mCommand.Dispose();             newCon.Close();         }         catch (Exception ex)         {             LiteLog myerror = new LiteLog("PersistentQueue.cs", "loadTasks", ex.Message);             myerror.UpdateClass();         }     } }

No comments: