OutgoingThread.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. //
  2. // OutgoingThread.swift
  3. // Runner
  4. //
  5. // Created by Yayan Dwi on 20/04/20.
  6. // Copyright © 2020 The Chromium Authors. All rights reserved.
  7. //
  8. import Foundation
  9. import FMDB
  10. class OutgoingThread {
  11. static let `default` = OutgoingThread()
  12. private var isRunning = false
  13. private var semaphore = DispatchSemaphore(value: 0)
  14. private var connection = DispatchSemaphore(value: 0)
  15. private var dispatchQueue = DispatchQueue(label: "OutgoingThread")
  16. private var queue = [TMessage]()
  17. init() {
  18. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  19. do {
  20. if let cursor = Database.shared.getRecords(fmdb: fmdb, query: "select message, id from OUTGOING") {
  21. while cursor.next() {
  22. if let message = cursor.string(forColumnIndex: 0) {
  23. if let cursorMessage = Database.shared.getRecords(fmdb: fmdb, query: "select message_id from MESSAGE where message_id = '\(cursor.string(forColumnIndex: 1)!)'") {
  24. if cursorMessage.next() {
  25. addQueue(message: TMessage(data: message))
  26. } else {
  27. delOutgoing(fmdb: fmdb, messageId: cursor.string(forColumnIndex: 1)!)
  28. }
  29. cursorMessage.close()
  30. } else {
  31. delOutgoing(fmdb: fmdb, messageId: cursor.string(forColumnIndex: 1)!)
  32. }
  33. }
  34. }
  35. cursor.close()
  36. }
  37. } catch {
  38. rollback.pointee = true
  39. print("Access database error: \(error.localizedDescription)")
  40. }
  41. })
  42. }
  43. func addQueue(message: TMessage) {
  44. queue.append(message)
  45. semaphore.signal()
  46. addOugoing(message: message)
  47. }
  48. private func addQueue(_ message: TMessage, at: Int) {
  49. Thread.sleep(forTimeInterval: 1)
  50. queue.insert(message, at: at)
  51. semaphore.signal()
  52. }
  53. private func addOugoing(message: TMessage) {
  54. DispatchQueue.global().async {
  55. let messageId = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  56. if !messageId.isEmpty {
  57. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  58. do {
  59. _ = try Database.shared.insertRecord(fmdb: fmdb, table: "OUTGOING", cvalues: [
  60. "id" : messageId,
  61. "message" : message.pack(),
  62. ], replace: true)
  63. } catch {
  64. rollback.pointee = true
  65. print("Access database error: \(error.localizedDescription)")
  66. }
  67. })
  68. }
  69. }
  70. }
  71. private func delOutgoing(fmdb: Any, messageId: String) {
  72. _ = Database.shared.deleteRecord(fmdb: fmdb as! FMDatabase, table: "OUTGOING", _where: "id = '\(messageId)'")
  73. }
  74. func delOutgoing(fmdb: Any, packageId: String) {
  75. _ = Database.shared.deleteRecord(fmdb: fmdb as! FMDatabase, table: "OUTGOING", _where: "package = '\(packageId)'")
  76. }
  77. private var isWait = false
  78. func set(wait: Bool) {
  79. isWait = wait
  80. if !isWait {
  81. connection.signal()
  82. semaphore.signal()
  83. }
  84. }
  85. func getQueue() -> TMessage {
  86. while queue.isEmpty || queue.count == 0 {
  87. //print("QUEUE.wait")
  88. semaphore.wait()
  89. }
  90. return queue.remove(at: 0)
  91. }
  92. func run() {
  93. if (isRunning) {
  94. return
  95. }
  96. isRunning = true
  97. dispatchQueue.async {
  98. while self.isRunning {
  99. if self.isWait {
  100. //print("CONNECTION.wait")
  101. self.connection.wait()
  102. } else {
  103. self.process(message: self.getQueue())
  104. }
  105. }
  106. }
  107. }
  108. private func process(message: TMessage) {
  109. // print("outgoing process", message.toLogString())
  110. if self.isWait {
  111. queue.append(message)
  112. return
  113. }
  114. if message.getCode() == CoreMessage_TMessageCode.SEND_CHAT || message.getCode() == CoreMessage_TMessageCode.EDIT_MESSAGE {
  115. sendChat(message: message)
  116. } else if message.getCode() == CoreMessage_TMessageCode.DELETE_CTEXT {
  117. deleteMessage(message: message)
  118. }
  119. }
  120. /**
  121. *
  122. */
  123. private var maxRetryUpload: [String: Int] = [:]
  124. private var maxRetryUploadTime: [String: Int] = [:]
  125. private func sendChat(message: TMessage) {
  126. // if media exist upload first
  127. var fileName = message.getBody(key: CoreMessage_TMessageKey.IMAGE_ID, default_value: "")
  128. if fileName.isEmpty {
  129. fileName = message.getBody(key: CoreMessage_TMessageKey.AUDIO_ID)
  130. }
  131. if fileName.isEmpty {
  132. fileName = message.getBody(key: CoreMessage_TMessageKey.VIDEO_ID)
  133. }
  134. if fileName.isEmpty {
  135. fileName = message.getBody(key: CoreMessage_TMessageKey.FILE_ID)
  136. }
  137. let isMedia = !fileName.isEmpty
  138. if isMedia {
  139. if maxRetryUpload[message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)] == nil {
  140. maxRetryUpload[message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)] = 0
  141. maxRetryUploadTime[message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)] = Date().currentTimeMillis()
  142. }
  143. if (!message.getBody(key: CoreMessage_TMessageKey.THUMB_ID).isEmpty) {
  144. Network().uploadHTTP(name: message.getBody(key: CoreMessage_TMessageKey.THUMB_ID)) { (result, progress, response) in
  145. if result, progress == 100 {
  146. do {
  147. let fileManager = FileManager.default
  148. let documentDir = try fileManager.url(for: .documentDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
  149. let fileDir = documentDir.appendingPathComponent(message.getBody(key: CoreMessage_TMessageKey.THUMB_ID))
  150. let path = fileDir.path
  151. if FileManager.default.fileExists(atPath: path) {
  152. let data = try Data(contentsOf: URL(fileURLWithPath: path))
  153. message.setMedia(media: [UInt8] (data))
  154. }
  155. } catch {}
  156. Network().uploadHTTP(name: fileName) { (result, progress, response) in
  157. if result {
  158. if let delegate = Nexilis.shared.messageDelegate {
  159. delegate.onUpload(name: fileName, progress: progress)
  160. }
  161. if progress == 100 {
  162. if let response = Nexilis.writeSync(message: message) {
  163. //print("sendChat", response.toLogString())
  164. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  165. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  166. do {
  167. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  168. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  169. ], _where: "message_id = '\(messageId)'")
  170. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  171. } catch {
  172. rollback.pointee = true
  173. print("Access database error: \(error.localizedDescription)")
  174. }
  175. })
  176. do {
  177. try FileEncryption.shared.writeSecure(filename: fileName)
  178. } catch {
  179. }
  180. } else {
  181. self.retryUpload(message: message, fileName: fileName)
  182. }
  183. }
  184. } else {
  185. self.retryUpload(message: message, fileName: fileName)
  186. }
  187. }
  188. } else {
  189. self.retryUpload(message: message, fileName: fileName)
  190. }
  191. }
  192. } else {
  193. Network().uploadHTTP(name: fileName) { (result, progress, response) in
  194. if result {
  195. if let delegate = Nexilis.shared.messageDelegate {
  196. delegate.onUpload(name: fileName, progress: progress)
  197. }
  198. if progress == 100 {
  199. if let response = Nexilis.writeSync(message: message) {
  200. //print("sendChat", response.toLogString())
  201. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  202. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  203. do {
  204. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  205. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  206. ], _where: "message_id = '\(messageId)'")
  207. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  208. } catch {
  209. rollback.pointee = true
  210. print("Access database error: \(error.localizedDescription)")
  211. }
  212. })
  213. do{
  214. try FileEncryption.shared.writeSecure(filename: fileName)
  215. } catch {
  216. }
  217. } else {
  218. self.retryUpload(message: message, fileName: fileName)
  219. }
  220. }
  221. } else {
  222. self.retryUpload(message: message, fileName: fileName)
  223. }
  224. }
  225. }
  226. } else {
  227. if let response = Nexilis.writeSync(message: message) {
  228. //print("sendChat", response.toLogString())
  229. let messageId = response.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  230. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  231. do {
  232. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  233. "status" : response.getBody(key: CoreMessage_TMessageKey.STATUS, default_value: "2")
  234. ], _where: "message_id = '\(messageId)'")
  235. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  236. } catch {
  237. rollback.pointee = true
  238. print("Access database error: \(error.localizedDescription)")
  239. }
  240. })
  241. } else {
  242. InquiryThread.default.addQueue(message: message)
  243. }
  244. }
  245. }
  246. private func retryUpload(message: TMessage, fileName: String) {
  247. //print("masuk Retry")
  248. var maxRetry = Utils.getMaxRetryUpload()
  249. var maxRetryTime = Utils.getMaxRetryTimeUpload()
  250. if maxRetry == "0" {
  251. maxRetry = "5"
  252. }
  253. if maxRetryTime == "0" {
  254. maxRetryTime = "60000"
  255. }
  256. var countRetry = 0
  257. do {
  258. countRetry = maxRetryUpload[message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)]!
  259. countRetry += 1
  260. //print("masuk Retry1 = \(countRetry)")
  261. maxRetryUpload.updateValue(countRetry, forKey: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  262. } catch {}
  263. if countRetry >= Int(maxRetry)! {
  264. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  265. do {
  266. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  267. "status" : "0"
  268. ], _where: "message_id = '\(message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))'")
  269. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE_STATUS", cvalues: [
  270. "status" : "0"
  271. ], _where: "message_id = '\(message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))'")
  272. self.delOutgoing(fmdb: fmdb, messageId: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  273. } catch {
  274. rollback.pointee = true
  275. print("Access database error: \(error.localizedDescription)")
  276. }
  277. })
  278. var dataMessage: [AnyHashable : Any] = [:]
  279. dataMessage["message_id"] = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  280. dataMessage["status"] = "0"
  281. NotificationCenter.default.post(name: NSNotification.Name(rawValue: Nexilis.failedSendMessage), object: nil, userInfo: dataMessage)
  282. maxRetryUpload.removeValue(forKey: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  283. maxRetryUploadTime.removeValue(forKey: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  284. if let delegate = Nexilis.shared.messageDelegate {
  285. delegate.onUpload(name: fileName, progress: 100)
  286. }
  287. } else {
  288. DispatchQueue.global().asyncAfter(deadline: .now() + 5, execute: { [self] in
  289. let timeRetry = maxRetryUploadTime[message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)] ?? 0
  290. if timeRetry != 0 {
  291. if (timeRetry + Int(maxRetryTime)!) <= Date().currentTimeMillis() {
  292. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  293. do {
  294. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  295. "status" : "0"
  296. ], _where: "message_id = '\(message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))'")
  297. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE_STATUS", cvalues: [
  298. "status" : "0"
  299. ], _where: "message_id = '\(message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))'")
  300. self.delOutgoing(fmdb: fmdb, messageId: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  301. } catch {
  302. rollback.pointee = true
  303. print("Access database error: \(error.localizedDescription)")
  304. }
  305. })
  306. var dataMessage: [AnyHashable : Any] = [:]
  307. dataMessage["message_id"] = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  308. dataMessage["status"] = "0"
  309. NotificationCenter.default.post(name: NSNotification.Name(rawValue: Nexilis.failedSendMessage), object: nil, userInfo: dataMessage)
  310. maxRetryUpload.removeValue(forKey: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  311. maxRetryUploadTime.removeValue(forKey: message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID))
  312. if let delegate = Nexilis.shared.messageDelegate {
  313. delegate.onUpload(name: fileName, progress: 100)
  314. }
  315. return
  316. }
  317. }
  318. //print("retry sukses")
  319. sendChat(message: message)
  320. })
  321. }
  322. }
  323. private func deleteMessage(message: TMessage) {
  324. let messageId = message.getBody(key: CoreMessage_TMessageKey.MESSAGE_ID)
  325. let type = message.getBody(key: CoreMessage_TMessageKey.DELETE_MESSAGE_FLAG)
  326. Database.shared.database?.inTransaction({ (fmdb, rollback) in
  327. do {
  328. if type == "1" {
  329. _ = Database.shared.updateRecord(fmdb: fmdb, table: "MESSAGE", cvalues: [
  330. "message_text" : "🚫 _This message was deleted_",
  331. "lock" : "1",
  332. "thumb_id" : "",
  333. "image_id" : "",
  334. "file_id" : "",
  335. "audio_id" : "",
  336. "video_id" : "",
  337. "reff_id" : "",
  338. "attachment_flag" : 0,
  339. "is_stared" : "0",
  340. "credential" : "0",
  341. "read_receipts" : "4"
  342. ], _where: "message_id = '\(messageId)'")
  343. if let package = Nexilis.write(message: message) {
  344. _ = Database.shared.updateRecord(fmdb: fmdb, table: "OUTGOING", cvalues: [
  345. "package" : package
  346. ], _where: "id = '\(messageId)'")
  347. }
  348. if let delegate = Nexilis.shared.messageDelegate {
  349. NotificationCenter.default.post(name: NSNotification.Name(rawValue: "reloadTabChats"), object: nil, userInfo: nil)
  350. delegate.onMessage(message: message)
  351. }
  352. } else {
  353. _ = Database.shared.deleteRecord(fmdb: fmdb, table: "MESSAGE", _where: "message_id = '\(messageId)'")
  354. _ = Database.shared.deleteRecord(fmdb: fmdb, table: "MESSAGE_SUMMARY", _where: "message_id = '\(messageId)'")
  355. let l_pin = message.getBody(key: CoreMessage_TMessageKey.L_PIN)
  356. let chat = message.getBody(key: CoreMessage_TMessageKey.CHAT_ID)
  357. let scope = message.getBody(key: CoreMessage_TMessageKey.SCOPE_ID)
  358. do {
  359. var pin = l_pin
  360. if !chat.isEmpty {
  361. pin = chat
  362. }
  363. var queryGetLastMessageId = "SELECT message_id FROM MESSAGE where f_pin = '\(pin)' OR l_pin = '\(pin)' order by server_date desc LIMIT 1"
  364. if scope == "4" {
  365. queryGetLastMessageId = "SELECT message_id FROM MESSAGE where l_pin = '\(pin)' AND chat_id = '' order by server_date desc LIMIT 1"
  366. if !chat.isEmpty {
  367. queryGetLastMessageId = "SELECT message_id FROM MESSAGE where chat_id = '\(pin)' order by server_date desc LIMIT 1"
  368. }
  369. }
  370. var messageId = ""
  371. if let cursorData = Database.shared.getRecords(fmdb: fmdb, query: queryGetLastMessageId), cursorData.next() {
  372. messageId = cursorData.string(forColumnIndex: 0) ?? ""
  373. cursorData.close()
  374. }
  375. if !messageId.isEmpty {
  376. _ = try Database.shared.insertRecord(fmdb: fmdb, table: "MESSAGE_SUMMARY", cvalues: [
  377. "l_pin" : pin,
  378. "message_id" : messageId,
  379. "counter" : 0
  380. ], replace: true)
  381. }
  382. NotificationCenter.default.post(name: NSNotification.Name(rawValue: "reloadTabChats"), object: nil, userInfo: nil)
  383. } catch {
  384. rollback.pointee = true
  385. //print(error)
  386. }
  387. self.delOutgoing(fmdb: fmdb, messageId: messageId)
  388. }
  389. } catch {
  390. rollback.pointee = true
  391. print("Access database error: \(error.localizedDescription)")
  392. }
  393. })
  394. }
  395. }