Browse Source

fixbug

master
yuan_rh 3 years ago
parent
commit
ad10e66637
2 changed files with 26 additions and 17 deletions
  1. +2
    -5
      handler/http_handler.go
  2. +24
    -12
      handler/websocket_handler.go

+ 2
- 5
handler/http_handler.go View File

@@ -108,8 +108,6 @@ func Login(w http.ResponseWriter, r *http.Request) {

TaskToDownloadOrUploadChanel = make(chan map[string] interface{}, 100000)

TaskToNodeBackupChanel = make(chan map[string] interface{}, 100000)

MessageToWebChanel = make(chan map[string] interface{}, 100000)

//工作目录监控
@@ -259,6 +257,8 @@ func startFileWatchProcess(){
log.Println("chan-->"+("create"+";"+ev.Name))
//ch <- ("create"+";"+ev.Name)
// 创建文件 不做任何动作
dir := filepath.Dir(ev.Name)
env.GobalFileWatch.Add(dir);
updateFileObjectTaskIsModify(ev.Name)
}
}
@@ -320,9 +320,6 @@ func Logout(w http.ResponseWriter, r *http.Request) {
if TaskToWebChanel!=nil{
close(TaskToWebChanel)
}
if TaskToNodeBackupChanel!=nil{
close(TaskToNodeBackupChanel)
}
if MessageToWebChanel!=nil{
close(MessageToWebChanel)
}


+ 24
- 12
handler/websocket_handler.go View File

@@ -31,8 +31,6 @@ var TaskToWebChanel chan map[string] interface{}

var TaskToDownloadOrUploadChanel chan map[string] interface{}

var TaskToNodeBackupChanel chan map[string] interface{}

var MessageToWebChanel chan map[string] interface{}

type ProjLockingMsg struct {
@@ -133,8 +131,7 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){
//任务同步信道
for v := range TaskToWebChanel {

var value = make(map[string] interface{})
value = v
value := CopyMap(v)
data,err := json.Marshal(value)
if err!=nil{
log.Println(err)
@@ -151,8 +148,10 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){

//任务对象字符串转map对象
func taskStringToMap(task string)(data map[string] interface{}, err error){

err = json.Unmarshal([]byte(task),&data)
if err != nil{
log.Println(err)
return data,err
}
return data,err
@@ -163,13 +162,14 @@ func syncDownloadOrUpload(){
go func() {
log.Print("开启异步处理下载、上传队列...")
for v := range TaskToDownloadOrUploadChanel {
my := CopyMap(v)
//延迟2秒进入下载
time.Sleep(2*time.Second)
if v[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{
downLoadTask(v)
if my[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{
downLoadTask(my)
continue
}
upLoadTask(v)
upLoadTask(my)
}
}()
}
@@ -308,7 +308,10 @@ func upLoadTask(task map[string] interface{}){

//加入ipfs盒子节点备份队列
task[consts.TASK_IPFS_API]= env.IpfsApi
insertFileNodeBackupTask(task)
err = insertFileNodeBackupTask(task)
if err!=nil{
log.Println(err)
}

//更新远程服务中心文件对象Hash值和历史记录
var id string
@@ -419,24 +422,33 @@ func syncUploadToBackUpNode(){

}

func CopyMap(source map[string] interface{} ) map[string] interface{}{

target := make(map[string] interface{})
for k, v := range source {
target[k]=v
}
return target
}

//新增文件节点备份任务
func insertFileNodeBackupTask(task map[string] interface{}) error{

myTask := CopyMap(task)
//key=userId:TASK_SYNC_STATUS_WAIT:taskId
archiveByte,err := json.Marshal(task)
archiveByte,err := json.Marshal(myTask)
if err !=nil{
log.Println(err)
return err
}
// key -> /userid/TASK_SYNC/417367746536689664
key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,task[consts.TASK_ID].(string))
key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,myTask[consts.TASK_ID].(string))
err = db.ReplaceInto(key,string(archiveByte))
if err !=nil{
log.Println(err)
return err
}

//添加到备份队列
TaskToNodeBackupChanel <- task
return nil
}



Loading…
Cancel
Save