Autor | SHA1 | Wiadomość | Data |
---|---|---|---|
|
2fc72a688e | feat:v1.0.5 | 4 lat temu |
|
38f9ee907e | feat:更为正式环境 | 4 lat temu |
|
d31af8958e | fixbug | 4 lat temu |
|
6ea48b2551 | feat;增加消息通知接口 | 4 lat temu |
|
57f72096ab |
feat:完成保存即协同
1.TODO init userId参数替换 |
4 lat temu |
@@ -24,6 +24,12 @@ var ServerIpfsUrl = "10.240.10.238:5001" | |||||
var UpdaterName ="LOCKING更新.exe" | var UpdaterName ="LOCKING更新.exe" | ||||
var EtcdUrl="127.0.0.1:2379" | var EtcdUrl="127.0.0.1:2379" | ||||
var ServerUrl = "http://www.lockingos.org:9000" | |||||
var NsqAddr = "www.lockingos.org:4150" | |||||
var NsqTopic = "locking-topic-dev" | |||||
var NsqChanelPrefix = "channel-userId-" | |||||
var GobalWatch *fsnotify.Watcher | var GobalWatch *fsnotify.Watcher | ||||
//全局的chanel map | //全局的chanel map | ||||
@@ -31,6 +37,8 @@ var GobalWatchChannelMap = make(map[string] chan string) | |||||
//var EtcdUrl="127.0.0.1:2379" | //var EtcdUrl="127.0.0.1:2379" | ||||
func InitConfig(){ | func InitConfig(){ | ||||
tszdir :=os.Getenv("TSZDIR") | tszdir :=os.Getenv("TSZDIR") | ||||
@@ -1,7 +1,6 @@ | |||||
package main | package main | ||||
import ( | import ( | ||||
"fmt" | |||||
_ "fmt" | _ "fmt" | ||||
"fts/config" | "fts/config" | ||||
"fts/websocket" | "fts/websocket" | ||||
@@ -55,7 +54,7 @@ func main() { | |||||
select { | select { | ||||
case ev := <-config.GobalWatch.Events: | case ev := <-config.GobalWatch.Events: | ||||
{ | { | ||||
log.Println(ev.Op.String()+":"+ev.Name) | |||||
//log.Println(ev.Op.String()+":"+ev.Name) | |||||
if filepath.Ext(ev.Name)==".commit"{ | if filepath.Ext(ev.Name)==".commit"{ | ||||
continue | continue | ||||
} | } | ||||
@@ -64,29 +63,29 @@ func main() { | |||||
} | } | ||||
if ev.Op&fsnotify.Create == fsnotify.Create { | if ev.Op&fsnotify.Create == fsnotify.Create { | ||||
fmt.Println("创建文件 : ", ev.Name); | |||||
//fmt.Println("创建文件 : ", ev.Name); | |||||
//这里获取新创建文件的信息,如果是目录,则加入监控中 | //这里获取新创建文件的信息,如果是目录,则加入监控中 | ||||
fi, err := os.Stat(ev.Name); | fi, err := os.Stat(ev.Name); | ||||
if err == nil && fi.IsDir() { | if err == nil && fi.IsDir() { | ||||
config.GobalWatch.Add(ev.Name); | config.GobalWatch.Add(ev.Name); | ||||
fmt.Println("添加监控 : ", ev.Name); | |||||
//fmt.Println("添加监控 : ", ev.Name); | |||||
}else{ | }else{ | ||||
filePath :=ev.Name | filePath :=ev.Name | ||||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | ||||
param :=strings.Split(abs,"\\") | param :=strings.Split(abs,"\\") | ||||
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | ||||
log.Println("chan-->"+key) | |||||
//log.Println("chan-->"+key) | |||||
ch :=config.GobalWatchChannelMap[key] | ch :=config.GobalWatchChannelMap[key] | ||||
if ch==nil{ | if ch==nil{ | ||||
continue | continue | ||||
} | } | ||||
log.Println("chan-->"+("create"+";"+ev.Name)) | |||||
//log.Println("chan-->"+("create"+";"+ev.Name)) | |||||
ch <- ("create"+";"+ev.Name) | ch <- ("create"+";"+ev.Name) | ||||
} | } | ||||
} | } | ||||
if ev.Op&fsnotify.Write == fsnotify.Write { | if ev.Op&fsnotify.Write == fsnotify.Write { | ||||
fmt.Println("写入文件 : ", ev.Name); | |||||
//fmt.Println("写入文件 : ", ev.Name); | |||||
//判断文件,发送事件 | //判断文件,发送事件 | ||||
fi, err := os.Stat(ev.Name); | fi, err := os.Stat(ev.Name); | ||||
if err == nil && !fi.IsDir() { | if err == nil && !fi.IsDir() { | ||||
@@ -94,23 +93,23 @@ func main() { | |||||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | ||||
param :=strings.Split(abs,"\\") | param :=strings.Split(abs,"\\") | ||||
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | ||||
log.Println("chan-->"+key) | |||||
//log.Println("chan-->"+key) | |||||
ch :=config.GobalWatchChannelMap[key] | ch :=config.GobalWatchChannelMap[key] | ||||
if ch==nil{ | if ch==nil{ | ||||
continue | continue | ||||
} | } | ||||
log.Println("chan-->"+("write"+";"+ev.Name)) | |||||
log.Println(ch) | |||||
//log.Println("chan-->"+("write"+";"+ev.Name)) | |||||
//log.Println(ch) | |||||
ch <- ("write"+";"+ev.Name) | ch <- ("write"+";"+ev.Name) | ||||
} | } | ||||
} | } | ||||
if ev.Op&fsnotify.Remove == fsnotify.Remove { | if ev.Op&fsnotify.Remove == fsnotify.Remove { | ||||
fmt.Println("删除文件 : ", ev.Name); | |||||
//fmt.Println("删除文件 : ", ev.Name); | |||||
//如果删除文件是目录,则移除监控 | //如果删除文件是目录,则移除监控 | ||||
fi, err := os.Stat(ev.Name); | fi, err := os.Stat(ev.Name); | ||||
if err == nil && fi.IsDir() { | if err == nil && fi.IsDir() { | ||||
config.GobalWatch.Remove(ev.Name); | config.GobalWatch.Remove(ev.Name); | ||||
fmt.Println("删除监控 : ", ev.Name); | |||||
//fmt.Println("删除监控 : ", ev.Name); | |||||
}else{ | }else{ | ||||
filePath :=ev.Name | filePath :=ev.Name | ||||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | ||||
@@ -120,12 +119,12 @@ func main() { | |||||
if ch==nil{ | if ch==nil{ | ||||
continue | continue | ||||
} | } | ||||
log.Println("chan-->"+("remove"+";"+ev.Name)) | |||||
//log.Println("chan-->"+("remove"+";"+ev.Name)) | |||||
ch <- ("remove"+";"+ev.Name) | ch <- ("remove"+";"+ev.Name) | ||||
} | } | ||||
} | } | ||||
if ev.Op&fsnotify.Rename == fsnotify.Rename { | if ev.Op&fsnotify.Rename == fsnotify.Rename { | ||||
fmt.Println("重命名文件 : ", ev.Name); | |||||
//fmt.Println("重命名文件 : ", ev.Name); | |||||
//如果重命名文件是目录,则移除监控 | //如果重命名文件是目录,则移除监控 | ||||
//注意这里无法使用os.Stat来判断是否是目录了 | //注意这里无法使用os.Stat来判断是否是目录了 | ||||
//因为重命名后,go已经无法找到原文件来获取信息了 | //因为重命名后,go已经无法找到原文件来获取信息了 | ||||
@@ -133,12 +132,12 @@ func main() { | |||||
config.GobalWatch.Remove(ev.Name); | config.GobalWatch.Remove(ev.Name); | ||||
} | } | ||||
if ev.Op&fsnotify.Chmod == fsnotify.Chmod { | if ev.Op&fsnotify.Chmod == fsnotify.Chmod { | ||||
fmt.Println("修改权限 : ", ev.Name); | |||||
//fmt.Println("修改权限 : ", ev.Name); | |||||
} | } | ||||
} | } | ||||
case err := <-config.GobalWatch.Errors: | case err := <-config.GobalWatch.Errors: | ||||
{ | { | ||||
fmt.Println("error : ", err); | |||||
log.Printf("error : %v", err); | |||||
return; | return; | ||||
} | } | ||||
} | } | ||||
@@ -181,6 +180,10 @@ func main() { | |||||
http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler) | http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler) | ||||
//http.HandleFunc("/watchFile", websocket.WatchFileHandler) | //http.HandleFunc("/watchFile", websocket.WatchFileHandler) | ||||
http.HandleFunc("/keeplive", websocket.KeepliveHandler) | http.HandleFunc("/keeplive", websocket.KeepliveHandler) | ||||
http.HandleFunc("/messageNotify", websocket.MessageNotifyHandler) | |||||
http.HandleFunc("/messageMarkRead", websocket.MessageMarkReadHandler) | |||||
//TODO 消息已读 | |||||
http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler) | http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler) | ||||
http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler) | http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler) | ||||
@@ -0,0 +1,47 @@ | |||||
package nsqclient | |||||
import ( | |||||
"github.com/nsqio/go-nsq" | |||||
"log" | |||||
) | |||||
/** | |||||
Locking通知消息 | |||||
*/ | |||||
type LockingMsg struct{ | |||||
Id int64 `json:"id,string"` | |||||
UserIds []int64 `json:"userIds"` | |||||
PushChannel int `json:"pushChannel"` | |||||
Title string `json:"title"` | |||||
Body string `json:"body"` | |||||
Type string `json:"type"` | |||||
PushTimeUnix int64 `json:"pushTimeUnix,string"` | |||||
Parameter map[string] string `json:"parameter"` | |||||
} | |||||
var MsgQueue = make(chan string,100000) | |||||
type NewHandler struct{} | |||||
func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) { | |||||
//addr := msg.NSQDAddress | |||||
message := string(msg.Body) | |||||
log.Println(message) | |||||
MsgQueue <- message | |||||
return | |||||
} | |||||
func Consumers(topic, channel, addr string) { | |||||
conf := nsq.NewConfig() | |||||
new_consumer, err := nsq.NewConsumer(topic, channel, conf) | |||||
if err != nil { | |||||
} | |||||
// 接收消息 | |||||
new_handler := &NewHandler{} | |||||
new_consumer.AddHandler(new_handler) | |||||
err = new_consumer.ConnectToNSQD(addr) | |||||
if err != nil { | |||||
} | |||||
} |
@@ -0,0 +1,30 @@ | |||||
package nsqclient | |||||
import ( | |||||
"fmt" | |||||
"github.com/nsqio/go-nsq" | |||||
"time" | |||||
) | |||||
func main() { | |||||
nsqAddr := "www.lockingos.org:4150" | |||||
conf :=nsq.NewConfig() | |||||
p ,err := nsq.NewProducer(nsqAddr,conf) | |||||
if err != nil { | |||||
fmt.Println(err) | |||||
return | |||||
} | |||||
for { | |||||
message := "message :"+ time.Now().Format("2006-01-02 15:04:05") | |||||
fmt.Println(message) | |||||
// 发送消息 | |||||
p.Publish("topic-demo1",[]byte(message)) | |||||
time.Sleep(2*time.Second) | |||||
} | |||||
} |
@@ -42,7 +42,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param uploadHandler:"+string(data)) | |||||
//log.Println("param uploadHandler:"+string(data)) | |||||
params :=strings.Split(string(data),"|") | params :=strings.Split(string(data),"|") | ||||
mil,_:=strconv.ParseBool(params[7]) | mil,_:=strconv.ParseBool(params[7]) | ||||
@@ -81,7 +81,7 @@ func DownloadHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param downloadHandler:"+string(data)) | |||||
//log.Println("param downloadHandler:"+string(data)) | |||||
params :=strings.Split(string(data),"|") | params :=strings.Split(string(data),"|") | ||||
err := handle.DownCommand(conn,params[0],params[1],params[2],params[3]) | err := handle.DownCommand(conn,params[0],params[1],params[2],params[3]) | ||||
@@ -120,10 +120,11 @@ func InitLocalWorkSpaceHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||||
params :=strings.Split(string(data),"|") | params :=strings.Split(string(data),"|") | ||||
err := handle.InitLocalWorkSpace(conn,params[0],params[1]) | |||||
//userId = "367294106252087297" | |||||
err := handle.InitLocalWorkSpace(conn,params[0],params[1],params[2]) | |||||
if err!=nil{ | if err!=nil{ | ||||
log.Println(err) | log.Println(err) | ||||
goto ERR | goto ERR | ||||
@@ -159,7 +160,7 @@ func OpenFileWithHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param OpenFileWithHandler:"+string(data)) | |||||
//log.Println("param OpenFileWithHandler:"+string(data)) | |||||
err := handle.OpenFileWith(string(data)) | err := handle.OpenFileWith(string(data)) | ||||
if err!=nil{ | if err!=nil{ | ||||
@@ -201,7 +202,7 @@ func CheckForUpdatesHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param CheckForUpdatesHandler:"+string(data)) | |||||
//log.Println("param CheckForUpdatesHandler:"+string(data)) | |||||
err := handle.CheckForUpdates(string(data)) | err := handle.CheckForUpdates(string(data)) | ||||
if err!=nil{ | if err!=nil{ | ||||
@@ -243,7 +244,7 @@ func InitClientConfigHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param InitClientConfigHandler:"+string(data)) | |||||
//log.Println("param InitClientConfigHandler:"+string(data)) | |||||
params :=strings.Split(string(data),"|") | params :=strings.Split(string(data),"|") | ||||
err := handle.InitClientConfig(params[0],params[1]) | err := handle.InitClientConfig(params[0],params[1]) | ||||
if err!=nil{ | if err!=nil{ | ||||
@@ -285,7 +286,7 @@ func GetFolderFileInfoHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param GetFolderFileInfo:"+string(data)) | |||||
//log.Println("param GetFolderFileInfo:"+string(data)) | |||||
err := handle.GetFolderFileInfo(conn,string(data)) | err := handle.GetFolderFileInfo(conn,string(data)) | ||||
if err!=nil{ | if err!=nil{ | ||||
@@ -323,7 +324,7 @@ func EditCommitHistoryMilestoneHandler(w http.ResponseWriter, r *http.Request) { | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param EditCommitHistoryMilestoneHandler:"+string(data)) | |||||
//log.Println("param EditCommitHistoryMilestoneHandler:"+string(data)) | |||||
params := strings.Split(string(data),"|") | params := strings.Split(string(data),"|") | ||||
mil,_:=strconv.ParseBool(params[3]) | mil,_:=strconv.ParseBool(params[3]) | ||||
hash,err := handle.EditCommitHistoryMilestoneHandler(params[0],params[1],params[2],mil) | hash,err := handle.EditCommitHistoryMilestoneHandler(params[0],params[1],params[2],mil) | ||||
@@ -370,7 +371,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ | |||||
goto ERR | goto ERR | ||||
} | } | ||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
log.Println("param subscriptionFileChangeHandler:"+string(data)) | |||||
//log.Println("param subscriptionFileChangeHandler:"+string(data)) | |||||
err = handle.SubscriptionFileChange(conn,string(data)) | err = handle.SubscriptionFileChange(conn,string(data)) | ||||
if err != nil { | if err != nil { | ||||
@@ -422,6 +423,91 @@ ERR: | |||||
} | } | ||||
//消息通知 | |||||
func MessageNotifyHandler(w http.ResponseWriter, r *http.Request){ | |||||
//w.Write([]byte("hello")) | |||||
//收到http请求(upgrade),完成websocket协议转换 | |||||
//在应答的header中放上upgrade:websoket | |||||
var ( | |||||
conn *websocket.Conn | |||||
err error | |||||
//msgType int | |||||
data []byte | |||||
) | |||||
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { | |||||
//报错了,直接返回底层的websocket链接就会终断掉 | |||||
return | |||||
} | |||||
//得到了websocket.Conn长连接的对象,实现数据的收发 | |||||
for { | |||||
//Text(json), Binary | |||||
//if _, data, err = conn.ReadMessage(); err != nil { | |||||
if _, data, err = conn.ReadMessage(); err != nil { | |||||
//报错关闭websocket | |||||
goto ERR | |||||
} | |||||
//发送数据,判断返回值是否报错 | |||||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||||
err := handle.MessageNotify(conn,string(data)) | |||||
if err!=nil{ | |||||
log.Println(err) | |||||
goto ERR | |||||
} | |||||
goto ERR | |||||
} | |||||
//error的标签 | |||||
ERR: | |||||
conn.Close() | |||||
} | |||||
//消息已读 | |||||
func MessageMarkReadHandler(w http.ResponseWriter, r *http.Request){ | |||||
//w.Write([]byte("hello")) | |||||
//收到http请求(upgrade),完成websocket协议转换 | |||||
//在应答的header中放上upgrade:websoket | |||||
var ( | |||||
conn *websocket.Conn | |||||
err error | |||||
//msgType int | |||||
data []byte | |||||
) | |||||
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { | |||||
//报错了,直接返回底层的websocket链接就会终断掉 | |||||
return | |||||
} | |||||
//得到了websocket.Conn长连接的对象,实现数据的收发 | |||||
for { | |||||
//Text(json), Binary | |||||
//if _, data, err = conn.ReadMessage(); err != nil { | |||||
if _, data, err = conn.ReadMessage(); err != nil { | |||||
//报错关闭websocket | |||||
goto ERR | |||||
} | |||||
//发送数据,判断返回值是否报错 | |||||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||||
params := strings.Split(string(data),"|") | |||||
err := handle.MessageMarkReadHandler(conn,params[0],params[1]) | |||||
if err!=nil{ | |||||
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { | |||||
log.Println(err) | |||||
goto ERR | |||||
} | |||||
log.Println(err) | |||||
goto ERR | |||||
} | |||||
goto ERR | |||||
} | |||||
//error的标签 | |||||
ERR: | |||||
conn.Close() | |||||
} | |||||
//查询历史版本记录 | //查询历史版本记录 | ||||
func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) { | func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) { | ||||
//w.Write([]byte("hello")) | //w.Write([]byte("hello")) | ||||
@@ -448,7 +534,7 @@ func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) { | |||||
//发送数据,判断返回值是否报错 | //发送数据,判断返回值是否报错 | ||||
//eg: 文件路径|历史版本文件Hash | //eg: 文件路径|历史版本文件Hash | ||||
dataString := string(data) | dataString := string(data) | ||||
log.Println("param QueryCommitHistoryHandler:"+dataString) | |||||
//log.Println("param QueryCommitHistoryHandler:"+dataString) | |||||
params := strings.Split(dataString,"|") | params := strings.Split(dataString,"|") | ||||
dataList,err := handle.QueryCommitHistory(params[0],params[1]) | dataList,err := handle.QueryCommitHistory(params[0],params[1]) | ||||