InquiryThread.swift 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. //
  2. // InquiryThread.swift
  3. // NexilisLite
  4. //
  5. // Created by Akhmad Al Qindi Irsyam on 23/11/22.
  6. //
  7. import Foundation
  8. import FMDB
  9. class InquiryThread {
  10. static let `default` = InquiryThread()
  11. private var isRunning = false
  12. private var semaphore = DispatchSemaphore(value: 0)
  13. private var connection = DispatchSemaphore(value: 0)
  14. private var dispatchQueue = DispatchQueue(label: "InquiryThread")
  15. private var queue = [TMessage]()
  16. init() {
  17. DispatchQueue.global().async { [self] in
  18. while Database.shared.database == nil {
  19. Thread.sleep(forTimeInterval: 1.0)
  20. }
  21. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  22. do {
  23. if let cursor = Database.shared.getRecords(fmdb: fmdb, query: "select status, message, id from INQUIRY") {
  24. while cursor.next() {
  25. let status = cursor.int(forColumnIndex: 0)
  26. if status == 1 {
  27. delInquiry(fmdb: fmdb, messageId: cursor.string(forColumnIndex: 2)!)
  28. continue
  29. }
  30. if let cursorMessage = Database.shared.getRecords(fmdb: fmdb, query: "select message_id from MESSAGE where message_id = '\(cursor.string(forColumnIndex: 2)!)'") {
  31. if cursorMessage.next() {
  32. if let message = cursor.string(forColumnIndex: 1) {
  33. addQueue(message: TMessage(data: message))
  34. }
  35. } else {
  36. delInquiry(fmdb: fmdb, messageId: cursor.string(forColumnIndex: 2)!)
  37. }
  38. cursorMessage.close()
  39. }
  40. }
  41. cursor.close()
  42. }
  43. } catch {
  44. rollback.pointee = true
  45. print("Access database error: \(error.localizedDescription)")
  46. }
  47. })
  48. }
  49. }
  50. private func delInquiry(fmdb: Any, messageId: String) {
  51. _ = Database.shared.deleteRecord(fmdb: fmdb as! FMDatabase, table: "INQUIRY", _where: "id = '\(messageId)'")
  52. }
  53. func addQueue(message: TMessage) {
  54. //print("MASUK INQUIRY ADD")
  55. addInquiry(message: message)
  56. queue.append(message)
  57. semaphore.signal()
  58. }
  59. private func addQueue(_ message: TMessage, at: Int) {
  60. Thread.sleep(forTimeInterval: 1)
  61. queue.insert(message, at: at)
  62. semaphore.signal()
  63. }
  64. private func addInquiry(message: TMessage) {
  65. DispatchQueue.global().async {
  66. let messageId = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  67. if !messageId.isEmpty {
  68. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  69. do {
  70. //print("SIMPAN MESSAGE TO INQUIRY TABLE")
  71. _ = try Database.shared.insertRecord(fmdb: fmdb, table: "INQUIRY", cvalues: [
  72. "id" : messageId,
  73. "status" : 0,
  74. "message" : message.pack(),
  75. ], replace: true)
  76. } catch {
  77. rollback.pointee = true
  78. print("Access database error: \(error.localizedDescription)")
  79. }
  80. })
  81. }
  82. }
  83. }
  84. private var isWait = false
  85. func set(wait: Bool) {
  86. isWait = wait
  87. if !isWait {
  88. connection.signal()
  89. semaphore.signal()
  90. }
  91. }
  92. func getQueue() -> TMessage {
  93. while queue.isEmpty || queue.count == 0 {
  94. //print("QUEUE INQUIRY.wait")
  95. // DispatchQueue.global(qos: .background).async {
  96. self.semaphore.wait()
  97. // }
  98. }
  99. return queue.remove(at: 0)
  100. }
  101. func run() {
  102. //print("RUN QUEUE")
  103. if (isRunning) {
  104. return
  105. }
  106. //print("isRunning false")
  107. isRunning = true
  108. dispatchQueue.async {
  109. while self.isRunning {
  110. if self.isWait {
  111. //print("CONNECTION INQUIRY.wait")
  112. self.connection.wait()
  113. } else {
  114. self.process(message: self.getQueue())
  115. }
  116. }
  117. }
  118. }
  119. private func process(message: TMessage) {
  120. //print("inquiry process", message.toLogString())
  121. mobileInquiry(message: message)
  122. }
  123. /**
  124. *
  125. */
  126. private func mobileInquiry(message: TMessage) {
  127. Nexilis.saveMessage(message: message)
  128. //print("save message sendChat")
  129. if !self.isWait {
  130. //print("inquiry write", message.toLogString())
  131. var res = Nexilis.write(message: CoreMessage_TMessageBank.getMobileInquiry(message_id: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)))
  132. while res == nil {
  133. res = Nexilis.write(message: CoreMessage_TMessageBank.getMobileInquiry(message_id: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)))
  134. }
  135. } else {
  136. queue.append(message)
  137. }
  138. }
  139. }