Przeglądaj źródła

feat:v1.0.5

dev-v1.0.5
yuan_rh 4 lat temu
rodzic
commit
2fc72a688e
6 zmienionych plików z 348 dodań i 89 usunięć
  1. +6
    -0
      config/config.go
  2. +200
    -85
      handle/handle.go
  3. +4
    -0
      main.go
  4. +47
    -0
      nsqclient/nsq_single_consumer.go
  5. +30
    -0
      nsqclient/nsq_single_product.go
  6. +61
    -4
      websocket/websocket.go

+ 6
- 0
config/config.go Wyświetl plik

@@ -26,6 +26,10 @@ var EtcdUrl="127.0.0.1:2379"

var ServerUrl = "http://www.lockingos.org:9000"

var NsqAddr = "www.lockingos.org:4150"
var NsqTopic = "locking-topic-dev"
var NsqChanelPrefix = "channel-userId-"

var GobalWatch *fsnotify.Watcher

//全局的chanel map
@@ -33,6 +37,8 @@ var GobalWatchChannelMap = make(map[string] chan string)
//var EtcdUrl="127.0.0.1:2379"




func InitConfig(){

tszdir :=os.Getenv("TSZDIR")


+ 200
- 85
handle/handle.go Wyświetl plik

@@ -10,6 +10,7 @@ import (
"fmt"
"fts/config"
"fts/etcdclient"
"fts/nsqclient"
"github.com/gorilla/websocket"
_ "github.com/ipfs/go-ipfs-api"
shell "github.com/ipfs/go-ipfs-api"
@@ -61,6 +62,7 @@ type processStruct struct {
Process float64 `json:"process"`
Hash string `json:"hash"`
CommitHistoryHash string `json:"commitHistoryHash"`
Version int `json:"version"`
}


@@ -163,6 +165,19 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri
//下载启动标识,有下载进度则设置为true
var downloading bool = false

//检测文件打开状态
tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1)

if err != nil && (!os.IsNotExist(err)) {

log.Println("文件被占用,请关闭打开的软件")
if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
return err
}
return err
}
defer tfile.Close()

//正在下载标识,标识用于不监测更新改动
gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1

@@ -186,6 +201,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri
go func(){
millSeconds := time.Now().UnixNano() / 1e6
for content := range progress { // 通道关闭后会退出for range循环
log.Println(">>>"+content)
current :=time.Now().UnixNano() / 1e6
if !downloading{
log.Println("资源连接成功,下载中...")
@@ -220,7 +236,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri
log.Printf("json.Marshal error %s\n", err)
}
if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
panic(err)
log.Println(err)
}
break

@@ -263,7 +279,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri
//更新Etcd数据库的文件key对应hash值
time.Sleep(200*time.Millisecond)
key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName
err = etcdclient.ReplaceInto(key,hash)
err = etcdclient.ReplaceInto(key,hash+";0")
if err != nil {
log.Println(err)
return err
@@ -272,6 +288,9 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir stri
//发送消息至文件变更订阅
config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"

//下载完成反馈


log.Printf("叮,资源文件[ %v ]下载完成",fileName)
defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0
defer close(progress)
@@ -336,6 +355,17 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu
}
}

//检测文件打开状态
tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
if err != nil {
log.Println("文件被占用,请关闭打开的软件")
if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
return err
}
return err
}
defer tfile.Close()

serverSh := shell.NewShell(config.ServerIpfsUrl)
//serverSh.SetTimeout(time.Duration(30)*time.Second)
//log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
@@ -566,6 +596,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu
prog.Process=100.00
prog.Size=strconv.Itoa(objectStat.CumulativeSize)
prog.CommitHistoryHash=commitHistoryHash
prog.Version=commitVersion
projson,err :=json.Marshal(prog)

if conn!=nil{
@@ -583,20 +614,21 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,cu
return err
}

if conn==nil{
folderName := strings.Split(dir,"\\")[0]
var relativePath string
if len(strings.Split(dir, "\\"))==1{
relativePath = ""
}else{
relativePath = strings.Replace(relativePath, folderName+"\\","",1)
}
size,_ := strconv.ParseInt(prog.Size,10,64)
err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion)
if err!=nil{
return err
}
}
//自动协同逻辑
//if conn==nil{
// folderName := strings.Split(dir,"\\")[0]
// var relativePath string
// if len(strings.Split(dir, "\\"))==1{
// relativePath = ""
// }else{
// relativePath = strings.Replace(relativePath, folderName+"\\","",1)
// }
// size,_ := strconv.ParseInt(prog.Size,10,64)
// err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion)
// if err!=nil{
// return err
// }
//}

