From 2fc72a688ef6ba9875e85ae73911b0f3e845b426 Mon Sep 17 00:00:00 2001 From: yuan_rh <545873205@qq.com> Date: Wed, 21 Apr 2021 16:11:43 +0800 Subject: [PATCH] feat:v1.0.5 --- config/config.go | 6 + handle/handle.go | 285 ++++++++++++++++++++++--------- main.go | 4 + nsqclient/nsq_single_consumer.go | 47 +++++ nsqclient/nsq_single_product.go | 30 ++++ websocket/websocket.go | 65 ++++++- 6 files changed, 348 insertions(+), 89 deletions(-) create mode 100644 nsqclient/nsq_single_consumer.go create mode 100644 nsqclient/nsq_single_product.go diff --git a/config/config.go b/config/config.go index 00b5609..e6765a0 100644 --- a/config/config.go +++ b/config/config.go @@ -26,6 +26,10 @@ var EtcdUrl="127.0.0.1:2379" var ServerUrl = "http://www.lockingos.org:9000" +var NsqAddr = "www.lockingos.org:4150" +var NsqTopic = "locking-topic-dev" +var NsqChanelPrefix = "channel-userId-" + var GobalWatch *fsnotify.Watcher //全局的chanel map @@ -33,6 +37,8 @@ var GobalWatchChannelMap = make(map[string] chan string) //var EtcdUrl="127.0.0.1:2379" + + func InitConfig(){ tszdir :=os.Getenv("TSZDIR") diff --git a/handle/handle.go b/handle/handle.go index 4c1e80c..28cb9a0 100644 --- a/handle/handle.go +++ b/handle/handle.go @@ -10,6 +10,7 @@ import ( "fmt" "fts/config" "fts/etcdclient" + "fts/nsqclient" "github.com/gorilla/websocket" _ "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api" @@ -61,6 +62,7 @@ type processStruct struct { Process float64 `json:"process"` Hash string `json:"hash"` CommitHistoryHash string `json:"commitHistoryHash"` + Version int `json:"version"` } @@ -163,6 +165,19 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri //下载启动标识,有下载进度则设置为true var downloading bool = false + //检测文件打开状态 + tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1) + + if err != nil && (!os.IsNotExist(err)) { + + log.Println("文件被占用,请关闭打开的软件") + if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { + return err + } + return err + } + defer tfile.Close() + //正在下载标识,标识用于不监测更新改动 gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1 @@ -186,6 +201,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri go func(){ millSeconds := time.Now().UnixNano() / 1e6 for content := range progress { // 通道关闭后会退出for range循环 + log.Println(">>>"+content) current :=time.Now().UnixNano() / 1e6 if !downloading{ log.Println("资源连接成功,下载中...") @@ -220,7 +236,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri log.Printf("json.Marshal error %s\n", err) } if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { - panic(err) + log.Println(err) } break @@ -263,7 +279,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri //更新Etcd数据库的文件key对应hash值 time.Sleep(200*time.Millisecond) key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName - err = etcdclient.ReplaceInto(key,hash) + err = etcdclient.ReplaceInto(key,hash+";0") if err != nil { log.Println(err) return err @@ -272,6 +288,9 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri //发送消息至文件变更订阅 config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";" + //下载完成反馈 + + log.Printf("叮,资源文件[ %v ]下载完成",fileName) defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0 defer close(progress) @@ -336,6 +355,17 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu } } + //检测文件打开状态 + tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1) + if err != nil { + log.Println("文件被占用,请关闭打开的软件") + if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { + return err + } + return err + } + defer tfile.Close() + serverSh := shell.NewShell(config.ServerIpfsUrl) //serverSh.SetTimeout(time.Duration(30)*time.Second) //log.Println("检测引导节点存活情况"+config.ServerIpfsUrl) @@ -566,6 +596,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu prog.Process=100.00 prog.Size=strconv.Itoa(objectStat.CumulativeSize) prog.CommitHistoryHash=commitHistoryHash + prog.Version=commitVersion projson,err :=json.Marshal(prog) if conn!=nil{ @@ -583,20 +614,21 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu return err } - if conn==nil{ - folderName := strings.Split(dir,"\\")[0] - var relativePath string - if len(strings.Split(dir, "\\"))==1{ - relativePath = "" - }else{ - relativePath = strings.Replace(relativePath, folderName+"\\","",1) - } - size,_ := strconv.ParseInt(prog.Size,10,64) - err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion) - if err!=nil{ - return err - } - } + //自动协同逻辑 + //if conn==nil{ + // folderName := strings.Split(dir,"\\")[0] + // var relativePath string + // if len(strings.Split(dir, "\\"))==1{ + // relativePath = "" + // }else{ + // relativePath = strings.Replace(relativePath, folderName+"\\","",1) + // } + // size,_ := strconv.ParseInt(prog.Size,10,64) + // err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion) + // if err!=nil{ + // return err + // } + //} //发送文件至文件变更订阅 config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";" @@ -792,6 +824,9 @@ func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bo commitHistory.CreateTime=time.Now().Unix() if commitHistory.ParentHash==commitHistory.CurrentHash{ + if commitHistory.Version>1{ + commitHistory.Version-- + } return commitHistory.Version,currentHistoryHash,nil } @@ -1089,42 +1124,41 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ } log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr) - //如果非新增文件则自动post - filePath := actionAndModifyFilePath[1] - + //保存即同步逻辑,如果非新增文件则自动post //获取文件的历史版本管理文件hash - fileName :=filepath.Base(filePath) - folderName := strings.Split(queryKey,"\\")[2] - dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) - var relativePath string - if len(strings.Split(dir, "\\"))==1{ - relativePath = "" - }else{ - relativePath = strings.Replace(relativePath, folderName+"\\","",1) - } - historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) - if err!=nil{ - GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) - log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) - continue - } - - //自动更新文件 - err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) - if err!=nil{ - GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) - log.Printf("UploadCommand 返回失败,%v",err.Error()) - //记录修改状态 - newValue := oldValue[0]+";" +"1" - err = etcdclient.ReplaceInto(queryKey,newValue) - if err!=nil{ - log.Println(err) - continue - } - continue - } - GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) - continue + //filePath := actionAndModifyFilePath[1] + //fileName :=filepath.Base(filePath) + //folderName := strings.Split(queryKey,"\\")[2] + //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) + //var relativePath string + //if len(strings.Split(dir, "\\"))==1{ + // relativePath = "" + //}else{ + // relativePath = strings.Replace(relativePath, folderName+"\\","",1) + //} + //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) + //if err!=nil{ + // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) + // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) + // continue + //} + // + ////自动更新文件 + //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) + //if err!=nil{ + // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) + // log.Printf("UploadCommand 返回失败,%v",err.Error()) + // //记录修改状态 + // newValue := oldValue[0]+";" +"1" + // err = etcdclient.ReplaceInto(queryKey,newValue) + // if err!=nil{ + // log.Println(err) + // continue + // } + // continue + //} + //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) + //continue }else if actionAndModifyFilePath[0]=="create"{ querymap,err := etcdclient.QueryWithPrefix(queryKey) if err != nil { @@ -1134,6 +1168,7 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ if len(querymap)==0{ continue } + //更新判断 if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{ continue @@ -1150,41 +1185,40 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ //如果非新增文件则自动post //if goabalAddFileMap[] - filePath := actionAndModifyFilePath[1] - //获取文件的历史版本管理文件hash - fileName :=filepath.Base(filePath) - folderName := strings.Split(queryKey,"\\")[2] - dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) - var relativePath string - if len(strings.Split(dir, "\\"))==1{ - relativePath = "" - }else{ - relativePath = strings.Replace(relativePath, folderName+"\\","",1) - } - historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) - if err!=nil{ - GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) - log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) - continue - } - - //自动更新文件 - err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) - if err!=nil{ - GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) - log.Printf("UploadCommand 返回失败,%v",err.Error()) - //记录修改状态 - newValue := oldValue[0]+";" +"1" - err = etcdclient.ReplaceInto(queryKey,newValue) - if err!=nil{ - log.Println(err) - continue - } - continue - } - GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) - continue + //filePath := actionAndModifyFilePath[1] + //fileName :=filepath.Base(filePath) + //folderName := strings.Split(queryKey,"\\")[2] + //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) + //var relativePath string + //if len(strings.Split(dir, "\\"))==1{ + // relativePath = "" + //}else{ + // relativePath = strings.Replace(relativePath, folderName+"\\","",1) + //} + //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) + //if err!=nil{ + // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) + // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) + // continue + //} + // + ////自动更新文件 + //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) + //if err!=nil{ + // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) + // log.Printf("UploadCommand 返回失败,%v",err.Error()) + // //记录修改状态 + // newValue := oldValue[0]+";" +"1" + // err = etcdclient.ReplaceInto(queryKey,newValue) + // if err!=nil{ + // log.Println(err) + // continue + // } + // continue + //} + //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) + //continue } err = sendFileListFromEtcd(keyPrefix,projectName,conn) if err != nil { @@ -1514,6 +1548,87 @@ type commitHistory struct { Milestone bool `json:"milestone"` } +/** +消息通知 +@param userId 用户ID +*/ +func MessageNotify(conn *websocket.Conn, userId string) (error){ + msgKey :=fmt.Sprintf("lockingMsg\\%v",userId) + + //返回全量通知消息列表 + err :=queryEtcdToWebSocket(conn, msgKey) + if err!=nil{ + log.Println(err) + return err + } + + //消费通知消息到本地 + nsqclient.Consumers(config.NsqTopic,(config.NsqChanelPrefix+userId),config.NsqAddr) + + for message := range nsqclient.MsgQueue { + messOb :=nsqclient.LockingMsg{} + err:=json.Unmarshal([]byte(message),&messOb) + if err!=nil{ + log.Println(err) + continue + } + + for _, acceptUserId := range messOb.UserIds { + //log.Println(strconv.FormatInt(acceptUserId,10)+">>>"+userId) + if strconv.FormatInt(acceptUserId,10)==userId{ + messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10) + err =etcdclient.ReplaceInto(messagekey,message) + if err!=nil{ + nsqclient.MsgQueue <- message + log.Println(err) + } + + err = queryEtcdToWebSocket(conn, messagekey) + if err!=nil{ + nsqclient.MsgQueue <- message + log.Println(err) + } + } + } + } + return nil +} + +/** +消息通知 +@param userId 用户ID +*/ +func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){ + msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId) + err = etcdclient.DeleteWithPrefix(msgKey) + return err +} + +/** +查询etcd对应key值发送给前端 + */ +func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){ + msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix) + if err!=nil{ + log.Println(err) + return err + } + mapByte,err :=json.Marshal(msgs) + if err!=nil{ + log.Println(err) + return err + + } + + if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { + log.Println(err) + return err + + } + + return nil +} + diff --git a/main.go b/main.go index 638f572..5ad3f7c 100644 --- a/main.go +++ b/main.go @@ -181,6 +181,10 @@ func main() { //http.HandleFunc("/watchFile", websocket.WatchFileHandler) http.HandleFunc("/keeplive", websocket.KeepliveHandler) http.HandleFunc("/messageNotify", websocket.MessageNotifyHandler) + http.HandleFunc("/messageMarkRead", websocket.MessageMarkReadHandler) + + //TODO 消息已读 + http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler) http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler) diff --git a/nsqclient/nsq_single_consumer.go b/nsqclient/nsq_single_consumer.go new file mode 100644 index 0000000..b8990a7 --- /dev/null +++ b/nsqclient/nsq_single_consumer.go @@ -0,0 +1,47 @@ +package nsqclient + +import ( + "github.com/nsqio/go-nsq" + "log" +) + +/** +Locking通知消息 +*/ +type LockingMsg struct{ + Id int64 `json:"id,string"` + UserIds []int64 `json:"userIds"` + PushChannel int `json:"pushChannel"` + Title string `json:"title"` + Body string `json:"body"` + Type string `json:"type"` + PushTimeUnix int64 `json:"pushTimeUnix,string"` + Parameter map[string] string `json:"parameter"` +} + +var MsgQueue = make(chan string,100000) + +type NewHandler struct{} + +func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) { + //addr := msg.NSQDAddress + message := string(msg.Body) + log.Println(message) + MsgQueue <- message + return +} +func Consumers(topic, channel, addr string) { + conf := nsq.NewConfig() + new_consumer, err := nsq.NewConsumer(topic, channel, conf) + if err != nil { + + } + + // 接收消息 + new_handler := &NewHandler{} + new_consumer.AddHandler(new_handler) + err = new_consumer.ConnectToNSQD(addr) + if err != nil { + + } +} diff --git a/nsqclient/nsq_single_product.go b/nsqclient/nsq_single_product.go new file mode 100644 index 0000000..ad675e0 --- /dev/null +++ b/nsqclient/nsq_single_product.go @@ -0,0 +1,30 @@ +package nsqclient + +import ( + "fmt" + "github.com/nsqio/go-nsq" + "time" +) + + + + +func main() { + + nsqAddr := "www.lockingos.org:4150" + conf :=nsq.NewConfig() + p ,err := nsq.NewProducer(nsqAddr,conf) + if err != nil { + fmt.Println(err) + return + } + for { + message := "message :"+ time.Now().Format("2006-01-02 15:04:05") + fmt.Println(message) + // 发送消息 + p.Publish("topic-demo1",[]byte(message)) + + time.Sleep(2*time.Second) + } + +} diff --git a/websocket/websocket.go b/websocket/websocket.go index f482183..3451905 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -432,19 +432,76 @@ func MessageNotifyHandler(w http.ResponseWriter, r *http.Request){ conn *websocket.Conn err error //msgType int - //data []byte + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + //log.Println("param initLocalWorkSpaceHandler:"+string(data)) + + err := handle.MessageNotify(conn,string(data)) + + if err!=nil{ + log.Println(err) + goto ERR + } + + goto ERR + } + //error的标签 +ERR: + conn.Close() +} + + +//消息已读 +func MessageMarkReadHandler(w http.ResponseWriter, r *http.Request){ + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte ) if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { //报错了,直接返回底层的websocket链接就会终断掉 return } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + //log.Println("param initLocalWorkSpaceHandler:"+string(data)) + params := strings.Split(string(data),"|") + err := handle.MessageMarkReadHandler(conn,params[0],params[1]) - //读取通道消息 - for msg := range handle.GobalMessageNotify { - if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { + if err!=nil{ + if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { + log.Println(err) + goto ERR + } log.Println(err) goto ERR } + + goto ERR } //error的标签 ERR: