From ad10e666379999be6cced5c004b4495b00c94515 Mon Sep 17 00:00:00 2001 From: yuan_rh <545873205@qq.com> Date: Tue, 17 Aug 2021 11:08:45 +0800 Subject: [PATCH] fixbug --- handler/http_handler.go | 7 ++----- handler/websocket_handler.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/handler/http_handler.go b/handler/http_handler.go index 58b0bc8..3aee975 100644 --- a/handler/http_handler.go +++ b/handler/http_handler.go @@ -108,8 +108,6 @@ func Login(w http.ResponseWriter, r *http.Request) { TaskToDownloadOrUploadChanel = make(chan map[string] interface{}, 100000) - TaskToNodeBackupChanel = make(chan map[string] interface{}, 100000) - MessageToWebChanel = make(chan map[string] interface{}, 100000) //工作目录监控 @@ -259,6 +257,8 @@ func startFileWatchProcess(){ log.Println("chan-->"+("create"+";"+ev.Name)) //ch <- ("create"+";"+ev.Name) // 创建文件 不做任何动作 + dir := filepath.Dir(ev.Name) + env.GobalFileWatch.Add(dir); updateFileObjectTaskIsModify(ev.Name) } } @@ -320,9 +320,6 @@ func Logout(w http.ResponseWriter, r *http.Request) { if TaskToWebChanel!=nil{ close(TaskToWebChanel) } - if TaskToNodeBackupChanel!=nil{ - close(TaskToNodeBackupChanel) - } if MessageToWebChanel!=nil{ close(MessageToWebChanel) } diff --git a/handler/websocket_handler.go b/handler/websocket_handler.go index 10c240b..d7b5eed 100644 --- a/handler/websocket_handler.go +++ b/handler/websocket_handler.go @@ -31,8 +31,6 @@ var TaskToWebChanel chan map[string] interface{} var TaskToDownloadOrUploadChanel chan map[string] interface{} -var TaskToNodeBackupChanel chan map[string] interface{} - var MessageToWebChanel chan map[string] interface{} type ProjLockingMsg struct { @@ -133,8 +131,7 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){ //任务同步信道 for v := range TaskToWebChanel { - var value = make(map[string] interface{}) - value = v + value := CopyMap(v) data,err := json.Marshal(value) if err!=nil{ log.Println(err) @@ -151,8 +148,10 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){ //任务对象字符串转map对象 func taskStringToMap(task string)(data map[string] interface{}, err error){ + err = json.Unmarshal([]byte(task),&data) if err != nil{ + log.Println(err) return data,err } return data,err @@ -163,13 +162,14 @@ func syncDownloadOrUpload(){ go func() { log.Print("开启异步处理下载、上传队列...") for v := range TaskToDownloadOrUploadChanel { + my := CopyMap(v) //延迟2秒进入下载 time.Sleep(2*time.Second) - if v[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{ - downLoadTask(v) + if my[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{ + downLoadTask(my) continue } - upLoadTask(v) + upLoadTask(my) } }() } @@ -308,7 +308,10 @@ func upLoadTask(task map[string] interface{}){ //加入ipfs盒子节点备份队列 task[consts.TASK_IPFS_API]= env.IpfsApi - insertFileNodeBackupTask(task) + err = insertFileNodeBackupTask(task) + if err!=nil{ + log.Println(err) + } //更新远程服务中心文件对象Hash值和历史记录 var id string @@ -419,24 +422,33 @@ func syncUploadToBackUpNode(){ } +func CopyMap(source map[string] interface{} ) map[string] interface{}{ + + target := make(map[string] interface{}) + for k, v := range source { + target[k]=v + } + return target +} + //新增文件节点备份任务 func insertFileNodeBackupTask(task map[string] interface{}) error{ + myTask := CopyMap(task) //key=userId:TASK_SYNC_STATUS_WAIT:taskId - archiveByte,err := json.Marshal(task) + archiveByte,err := json.Marshal(myTask) if err !=nil{ + log.Println(err) return err } // key -> /userid/TASK_SYNC/417367746536689664 - key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,task[consts.TASK_ID].(string)) + key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,myTask[consts.TASK_ID].(string)) err = db.ReplaceInto(key,string(archiveByte)) if err !=nil{ log.Println(err) return err } - //添加到备份队列 - TaskToNodeBackupChanel <- task return nil }