//发送文件至文件变更订阅
config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
@@ -792,6 +824,9 @@ func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bo
commitHistory.CreateTime=time.Now().Unix()

if commitHistory.ParentHash==commitHistory.CurrentHash{
if commitHistory.Version>1{
commitHistory.Version--
}
return commitHistory.Version,currentHistoryHash,nil
}

@@ -1089,42 +1124,41 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
}
log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr)

//如果非新增文件则自动post
filePath := actionAndModifyFilePath[1]

//保存即同步逻辑,如果非新增文件则自动post
//获取文件的历史版本管理文件hash
fileName :=filepath.Base(filePath)
folderName := strings.Split(queryKey,"\\")[2]
dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
var relativePath string
if len(strings.Split(dir, "\\"))==1{
relativePath = ""
}else{
relativePath = strings.Replace(relativePath, folderName+"\\","",1)
}
historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
if err!=nil{
GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
continue
}

//自动更新文件
err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
if err!=nil{
GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
log.Printf("UploadCommand 返回失败,%v",err.Error())
//记录修改状态
newValue := oldValue[0]+";" +"1"
err = etcdclient.ReplaceInto(queryKey,newValue)
if err!=nil{
log.Println(err)
continue
}
continue
}
GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
continue
//filePath := actionAndModifyFilePath[1]
//fileName :=filepath.Base(filePath)
//folderName := strings.Split(queryKey,"\\")[2]
//dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
//var relativePath string
//if len(strings.Split(dir, "\\"))==1{
// relativePath = ""
//}else{
// relativePath = strings.Replace(relativePath, folderName+"\\","",1)
//}
//historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
//if err!=nil{
// GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
// log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
// continue
//}
//
////自动更新文件
//err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
//if err!=nil{
// GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
// log.Printf("UploadCommand 返回失败,%v",err.Error())
// //记录修改状态
// newValue := oldValue[0]+";" +"1"
// err = etcdclient.ReplaceInto(queryKey,newValue)
// if err!=nil{
// log.Println(err)
// continue
// }
// continue
//}
//GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
//continue
}else if actionAndModifyFilePath[0]=="create"{
querymap,err := etcdclient.QueryWithPrefix(queryKey)
if err != nil {
@@ -1134,6 +1168,7 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
if len(querymap)==0{
continue
}

//更新判断
if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
continue
@@ -1150,41 +1185,40 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{

//如果非新增文件则自动post
//if goabalAddFileMap[]
filePath := actionAndModifyFilePath[1]

//获取文件的历史版本管理文件hash
fileName :=filepath.Base(filePath)
folderName := strings.Split(queryKey,"\\")[2]
dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
var relativePath string
if len(strings.Split(dir, "\\"))==1{
relativePath = ""
}else{
relativePath = strings.Replace(relativePath, folderName+"\\","",1)
}
historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
if err!=nil{
GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
continue
}

//自动更新文件
err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
if err!=nil{
GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
log.Printf("UploadCommand 返回失败,%v",err.Error())
//记录修改状态
newValue := oldValue[0]+";" +"1"
err = etcdclient.ReplaceInto(queryKey,newValue)
if err!=nil{
log.Println(err)
continue
}
continue
}
GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
continue
//filePath := actionAndModifyFilePath[1]
//fileName :=filepath.Base(filePath)
//folderName := strings.Split(queryKey,"\\")[2]
//dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
//var relativePath string
//if len(strings.Split(dir, "\\"))==1{
// relativePath = ""
//}else{
// relativePath = strings.Replace(relativePath, folderName+"\\","",1)
//}
//historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
//if err!=nil{
// GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
// log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
// continue
//}
//
////自动更新文件
//err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
//if err!=nil{
// GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
// log.Printf("UploadCommand 返回失败,%v",err.Error())
// //记录修改状态
// newValue := oldValue[0]+";" +"1"
// err = etcdclient.ReplaceInto(queryKey,newValue)
// if err!=nil{
// log.Println(err)
// continue
// }
// continue
//}
//GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
//continue
}
err = sendFileListFromEtcd(keyPrefix,projectName,conn)
if err != nil {
@@ -1514,6 +1548,87 @@ type commitHistory struct {
Milestone bool `json:"milestone"`
}

/**
消息通知
@param userId 用户ID
*/
func MessageNotify(conn *websocket.Conn, userId string) (error){
msgKey :=fmt.Sprintf("lockingMsg\\%v",userId)

//返回全量通知消息列表
err :=queryEtcdToWebSocket(conn, msgKey)
if err!=nil{
log.Println(err)
return err
}

//消费通知消息到本地
nsqclient.Consumers(config.NsqTopic,(config.NsqChanelPrefix+userId),config.NsqAddr)

for message := range nsqclient.MsgQueue {
messOb :=nsqclient.LockingMsg{}
err:=json.Unmarshal([]byte(message),&messOb)
if err!=nil{
log.Println(err)
continue
}

for _, acceptUserId := range messOb.UserIds {
//log.Println(strconv.FormatInt(acceptUserId,10)+">>>"+userId)
if strconv.FormatInt(acceptUserId,10)==userId{
messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10)
err =etcdclient.ReplaceInto(messagekey,message)
if err!=nil{
nsqclient.MsgQueue <- message
log.Println(err)
}

err = queryEtcdToWebSocket(conn, messagekey)
if err!=nil{
nsqclient.MsgQueue <- message
log.Println(err)
}
}
}
}
return nil
}

/**
消息通知
@param userId 用户ID
*/
func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){
msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId)
err = etcdclient.DeleteWithPrefix(msgKey)
return err
}

