package handler import ( "context" "encoding/json" "fmt" "github.com/gogo/protobuf/sortkeys" "github.com/gorilla/websocket" ipfs "github.com/ipfs/go-ipfs-api" "github.com/pkg/errors" "io" "locking-kit-server/consts" "locking-kit-server/db" "locking-kit-server/env" "log" "os" "os/exec" "path/filepath" "strconv" "strings" "time" ) /** * @author yuanrh * @description websocket服务核心逻辑 * @date 2021/6/28 11:11 **/ var TaskToWebChanel chan map[string] interface{} var TaskToDownloadOrUploadChanel chan map[string] interface{} var MessageToWebChanel chan map[string] interface{} type ProjLockingMsg struct { Id int64 ` description:"主键ID" json:"Id,string"` UserId int64 ` description:"用户ID" json:"UserId,string"` Title string `description:"消息标题"` Body string `description:"消息主体"` Status int8 `description:"消息状态 0:未读,1:已读"` Type int8 `description:"消息类型 "` Parameter string `description:"参数"` UnixTime int64 `description:"unix时间戳" json:"UnixTime,string"` CreateUserId int64 `description:"创建者" json:"CreateUserId,string"` CreateTime time.Time `description:"创建时间"` ModifyUserId int64 `description:"修改者" json:"ModifyUserId,string"` ModifyTime time.Time `description:"修改时间"` } //监听任务同步情况 func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){ if env.LoginStatus == 0{ return nil } //获取全部任务,包括 同步中、已同步、同步异常 prefix := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC) task,err := db.QueryWithPrefix(prefix) if err != nil{ log.Printf("监听同步任务失败,%v", err) return err } var taskIds []int64 //排序 for k, _ := range task { taskId,err :=strconv.ParseInt(strings.Split(k, "/")[3], 10, 64) if err!=nil{ log.Println(err) } taskIds = append(taskIds, taskId) } sortkeys.Int64s(taskIds) for _,taskId := range taskIds { key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId) data,err := taskStringToMap(task[key]) if err !=nil{ log.Println(err) continue } TaskToWebChanel <- data } //加入下载队列 for index,taskId := range taskIds { key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId) data,err := taskStringToMap(task[key]) if err !=nil{ log.Println(err) continue } //解析开机下载任务 //优先下载中 if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_ING { TaskToDownloadOrUploadChanel <- data continue } if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_WAIT { TaskToDownloadOrUploadChanel <- data } // if len(taskIds)>1000 && index<(len(taskIds)-1001){ err = db.DeleteWithPrefix(key) log.Println(err) } } //备份 go func() { for true { syncUploadToBackUpNode() time.Sleep(2 * time.Minute) } }() //开启异步下载/上传任务 syncDownloadOrUpload() //任务同步信道 for v := range TaskToWebChanel { value := CopyMap(v) data,err := json.Marshal(value) if err!=nil{ log.Println(err) } if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { log.Printf("监听同步任务失败,%v", err) return err } } return nil } //任务对象字符串转map对象 func taskStringToMap(task string)(data map[string] interface{}, err error){ err = json.Unmarshal([]byte(task),&data) if err != nil{ log.Println(err) return data,err } return data,err } //处理下载、上传队列 func syncDownloadOrUpload(){ go func() { log.Print("开启异步处理下载、上传队列...") for v := range TaskToDownloadOrUploadChanel { my := CopyMap(v) //延迟2秒进入下载 time.Sleep(2*time.Second) if my[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{ downLoadTask(my) continue } upLoadTask(my) } }() } //执行单个上传任务 func upLoadTask(task map[string] interface{}){ remoteIpfsApi := ipfs.NewShell(env.IpfsApi) isUp := remoteIpfsApi.IsUp() if !isUp { log.Println("盒子节点网络连接不通!") //TaskToDownloadOrUploadChanel <- task //return } //上传启动标识,有上传进度则设置为true var uploading bool=false //cmd ipfs get cmd := exec.Command(env.IpfsPath, "add", task[consts.TASK_ABSOLUTE_PATH].(string)) uploadProgress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress) }() //上传 updateTaskProgress(task, 0) //异步给前端反馈上传进度 go func(){ first := true //millSeconds := time.Now().UnixNano() / 1e6 //current :=time.Now().UnixNano() / 1e6 for content := range uploadProgress { // 通道关闭后会退出for range循环 //current =time.Now().UnixNano() / 1e6 if first { pro,err := contentToJSONByte(content) if err!=nil{ log.Print(err) continue } if pro.Process == 0{ continue } //设置上传启动标识为true updateTaskProgress(task, pro.Process) uploading=true //millSeconds = current first=false } //if current-millSeconds>500{ pro,err := contentToJSONByte(content) if err!=nil{ log.Print(err) continue } if pro.Process == 0{ continue } uploading=true updateTaskProgress(task, pro.Process) //millSeconds = current //} } }() log.Println("资源上传中...") // //设置30秒连接超时,30秒未启动上传则上传失败 go func() { index :=0 for true{ if uploading==true{ return } index++ time.Sleep(time.Duration(1)*time.Second) if uploading==false && index==30{ log.Println("资源连接超时(30s),上传进程被终止") err := cmd.Process.Kill() if err!=nil{ log.Println(err) } updateTaskStatusToFail(task) return } if index>30{ return } } }() //等待执行完成 err := cmd.Wait() log.Println("add 执行完成") if err != nil { log.Println("cmd.Run() failed with %s\n", err) updateTaskStatusToFail(task) return } if errStdout != nil || errStderr != nil { log.Println("failed to capture stdout or stderr\n") updateTaskStatusToFail(task) return } outStr := string(stdout) fileHash := strings.Split(outStr," ")[1] task["IpfsCid"] = fileHash log.Printf("out:%s", outStr) defer close(uploadProgress) //ipfs provide cmd = exec.Command(env.IpfsPath,"dht","provide",fileHash) err = cmd.Run() if err != nil { log.Println(err) return } //加入ipfs盒子节点备份队列 task[consts.TASK_IPFS_API]= env.IpfsApi err = insertFileNodeBackupTask(task) if err!=nil{ log.Println(err) } //更新远程服务中心文件对象Hash值和历史记录 var id string var projId string var folderId string var modifyUserId string var fileSzie string log.Println(task) if task["Id"]!=nil { id = task["Id"].(string) } projId = task["ProjId"].(string) folderId = task["FolderId"].(string) modifyUserId = task["ModifyUserId"].(string) fileSzie = strconv.FormatInt(interfaceToInt64(task["FileSize"]),10) returnData, err := replaceIntoArchiveById(id, projId, folderId, task["ArchName"].(string), task["Extension"].(string), task["IpfsCid"].(string), fileSzie, task["RelativePath"].(string), modifyUserId, interfaceToInt(task["Version"])) if err !=nil{ log.Printf("replaceIntoArchiveById err %v", err) updateTaskStatusToFail(task) if err.Error()=="版本冲突"{ sendConflictNotifyToWeb(2, task) } return } if returnData!=nil{ //远程对象重置 remoteTask := returnData task["ModifyName"] = remoteTask["ModifyName"].(string) //更新工作空间文件对象 replceIntoWorkSpaceFileObject(task[consts.TASK_ABSOLUTE_PATH].(string),remoteTask, false) } //完成上传 updateTaskProgress(task, 100.00) log.Printf("叮,资源文件[ %v ]上传完成",task[consts.TASK_ABSOLUTE_PATH].(string)) } //异步处理备份节点 func syncUploadToBackUpNode(){ key := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK) records,err := db.QueryWithPrefix(key) if err != nil{ log.Println() } if records==nil || len(records)==0{ return } for k, v := range records { var task = make(map[string] interface{}) err := json.Unmarshal([]byte(v),&task) if err !=nil{ log.Println(err) } if task[consts.TASK_IPFS_API]==""{ log.Println("ipfs api 未配置") } //备份 remoteIpfsApi := ipfs.NewShell(env.IpfsApi) isUp := remoteIpfsApi.IsUp() if !isUp { log.Println("盒子节点网络连接不通!") continue } //判断备份节点对等节点是否已添加连接 localsh := ipfs.NewShell("localhost:5001") idOut,err :=localsh.ID() localId :=idOut.ID swarmConnInfos,err :=remoteIpfsApi.SwarmPeers(context.Background()) hasConnect := false for _,swarmconn := range swarmConnInfos.Peers { if swarmconn.Peer==localId{ hasConnect=true break } } if !hasConnect{ log.Println("中继处理") swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId errT := remoteIpfsApi.SwarmConnect(context.Background(),swarmConnectAddr) if errT!=nil{ log.Printf("中继失败,引导节点备份失败 %v", err) } } err = remoteIpfsApi.Pin(task["IpfsCid"].(string)) if err != nil{ log.Println(err) } log.Printf("文件对象 %v 备份成功", v) //移除 err = db.DeleteWithPrefix(k) if err != nil{ log.Println(err) } } } func CopyMap(source map[string] interface{} ) map[string] interface{}{ target := make(map[string] interface{}) for k, v := range source { target[k]=v } return target } //新增文件节点备份任务 func insertFileNodeBackupTask(task map[string] interface{}) error{ myTask := CopyMap(task) //key=userId:TASK_SYNC_STATUS_WAIT:taskId archiveByte,err := json.Marshal(myTask) if err !=nil{ log.Println(err) return err } // key -> /userid/TASK_SYNC/417367746536689664 key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,myTask[consts.TASK_ID].(string)) err = db.ReplaceInto(key,string(archiveByte)) if err !=nil{ log.Println(err) return err } return nil } //执行单个下载任务 func downLoadTask(task map[string] interface{}){ //检查文件目录是否存在,不存在则创建 directory := fmt.Sprintf("%v%v%v%v%v%v%v%v%v",env.WorkSpace,string(os.PathSeparator),env.CurrentUserPhone,string(os.PathSeparator),task["ProjName"],string(os.PathSeparator),task["NodeName"],string(os.PathSeparator),task["RelativePath"]) _,err := os.Stat(directory) if err != nil { //创建文件目录 err = os.MkdirAll(directory, os.ModePerm) if err!=nil{ log.Println(err) updateTaskStatusToFail(task) return } } //判断文件是否冲突(本地文件存在,且已修改未提交) filePath := filepath.Clean(fmt.Sprintf("%v%v%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string),task["Extension"].(string))) orginFilePath := filePath conflict,err := isConflict(orginFilePath) if err!=nil{ log.Println(err) updateTaskStatusToFail(task) return } log.Printf("文件冲突:%v", conflict) if conflict { //文件名_冲突文件_作者_时间戳 modifyUnix := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(task["ModifyTime"].(string),"-",""),":",""),"+0800",""),"T","") filePath = filepath.Clean(fmt.Sprintf("%v%v%v_冲突文件_%v_%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string), task["ModifyName"].(string), modifyUnix, task["Extension"].(string))) } //临时文件命名.locking tempFilePath :=filePath+".locking" //下载启动标识,有下载进度则设置为true var downloading bool = false //cmd ipfs get progress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error cmd := exec.Command(env.IpfsPath,"get", task["IpfsCid"].(string),"-o", tempFilePath) stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress) }() //开始下载 updateTaskProgress(task, 0) //异步读取进度 //异步定时读取进度反馈给前端,每500ms返回一次进度 go func(){ //millSeconds := time.Now().UnixNano() / 1e6 for content := range progress { // 通道关闭后会退出for range循环 //current :=time.Now().UnixNano() / 1e6 if !downloading{ log.Printf("%v.%v %v 资源连接成功,下载中...",task["ArchName"],task["Extension"], task["IpfsCid"]) } downloading = true //current-millSeconds>500 && if strings.Index(content,"100.00%")==-1{ pro,err := contentToJSONByte(content) if err!=nil{ log.Print(err) continue } if pro.Process == 0{ pro.Process = 1 } //设置下载启动标识和下载时间戳 //millSeconds = current //更新进度、状态 updateTaskProgress(task, pro.Process) break } } }() //设置30秒连接超时,30秒未启动下载则下载失败 go func() { index :=0 for true{ //启动下载则不做超时判断 if downloading==true{ return } index++ time.Sleep(time.Duration(1)*time.Second) if downloading==false && index==30{ err = cmd.Process.Kill() log.Println("资源连接超时(30s),下载进程被终止") updateTaskStatusToFail(task) return } if index>30{ return } } }() //等待下载执行完成 err = cmd.Wait() if err != nil { updateTaskStatusToFail(task) log.Printf("cmd.Run() failed with %s\n", err) return } if errStdout != nil || errStderr != nil { updateTaskStatusToFail(task) log.Printf("failed to capture stdout or stderr\n") return } outStr := string(stdout) log.Printf("下载成功,:%s", outStr) //临时文件重命名 err = os.Rename(tempFilePath, filePath) if err!=nil{ log.Println(err) } time.Sleep(1*time.Second) //完成下载 task[consts.TASK_ABSOLUTE_PATH] = orginFilePath updateTaskProgress(task, 100.00) //更新工作空间文件对象 replceIntoWorkSpaceFileObject(orginFilePath,task, conflict) //冲突发送,通知 if conflict { sendConflictNotifyToWeb(3, task) } defer close(progress) } // type 2:上传冲突,3:下载冲突 func sendConflictNotifyToWeb(msgType int64, param map[string] interface{}){ var lockingMsg = make(map[string] interface{}) lockingMsg["UserId"]=env.CurrentUserId lockingMsg["Title"]="文件冲突" lockingMsg["Body"]="文件冲突" lockingMsg["Status"]=0 lockingMsg["Type"]= msgType lockingMsg["Parameter"]=param MessageToWebChanel <- lockingMsg } //在线通知 func ListeningRemoteMessage(){ log.Println("开启在线通知线程") for true { unixTime := strconv.FormatInt(time.Now().Unix(),10) time.Sleep(3*time.Minute) data,err := GetLockingMsgListByFilter(env.CurrentUserId,unixTime) if err != nil{ log.Printf("消息通知失败! %v", err) continue } if data==nil{ log.Println("未查询到通知消息") continue } //遍历消息 for _, msg := range data { MessageToWebChanel <- msg.(map[string] interface{}) } } } //是否存在文件冲突 func isConflict(filePath string) (isConflict bool,err error){ key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath) taskMap,err := db.QueryWithPrefix(key) if err != nil{ log.Println(err) return false,err } //不存在则否 taskString := taskMap[key] if len(taskString)==0{ return false,err } var task map[string] interface{} err = json.Unmarshal([]byte(taskString),&task) if err != nil{ log.Println(err) return false,err } return task[consts.TASK_IS_MODIFY].(bool),nil } //更新任务进度 func updateTaskProgress(task map[string] interface{}, progress float64){ task[consts.TASK_SYNC_PROGRESS]= progress if progress==100.00{ task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_FINISH }else{ task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_ING } taskByte,err := json.Marshal(task) if err !=nil{ log.Printf("序列化失败 %v \n", err) } // key -> /userid/TASK_SYNC/417367746536689664 key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID]) err = db.ReplaceInto(key,string(taskByte)) if err !=nil{ log.Printf("etcd put 失败 %v \n", err) } TaskToWebChanel <- task } //更新任务状态为失败 func updateTaskStatusToFail(task map[string] interface{}){ task[consts.TASK_SYNC_STATUS] = consts.TASK_SYNC_STATUS_FAIL taskByte,err := json.Marshal(task) if err !=nil{ log.Printf("序列化失败 %v \n", err) } // key -> /userid/TASK_SYNC/417367746536689664 key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID]) err = db.ReplaceInto(key,string(taskByte)) if err !=nil{ log.Printf("etcd put 失败 %v \n", err) } TaskToWebChanel <- task } func interfaceToInt64(id interface{}) int64{ switch id.(type) { case float64: return int64(id.(float64)) default: return id.(int64) } } func interfaceToInt(id interface{}) int{ switch id.(type) { case float64: return int(id.(float64)) default: return id.(int) } } /** 文件上传下载进度 */ type processStruct struct { Size string `json:"size"` CurrentSize string `json:"currentSize"` Unit string `json:"unit"` CurrentUnit string `json:"currentUnit"` Process float64 `json:"process"` Hash string `json:"hash"` CommitHistoryHash string `json:"commitHistoryHash"` Version int `json:"version"` } //更新工作空间文件对象 func replceIntoWorkSpaceFileObject(filePath string,task map[string] interface{}, conflict bool){ //冲突文件不做标记 if strings.Index(filePath,"_冲突文件_")>-1{ return } key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath) if !conflict { task[consts.TASK_IS_MODIFY] = false }else{ task[consts.TASK_IS_MODIFY] = true } taskByte,err := json.Marshal(task) if err !=nil{ log.Println(err) return } err = db.ReplaceInto(key, string(taskByte)) if err !=nil{ log.Println(err) } } //读取解析cmd返回文件上传或下载进度信息 func contentToJSONByte(content string) (pro processStruct, err error){ sts :=strings.Split(content," ") //log.Printf("content:%v,length:%v", content, len(sts)) if len(sts)<8{ log.Println("字符长度小于8") log.Println(content) return pro,errors.New("字符长度小于8") } //for i, v := range sts { // log.Printf("%v -> %v", i, v) //} var processFloat float64 //var err error //if (len(sts)==9 || len(sts)==8){ //processFloat,err =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64) //}else if (len(sts)==10 || len(sts)==11){ //processFloat,err =strconv.ParseFloat(strings.Replace(sts[9],"%","",1), 64) //}else{ //processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64) //} if strings.Index(content,"m")>-1{ processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64) }else{ processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-1],"%","",1), 64) } if err !=nil{ log.Printf("%v, length:%v, err:%v",content, len(sts), err) //for i, v := range sts { // log.Printf("%v -> %v", i, v) //} return pro,err } if processFloat==0{ //log.Println("当前进度0") pro.Process = 0 return pro,err } pro.Size = sts[4] pro.CurrentSize = sts[1] pro.Unit = sts[2] pro.CurrentUnit = sts[5] pro.Process = processFloat return pro,err } func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) { var out []byte buf := make([]byte, 1024, 1024) for { n, err := r.Read(buf[:]) if n > 0 { d := buf[:n] out = append(out, d...) if strings.Index(string(d),"Saving")>-1{ continue } progress <- string(d) } if err != nil { //log.Println(err) // Read returns io.EOF at the end of file, which is not an error for us if err == io.EOF { err = nil } return out, err } } // never reached panic(true) return nil, nil }