易云轻量版服务端
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

websocket_handler.go 20 KiB

3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846
  1. package handler
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogo/protobuf/sortkeys"
  7. "github.com/gorilla/websocket"
  8. ipfs "github.com/ipfs/go-ipfs-api"
  9. "github.com/pkg/errors"
  10. "io"
  11. "locking-kit-server/consts"
  12. "locking-kit-server/db"
  13. "locking-kit-server/env"
  14. "log"
  15. "os"
  16. "os/exec"
  17. "path/filepath"
  18. "strconv"
  19. "strings"
  20. "time"
  21. )
  22. /**
  23. * @author yuanrh
  24. * @description websocket服务核心逻辑
  25. * @date 2021/6/28 11:11
  26. **/
  27. var TaskToWebChanel chan map[string] interface{}
  28. var TaskToDownloadOrUploadChanel chan map[string] interface{}
  29. var MessageToWebChanel chan map[string] interface{}
  30. type ProjLockingMsg struct {
  31. Id int64 ` description:"主键ID" json:"Id,string"`
  32. UserId int64 ` description:"用户ID" json:"UserId,string"`
  33. Title string `description:"消息标题"`
  34. Body string `description:"消息主体"`
  35. Status int8 `description:"消息状态 0:未读,1:已读"`
  36. Type int8 `description:"消息类型 "`
  37. Parameter string `description:"参数"`
  38. UnixTime int64 `description:"unix时间戳" json:"UnixTime,string"`
  39. CreateUserId int64 `description:"创建者" json:"CreateUserId,string"`
  40. CreateTime time.Time `description:"创建时间"`
  41. ModifyUserId int64 `description:"修改者" json:"ModifyUserId,string"`
  42. ModifyTime time.Time `description:"修改时间"`
  43. }
  44. //监听任务同步情况
  45. func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){
  46. if env.LoginStatus == 0{
  47. return nil
  48. }
  49. //获取全部任务,包括 同步中、已同步、同步异常
  50. prefix := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC)
  51. task,err := db.QueryWithPrefix(prefix)
  52. if err != nil{
  53. log.Printf("监听同步任务失败,%v", err)
  54. return err
  55. }
  56. var taskIds []int64
  57. //排序
  58. for k, _ := range task {
  59. taskId,err :=strconv.ParseInt(strings.Split(k, "/")[3], 10, 64)
  60. if err!=nil{
  61. log.Println(err)
  62. }
  63. taskIds = append(taskIds, taskId)
  64. }
  65. sortkeys.Int64s(taskIds)
  66. for _,taskId := range taskIds {
  67. key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId)
  68. data,err := taskStringToMap(task[key])
  69. if err !=nil{
  70. log.Println(err)
  71. continue
  72. }
  73. TaskToWebChanel <- data
  74. }
  75. //加入下载队列
  76. for index,taskId := range taskIds {
  77. key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId)
  78. data,err := taskStringToMap(task[key])
  79. if err !=nil{
  80. log.Println(err)
  81. continue
  82. }
  83. //解析开机下载任务
  84. //优先下载中
  85. if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_ING {
  86. TaskToDownloadOrUploadChanel <- data
  87. continue
  88. }
  89. if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_WAIT {
  90. TaskToDownloadOrUploadChanel <- data
  91. }
  92. //
  93. if len(taskIds)>1000 && index<(len(taskIds)-1001){
  94. err = db.DeleteWithPrefix(key)
  95. log.Println(err)
  96. }
  97. }
  98. //备份
  99. go func() {
  100. for true {
  101. syncUploadToBackUpNode()
  102. time.Sleep(2 * time.Minute)
  103. }
  104. }()
  105. //开启异步下载/上传任务
  106. syncDownloadOrUpload()
  107. //任务同步信道
  108. for v := range TaskToWebChanel {
  109. value := CopyMap(v)
  110. data,err := json.Marshal(value)
  111. if err!=nil{
  112. log.Println(err)
  113. }
  114. if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
  115. log.Printf("监听同步任务失败,%v", err)
  116. return err
  117. }
  118. }
  119. return nil
  120. }
  121. //任务对象字符串转map对象
  122. func taskStringToMap(task string)(data map[string] interface{}, err error){
  123. err = json.Unmarshal([]byte(task),&data)
  124. if err != nil{
  125. log.Println(err)
  126. return data,err
  127. }
  128. return data,err
  129. }
  130. //处理下载、上传队列
  131. func syncDownloadOrUpload(){
  132. go func() {
  133. log.Print("开启异步处理下载、上传队列...")
  134. for v := range TaskToDownloadOrUploadChanel {
  135. my := CopyMap(v)
  136. //延迟2秒进入下载
  137. time.Sleep(2*time.Second)
  138. if my[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{
  139. downLoadTask(my)
  140. continue
  141. }
  142. upLoadTask(my)
  143. }
  144. }()
  145. }
  146. //执行单个上传任务
  147. func upLoadTask(task map[string] interface{}){
  148. remoteIpfsApi := ipfs.NewShell(env.IpfsApi)
  149. isUp := remoteIpfsApi.IsUp()
  150. if !isUp {
  151. log.Println("盒子节点网络连接不通!")
  152. //TaskToDownloadOrUploadChanel <- task
  153. //return
  154. }
  155. //上传启动标识,有上传进度则设置为true
  156. var uploading bool=false
  157. //cmd ipfs get
  158. cmd := exec.Command(env.IpfsPath, "add", task[consts.TASK_ABSOLUTE_PATH].(string))
  159. uploadProgress := make(chan string,10000)
  160. var stdout, stderr []byte
  161. var errStdout, errStderr error
  162. stdoutIn, _ := cmd.StdoutPipe()
  163. stderrIn, _ := cmd.StderrPipe()
  164. cmd.Start()
  165. go func() {
  166. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  167. }()
  168. go func() {
  169. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  170. }()
  171. //上传
  172. updateTaskProgress(task, 0)
  173. //异步给前端反馈上传进度
  174. go func(){
  175. first := true
  176. //millSeconds := time.Now().UnixNano() / 1e6
  177. //current :=time.Now().UnixNano() / 1e6
  178. for content := range uploadProgress { // 通道关闭后会退出for range循环
  179. //current =time.Now().UnixNano() / 1e6
  180. if first {
  181. pro,err := contentToJSONByte(content)
  182. if err!=nil{
  183. log.Print(err)
  184. continue
  185. }
  186. if pro.Process == 0{
  187. continue
  188. }
  189. //设置上传启动标识为true
  190. updateTaskProgress(task, pro.Process)
  191. uploading=true
  192. //millSeconds = current
  193. first=false
  194. }
  195. //if current-millSeconds>500{
  196. pro,err := contentToJSONByte(content)
  197. if err!=nil{
  198. log.Print(err)
  199. continue
  200. }
  201. if pro.Process == 0{
  202. continue
  203. }
  204. uploading=true
  205. updateTaskProgress(task, pro.Process)
  206. //millSeconds = current
  207. //}
  208. }
  209. }()
  210. log.Println("资源上传中...")
  211. //
  212. //设置30秒连接超时,30秒未启动上传则上传失败
  213. go func() {
  214. index :=0
  215. for true{
  216. if uploading==true{
  217. return
  218. }
  219. index++
  220. time.Sleep(time.Duration(1)*time.Second)
  221. if uploading==false && index==30{
  222. log.Println("资源连接超时(30s),上传进程被终止")
  223. err := cmd.Process.Kill()
  224. if err!=nil{
  225. log.Println(err)
  226. }
  227. updateTaskStatusToFail(task)
  228. return
  229. }
  230. if index>30{
  231. return
  232. }
  233. }
  234. }()
  235. //等待执行完成
  236. err := cmd.Wait()
  237. log.Println("add 执行完成")
  238. if err != nil {
  239. log.Println("cmd.Run() failed with %s\n", err)
  240. updateTaskStatusToFail(task)
  241. return
  242. }
  243. if errStdout != nil || errStderr != nil {
  244. log.Println("failed to capture stdout or stderr\n")
  245. updateTaskStatusToFail(task)
  246. return
  247. }
  248. outStr := string(stdout)
  249. fileHash := strings.Split(outStr," ")[1]
  250. task["IpfsCid"] = fileHash
  251. log.Printf("out:%s", outStr)
  252. defer close(uploadProgress)
  253. //ipfs provide
  254. cmd = exec.Command(env.IpfsPath,"dht","provide",fileHash)
  255. err = cmd.Run()
  256. if err != nil {
  257. log.Println(err)
  258. return
  259. }
  260. //加入ipfs盒子节点备份队列
  261. task[consts.TASK_IPFS_API]= env.IpfsApi
  262. err = insertFileNodeBackupTask(task)
  263. if err!=nil{
  264. log.Println(err)
  265. }
  266. //更新远程服务中心文件对象Hash值和历史记录
  267. var id string
  268. var projId string
  269. var folderId string
  270. var modifyUserId string
  271. var fileSzie string
  272. log.Println(task)
  273. if task["Id"]!=nil {
  274. id = task["Id"].(string)
  275. }
  276. projId = task["ProjId"].(string)
  277. folderId = task["FolderId"].(string)
  278. modifyUserId = task["ModifyUserId"].(string)
  279. fileSzie = strconv.FormatInt(interfaceToInt64(task["FileSize"]),10)
  280. returnData, err := replaceIntoArchiveById(id, projId, folderId, task["ArchName"].(string), task["Extension"].(string), task["IpfsCid"].(string), fileSzie, task["RelativePath"].(string), modifyUserId, interfaceToInt(task["Version"]))
  281. if err !=nil{
  282. log.Printf("replaceIntoArchiveById err %v", err)
  283. updateTaskStatusToFail(task)
  284. if err.Error()=="版本冲突"{
  285. sendConflictNotifyToWeb(2, task)
  286. }
  287. return
  288. }
  289. if returnData!=nil{
  290. //远程对象重置
  291. remoteTask := returnData
  292. task["ModifyName"] = remoteTask["ModifyName"].(string)
  293. //更新工作空间文件对象
  294. replceIntoWorkSpaceFileObject(task[consts.TASK_ABSOLUTE_PATH].(string),remoteTask, false)
  295. }
  296. //完成上传
  297. updateTaskProgress(task, 100.00)
  298. log.Printf("叮,资源文件[ %v ]上传完成",task[consts.TASK_ABSOLUTE_PATH].(string))
  299. }
  300. //异步处理备份节点
  301. func syncUploadToBackUpNode(){
  302. key := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK)
  303. records,err := db.QueryWithPrefix(key)
  304. if err != nil{
  305. log.Println()
  306. }
  307. if records==nil || len(records)==0{
  308. return
  309. }
  310. for k, v := range records {
  311. var task = make(map[string] interface{})
  312. err := json.Unmarshal([]byte(v),&task)
  313. if err !=nil{
  314. log.Println(err)
  315. }
  316. if task[consts.TASK_IPFS_API]==""{
  317. log.Println("ipfs api 未配置")
  318. }
  319. //备份
  320. remoteIpfsApi := ipfs.NewShell(env.IpfsApi)
  321. isUp := remoteIpfsApi.IsUp()
  322. if !isUp {
  323. log.Println("盒子节点网络连接不通!")
  324. continue
  325. }
  326. //判断备份节点对等节点是否已添加连接
  327. localsh := ipfs.NewShell("localhost:5001")
  328. idOut,err :=localsh.ID()
  329. localId :=idOut.ID
  330. swarmConnInfos,err :=remoteIpfsApi.SwarmPeers(context.Background())
  331. hasConnect := false
  332. for _,swarmconn := range swarmConnInfos.Peers {
  333. if swarmconn.Peer==localId{
  334. hasConnect=true
  335. break
  336. }
  337. }
  338. if !hasConnect{
  339. log.Println("中继处理")
  340. swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
  341. errT := remoteIpfsApi.SwarmConnect(context.Background(),swarmConnectAddr)
  342. if errT!=nil{
  343. log.Printf("中继失败,引导节点备份失败 %v", err)
  344. }
  345. }
  346. err = remoteIpfsApi.Pin(task["IpfsCid"].(string))
  347. if err != nil{
  348. log.Println(err)
  349. }
  350. log.Printf("文件对象 %v 备份成功", v)
  351. //移除
  352. err = db.DeleteWithPrefix(k)
  353. if err != nil{
  354. log.Println(err)
  355. }
  356. }
  357. }
  358. func CopyMap(source map[string] interface{} ) map[string] interface{}{
  359. target := make(map[string] interface{})
  360. for k, v := range source {
  361. target[k]=v
  362. }
  363. return target
  364. }
  365. //新增文件节点备份任务
  366. func insertFileNodeBackupTask(task map[string] interface{}) error{
  367. myTask := CopyMap(task)
  368. //key=userId:TASK_SYNC_STATUS_WAIT:taskId
  369. archiveByte,err := json.Marshal(myTask)
  370. if err !=nil{
  371. log.Println(err)
  372. return err
  373. }
  374. // key -> /userid/TASK_SYNC/417367746536689664
  375. key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,myTask[consts.TASK_ID].(string))
  376. err = db.ReplaceInto(key,string(archiveByte))
  377. if err !=nil{
  378. log.Println(err)
  379. return err
  380. }
  381. return nil
  382. }
  383. //执行单个下载任务
  384. func downLoadTask(task map[string] interface{}){
  385. //检查文件目录是否存在,不存在则创建
  386. directory := fmt.Sprintf("%v%v%v%v%v%v%v%v%v",env.WorkSpace,string(os.PathSeparator),env.CurrentUserPhone,string(os.PathSeparator),task["ProjName"],string(os.PathSeparator),task["NodeName"],string(os.PathSeparator),task["RelativePath"])
  387. _,err := os.Stat(directory)
  388. if err != nil {
  389. //创建文件目录
  390. err = os.MkdirAll(directory, os.ModePerm)
  391. if err!=nil{
  392. log.Println(err)
  393. updateTaskStatusToFail(task)
  394. return
  395. }
  396. }
  397. //判断文件是否冲突(本地文件存在,且已修改未提交)
  398. filePath := filepath.Clean(fmt.Sprintf("%v%v%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string),task["Extension"].(string)))
  399. orginFilePath := filePath
  400. conflict,err := isConflict(orginFilePath)
  401. if err!=nil{
  402. log.Println(err)
  403. updateTaskStatusToFail(task)
  404. return
  405. }
  406. log.Printf("文件冲突:%v", conflict)
  407. if conflict {
  408. //文件名_冲突文件_作者_时间戳
  409. modifyUnix := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(task["ModifyTime"].(string),"-",""),":",""),"+0800",""),"T","")
  410. filePath = filepath.Clean(fmt.Sprintf("%v%v%v_冲突文件_%v_%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string), task["ModifyName"].(string), modifyUnix, task["Extension"].(string)))
  411. }
  412. //临时文件命名.locking
  413. tempFilePath :=filePath+".locking"
  414. //下载启动标识,有下载进度则设置为true
  415. var downloading bool = false
  416. //cmd ipfs get
  417. progress := make(chan string,10000)
  418. var stdout, stderr []byte
  419. var errStdout, errStderr error
  420. cmd := exec.Command(env.IpfsPath,"get", task["IpfsCid"].(string),"-o", tempFilePath)
  421. stdoutIn, _ := cmd.StdoutPipe()
  422. stderrIn, _ := cmd.StderrPipe()
  423. cmd.Start()
  424. go func() {
  425. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  426. }()
  427. go func() {
  428. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  429. }()
  430. //开始下载
  431. updateTaskProgress(task, 0)
  432. //异步读取进度
  433. //异步定时读取进度反馈给前端,每500ms返回一次进度
  434. go func(){
  435. //millSeconds := time.Now().UnixNano() / 1e6
  436. for content := range progress { // 通道关闭后会退出for range循环
  437. //current :=time.Now().UnixNano() / 1e6
  438. if !downloading{
  439. log.Printf("%v.%v %v 资源连接成功,下载中...",task["ArchName"],task["Extension"], task["IpfsCid"])
  440. }
  441. downloading = true
  442. //current-millSeconds>500 &&
  443. if strings.Index(content,"100.00%")==-1{
  444. pro,err := contentToJSONByte(content)
  445. if err!=nil{
  446. log.Print(err)
  447. continue
  448. }
  449. if pro.Process == 0{
  450. pro.Process = 1
  451. }
  452. //设置下载启动标识和下载时间戳
  453. //millSeconds = current
  454. //更新进度、状态
  455. updateTaskProgress(task, pro.Process)
  456. break
  457. }
  458. }
  459. }()
  460. //设置30秒连接超时,30秒未启动下载则下载失败
  461. go func() {
  462. index :=0
  463. for true{
  464. //启动下载则不做超时判断
  465. if downloading==true{
  466. return
  467. }
  468. index++
  469. time.Sleep(time.Duration(1)*time.Second)
  470. if downloading==false && index==30{
  471. err = cmd.Process.Kill()
  472. log.Println("资源连接超时(30s),下载进程被终止")
  473. updateTaskStatusToFail(task)
  474. return
  475. }
  476. if index>30{
  477. return
  478. }
  479. }
  480. }()
  481. //等待下载执行完成
  482. err = cmd.Wait()
  483. if err != nil {
  484. updateTaskStatusToFail(task)
  485. log.Printf("cmd.Run() failed with %s\n", err)
  486. return
  487. }
  488. if errStdout != nil || errStderr != nil {
  489. updateTaskStatusToFail(task)
  490. log.Printf("failed to capture stdout or stderr\n")
  491. return
  492. }
  493. outStr := string(stdout)
  494. log.Printf("下载成功,:%s", outStr)
  495. //临时文件重命名
  496. err = os.Rename(tempFilePath, filePath)
  497. if err!=nil{
  498. log.Println(err)
  499. }
  500. time.Sleep(1*time.Second)
  501. //完成下载
  502. task[consts.TASK_ABSOLUTE_PATH] = orginFilePath
  503. updateTaskProgress(task, 100.00)
  504. //更新工作空间文件对象
  505. replceIntoWorkSpaceFileObject(orginFilePath,task, conflict)
  506. //冲突发送,通知
  507. if conflict {
  508. sendConflictNotifyToWeb(3, task)
  509. }
  510. defer close(progress)
  511. }
  512. // type 2:上传冲突,3:下载冲突
  513. func sendConflictNotifyToWeb(msgType int64, param map[string] interface{}){
  514. var lockingMsg = make(map[string] interface{})
  515. lockingMsg["UserId"]=env.CurrentUserId
  516. lockingMsg["Title"]="文件冲突"
  517. lockingMsg["Body"]="文件冲突"
  518. lockingMsg["Status"]=0
  519. lockingMsg["Type"]= msgType
  520. lockingMsg["Parameter"]=param
  521. MessageToWebChanel <- lockingMsg
  522. }
  523. //在线通知
  524. func ListeningRemoteMessage(){
  525. log.Println("开启在线通知线程")
  526. for true {
  527. unixTime := strconv.FormatInt(time.Now().Unix(),10)
  528. time.Sleep(3*time.Minute)
  529. data,err := GetLockingMsgListByFilter(env.CurrentUserId,unixTime)
  530. if err != nil{
  531. log.Printf("消息通知失败! %v", err)
  532. continue
  533. }
  534. if data==nil{
  535. log.Println("未查询到通知消息")
  536. continue
  537. }
  538. //遍历消息
  539. for _, msg := range data {
  540. MessageToWebChanel <- msg.(map[string] interface{})
  541. }
  542. }
  543. }
  544. //是否存在文件冲突
  545. func isConflict(filePath string) (isConflict bool,err error){
  546. key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath)
  547. taskMap,err := db.QueryWithPrefix(key)
  548. if err != nil{
  549. log.Println(err)
  550. return false,err
  551. }
  552. //不存在则否
  553. taskString := taskMap[key]
  554. if len(taskString)==0{
  555. return false,err
  556. }
  557. var task map[string] interface{}
  558. err = json.Unmarshal([]byte(taskString),&task)
  559. if err != nil{
  560. log.Println(err)
  561. return false,err
  562. }
  563. return task[consts.TASK_IS_MODIFY].(bool),nil
  564. }
  565. //更新任务进度
  566. func updateTaskProgress(task map[string] interface{}, progress float64){
  567. task[consts.TASK_SYNC_PROGRESS]= progress
  568. if progress==100.00{
  569. task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_FINISH
  570. }else{
  571. task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_ING
  572. }
  573. taskByte,err := json.Marshal(task)
  574. if err !=nil{
  575. log.Printf("序列化失败 %v \n", err)
  576. }
  577. // key -> /userid/TASK_SYNC/417367746536689664
  578. key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
  579. err = db.ReplaceInto(key,string(taskByte))
  580. if err !=nil{
  581. log.Printf("etcd put 失败 %v \n", err)
  582. }
  583. TaskToWebChanel <- task
  584. }
  585. //更新任务状态为失败
  586. func updateTaskStatusToFail(task map[string] interface{}){
  587. task[consts.TASK_SYNC_STATUS] = consts.TASK_SYNC_STATUS_FAIL
  588. taskByte,err := json.Marshal(task)
  589. if err !=nil{
  590. log.Printf("序列化失败 %v \n", err)
  591. }
  592. // key -> /userid/TASK_SYNC/417367746536689664
  593. key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
  594. err = db.ReplaceInto(key,string(taskByte))
  595. if err !=nil{
  596. log.Printf("etcd put 失败 %v \n", err)
  597. }
  598. TaskToWebChanel <- task
  599. }
  600. func interfaceToInt64(id interface{}) int64{
  601. switch id.(type) {
  602. case float64:
  603. return int64(id.(float64))
  604. default:
  605. return id.(int64)
  606. }
  607. }
  608. func interfaceToInt(id interface{}) int{
  609. switch id.(type) {
  610. case float64:
  611. return int(id.(float64))
  612. default:
  613. return id.(int)
  614. }
  615. }
  616. /**
  617. 文件上传下载进度
  618. */
  619. type processStruct struct {
  620. Size string `json:"size"`
  621. CurrentSize string `json:"currentSize"`
  622. Unit string `json:"unit"`
  623. CurrentUnit string `json:"currentUnit"`
  624. Process float64 `json:"process"`
  625. Hash string `json:"hash"`
  626. CommitHistoryHash string `json:"commitHistoryHash"`
  627. Version int `json:"version"`
  628. }
  629. //更新工作空间文件对象
  630. func replceIntoWorkSpaceFileObject(filePath string,task map[string] interface{}, conflict bool){
  631. //冲突文件不做标记
  632. if strings.Index(filePath,"_冲突文件_")>-1{
  633. return
  634. }
  635. key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath)
  636. if !conflict {
  637. task[consts.TASK_IS_MODIFY] = false
  638. }else{
  639. task[consts.TASK_IS_MODIFY] = true
  640. }
  641. taskByte,err := json.Marshal(task)
  642. if err !=nil{
  643. log.Println(err)
  644. return
  645. }
  646. err = db.ReplaceInto(key, string(taskByte))
  647. if err !=nil{
  648. log.Println(err)
  649. }
  650. }
  651. //读取解析cmd返回文件上传或下载进度信息
  652. func contentToJSONByte(content string) (pro processStruct, err error){
  653. sts :=strings.Split(content," ")
  654. //log.Printf("content:%v,length:%v", content, len(sts))
  655. if len(sts)<8{
  656. log.Println("字符长度小于8")
  657. log.Println(content)
  658. return pro,errors.New("字符长度小于8")
  659. }
  660. //for i, v := range sts {
  661. // log.Printf("%v -> %v", i, v)
  662. //}
  663. var processFloat float64
  664. //var err error
  665. //if (len(sts)==9 || len(sts)==8){
  666. //processFloat,err =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  667. //}else if (len(sts)==10 || len(sts)==11){
  668. //processFloat,err =strconv.ParseFloat(strings.Replace(sts[9],"%","",1), 64)
  669. //}else{
  670. //processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64)
  671. //}
  672. if strings.Index(content,"m")>-1{
  673. processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64)
  674. }else{
  675. processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-1],"%","",1), 64)
  676. }
  677. if err !=nil{
  678. log.Printf("%v, length:%v, err:%v",content, len(sts), err)
  679. //for i, v := range sts {
  680. // log.Printf("%v -> %v", i, v)
  681. //}
  682. return pro,err
  683. }
  684. if processFloat==0{
  685. //log.Println("当前进度0")
  686. pro.Process = 0
  687. return pro,err
  688. }
  689. pro.Size = sts[4]
  690. pro.CurrentSize = sts[1]
  691. pro.Unit = sts[2]
  692. pro.CurrentUnit = sts[5]
  693. pro.Process = processFloat
  694. return pro,err
  695. }
  696. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  697. var out []byte
  698. buf := make([]byte, 1024, 1024)
  699. for {
  700. n, err := r.Read(buf[:])
  701. if n > 0 {
  702. d := buf[:n]
  703. out = append(out, d...)
  704. if strings.Index(string(d),"Saving")>-1{
  705. continue
  706. }
  707. progress <- string(d)
  708. }
  709. if err != nil {
  710. //log.Println(err)
  711. // Read returns io.EOF at the end of file, which is not an error for us
  712. if err == io.EOF {
  713. err = nil
  714. }
  715. return out, err
  716. }
  717. }
  718. // never reached
  719. panic(true)
  720. return nil, nil
  721. }