/**
查询etcd对应key值发送给前端
*/
func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){
msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix)
if err!=nil{
log.Println(err)
return err
}
mapByte,err :=json.Marshal(msgs)
if err!=nil{
log.Println(err)
return err

}

if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
log.Println(err)
return err

}

return nil
}






+ 4
- 0
main.go Wyświetl plik

@@ -181,6 +181,10 @@ func main() {
//http.HandleFunc("/watchFile", websocket.WatchFileHandler)
http.HandleFunc("/keeplive", websocket.KeepliveHandler)
http.HandleFunc("/messageNotify", websocket.MessageNotifyHandler)
http.HandleFunc("/messageMarkRead", websocket.MessageMarkReadHandler)

//TODO 消息已读

http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler)
http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler)



+ 47
- 0
nsqclient/nsq_single_consumer.go Wyświetl plik

@@ -0,0 +1,47 @@
package nsqclient

import (
"github.com/nsqio/go-nsq"
"log"
)

/**
Locking通知消息
*/
type LockingMsg struct{
Id int64 `json:"id,string"`
UserIds []int64 `json:"userIds"`
PushChannel int `json:"pushChannel"`
Title string `json:"title"`
Body string `json:"body"`
Type string `json:"type"`
PushTimeUnix int64 `json:"pushTimeUnix,string"`
Parameter map[string] string `json:"parameter"`
}

var MsgQueue = make(chan string,100000)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
//addr := msg.NSQDAddress
message := string(msg.Body)
log.Println(message)
MsgQueue <- message
return
}
func Consumers(topic, channel, addr string) {
conf := nsq.NewConfig()
new_consumer, err := nsq.NewConsumer(topic, channel, conf)
if err != nil {

}

// 接收消息
new_handler := &NewHandler{}
new_consumer.AddHandler(new_handler)
err = new_consumer.ConnectToNSQD(addr)
if err != nil {

}
}

+ 30
- 0
nsqclient/nsq_single_product.go Wyświetl plik

@@ -0,0 +1,30 @@
package nsqclient

import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)




func main() {

nsqAddr := "www.lockingos.org:4150"
conf :=nsq.NewConfig()
p ,err := nsq.NewProducer(nsqAddr,conf)
if err != nil {
fmt.Println(err)
return
}
for {
message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
fmt.Println(message)
// 发送消息
p.Publish("topic-demo1",[]byte(message))

time.Sleep(2*time.Second)
}
}

+ 61
- 4
websocket/websocket.go Wyświetl plik

@@ -432,19 +432,76 @@ func MessageNotifyHandler(w http.ResponseWriter, r *http.Request){
conn *websocket.Conn
err error
//msgType int
//data []byte
data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
return
}
//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, data, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
goto ERR
}
//发送数据,判断返回值是否报错
//log.Println("param initLocalWorkSpaceHandler:"+string(data))

err := handle.MessageNotify(conn,string(data))

if err!=nil{
log.Println(err)
goto ERR
}

goto ERR
}
//error的标签
ERR:
conn.Close()
}


//消息已读
func MessageMarkReadHandler(w http.ResponseWriter, r *http.Request){
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
//在应答的header中放上upgrade:websoket
var (
conn *websocket.Conn
err error
//msgType int
data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
return
}
//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, data, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
goto ERR
}
//发送数据,判断返回值是否报错
//log.Println("param initLocalWorkSpaceHandler:"+string(data))
params := strings.Split(string(data),"|")
err := handle.MessageMarkReadHandler(conn,params[0],params[1])

//读取通道消息
for msg := range handle.GobalMessageNotify {
if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
if err!=nil{
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
log.Println(err)
goto ERR
}
log.Println(err)
goto ERR
}

goto ERR
}
//error的标签
ERR:


Ładowanie…
Anuluj
Zapisz