OutgoingThread.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. //
  2. // OutgoingThread.swift
  3. // Runner
  4. //
  5. // Created by Yayan Dwi on 20/04/20.
  6. // Copyright © 2020 The Chromium Authors. All rights reserved.
  7. //
  8. import Foundation
  9. import FMDB
  10. class OutgoingThread {
  11. static let `default` = OutgoingThread()
  12. private var isRunning = false
  13. private var semaphore = DispatchSemaphore(value: 0)
  14. private var connection = DispatchSemaphore(value: 0)
  15. private var dispatchQueue = DispatchQueue(label: "OutgoingThread")
  16. private var queue = [TMessage]()
  17. init() {
  18. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  19. if let cursor = Database.shared.getRecords(fmdb: fmdb, query: "select message from OUTGOING") {
  20. while cursor.next() {
  21. if let message = cursor.string(forColumnIndex: 0) {
  22. addQueue(message: TMessage(data: message))
  23. }
  24. }
  25. cursor.close()
  26. }
  27. })
  28. }
  29. func addQueue(message: TMessage) {
  30. queue.append(message)
  31. semaphore.signal()
  32. addOugoing(message: message)
  33. }
  34. private func addQueue(_ message: TMessage, at: Int) {
  35. Thread.sleep(forTimeInterval: 1)
  36. queue.insert(message, at: at)
  37. semaphore.signal()
  38. }
  39. private func addOugoing(message: TMessage) {
  40. DispatchQueue.global().async {
  41. let messageId = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  42. if !messageId.isEmpty {
  43. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  44. do {
  45. _ = try Database.shared.insertRecord(fmdb: fmdb, table: "OUTGOING", cvalues: [
  46. "id" : messageId,
  47. "message" : message.pack(),
  48. ], replace: true)
  49. } catch {
  50. rollback.pointee = true
  51. print(error)
  52. }
  53. })
  54. }
  55. }
  56. }
  57. private func delOutgoing(fmdb: Any, messageId: String) {
  58. _ = Database.shared.deleteRecord(fmdb: fmdb as! FMDatabase, table: "OUTGOING", _where: "id = '\(messageId)'")
  59. }
  60. func delOutgoing(fmdb: Any, packageId: String) {
  61. _ = Database.shared.deleteRecord(fmdb: fmdb as! FMDatabase, table: "OUTGOING", _where: "package = '\(packageId)'")
  62. }
  63. private var isWait = false
  64. func set(wait: Bool) {
  65. isWait = wait
  66. if !isWait {
  67. connection.signal()
  68. }
  69. }
  70. func getQueue() -> TMessage {
  71. while queue.isEmpty || queue.count == 0 {
  72. print("QUEUE.wait")
  73. semaphore.wait()
  74. }
  75. return queue.remove(at: 0)
  76. }
  77. func run() {
  78. if (isRunning) {
  79. return
  80. }
  81. isRunning = true
  82. dispatchQueue.async {
  83. while self.isRunning {
  84. if self.isWait {
  85. print("CONNECTION.wait")
  86. self.connection.wait()
  87. }
  88. self.process(message: self.getQueue())
  89. }
  90. }
  91. }
  92. private func process(message: TMessage) {
  93. print("outgoing process", message.toLogString())
  94. if message.getCode() == CoreMessage_TMessageCode.SEND_CHAT {
  95. sendChat(message: message)
  96. } else if message.getCode() == CoreMessage_TMessageCode.DELETE_CTEXT {
  97. deleteMessage(message: message)
  98. }
  99. }
  100. /**
  101. *
  102. */
  103. private func sendChat(message: TMessage) {
  104. // if media exist upload first
  105. var fileName = message.getBody(key: CoreMessage_TMessageKey.IMAGE_ID, default_value: "")
  106. if fileName.isEmpty {
  107. fileName = message.getBody(key: CoreMessage_TMessageKey.AUDIO_ID)
  108. }
  109. if fileName.isEmpty {
  110. fileName = message.getBody(key: CoreMessage_TMessageKey.VIDEO_ID)
  111. }
  112. if fileName.isEmpty {
  113. fileName = message.getBody(key: CoreMessage_TMessageKey.FILE_ID)
  114. }
  115. let isMedia = !fileName.isEmpty
  116. if isMedia {
  117. if (!message.getBody(key: CoreMessage_TMessageKey.THUMB_ID).isEmpty) {
  118. Network().upload(name: message.getBody(key: CoreMessage_TMessageKey.THUMB_ID)) { (result, progress) in
  119. if result, progress == 100 {
  120. do {
  121. let fileManager = FileManager.default
  122. let documentDir = try fileManager.url(for: .documentDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
  123. let fileDir = documentDir.appendingPathComponent(message.getBody(key: CoreMessage_TMessageKey.THUMB_ID))
  124. let path = fileDir.path
  125. if FileManager.default.fileExists(atPath: path) {
  126. let data = try Data(contentsOf: URL(fileURLWithPath: path))
  127. message.setMedia(media: [UInt8] (data))
  128. }
  129. } catch {}
  130. Network().upload(name: fileName) { (result, progress) in
  131. if result {
  132. if let delegate = Nexilis.shared.messageDelegate {
  133. delegate.onUpload(name: fileName, progress: progress)
  134. }
  135. if progress == 100 {
  136. if let response = Nexilis.writeSync(message: message) {
  137. print("sendChat", response.toLogString())
  138. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  139. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  140. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  141. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  142. ], _where: "message_id = '\(messageId)'")
  143. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  144. })
  145. } else {
  146. InquiryThread.default.addQueue(message: message)
  147. }
  148. }
  149. } else {
  150. InquiryThread.default.addQueue(message: message)
  151. }
  152. }
  153. }
  154. }
  155. } else {
  156. Network().upload(name: fileName) { (result, progress) in
  157. if result {
  158. if let delegate = Nexilis.shared.messageDelegate {
  159. delegate.onUpload(name: fileName, progress: progress)
  160. }
  161. if progress == 100 {
  162. if let response = Nexilis.writeSync(message: message) {
  163. print("sendChat", response.toLogString())
  164. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  165. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  166. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  167. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  168. ], _where: "message_id = '\(messageId)'")
  169. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  170. })
  171. } else {
  172. InquiryThread.default.addQueue(message: message)
  173. }
  174. }
  175. } else {
  176. InquiryThread.default.addQueue(message: message)
  177. }
  178. }
  179. }
  180. } else {
  181. if let response = Nexilis.writeSync(message: message) {
  182. print("sendChat", response.toLogString())
  183. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  184. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  185. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  186. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  187. ], _where: "message_id = '\(messageId)'")
  188. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  189. })
  190. } else {
  191. InquiryThread.default.addQueue(message: message)
  192. }
  193. }
  194. }
  195. private func deleteMessage(message: TMessage) {
  196. let messageId = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  197. let type = message.getBody(key: CoreMessage_TMessageKey.DELETE_MESSAGE_FLAG)
  198. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  199. if type == "1" {
  200. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  201. "message_text" : "🚫 _This message was deleted_",
  202. "lock" : "1",
  203. "thumb_id" : "",
  204. "image_id" : "",
  205. "file_id" : "",
  206. "audio_id" : "",
  207. "video_id" : "",
  208. "reff_id" : "",
  209. "attachment_flag" : 0,
  210. "is_stared" : "0",
  211. "credential" : "0",
  212. "read_receipts" : "4"
  213. ], _where: "message_id = '\(messageId)'")
  214. if let package = Nexilis.write(message: message) {
  215. _ = Database.shared.updateRecord(fmdb: fmdb, table: "OUTGOING", cvalues: [
  216. "package" : package
  217. ], _where: "id = '\(messageId)'")
  218. }
  219. if let delegate = Nexilis.shared.messageDelegate {
  220. delegate.onMessage(message: message)
  221. }
  222. } else {
  223. _ = Database.shared.deleteRecord(fmdb: fmdb, table: "MESSAGE", _where: "message_id = '\(messageId)'")
  224. _ = Database.shared.deleteRecord(fmdb: fmdb, table: "MESSAGE_SUMMARY", _where: "message_id = '\(messageId)'")
  225. let l_pin = message.getBody(key: CoreMessage_TMessageKey.L_PIN)
  226. let chat = message.getBody(key: CoreMessage_TMessageKey.CHAT_ID)
  227. do {
  228. var pin = l_pin
  229. if !chat.isEmpty {
  230. pin = chat
  231. }
  232. let queryGetLastMessageId = "SELECT message_id FROM MESSAGE where opposite_pin = '\(pin)' OR l_pin = '\(pin)' order by server_date desc LIMIT 1"
  233. var messageId = ""
  234. if let cursorData = Database.shared.getRecords(fmdb: fmdb, query: queryGetLastMessageId), cursorData.next() {
  235. messageId = cursorData.string(forColumnIndex: 0) ?? ""
  236. cursorData.close()
  237. }
  238. if !messageId.isEmpty {
  239. _ = try Database.shared.insertRecord(fmdb: fmdb, table: "MESSAGE_SUMMARY", cvalues: [
  240. "l_pin" : pin,
  241. "message_id" : messageId,
  242. "counter" : 0
  243. ], replace: true)
  244. }
  245. } catch {
  246. rollback.pointee = true
  247. print(error)
  248. }
  249. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  250. }
  251. })
  252. }
  253. }