Author | SHA1 | Message | Date |
---|---|---|---|
|
2fc72a688e | feat:v1.0.5 | 4 years ago |
|
38f9ee907e | feat:更为正式环境 | 4 years ago |
|
d31af8958e | fixbug | 4 years ago |
|
6ea48b2551 | feat;增加消息通知接口 | 4 years ago |
|
57f72096ab |
feat:完成保存即协同
1.TODO init userId参数替换 |
4 years ago |
@@ -24,6 +24,12 @@ var ServerIpfsUrl = "10.240.10.238:5001" | |||
var UpdaterName ="LOCKING更新.exe" | |||
var EtcdUrl="127.0.0.1:2379" | |||
var ServerUrl = "http://www.lockingos.org:9000" | |||
var NsqAddr = "www.lockingos.org:4150" | |||
var NsqTopic = "locking-topic-dev" | |||
var NsqChanelPrefix = "channel-userId-" | |||
var GobalWatch *fsnotify.Watcher | |||
//全局的chanel map | |||
@@ -31,6 +37,8 @@ var GobalWatchChannelMap = make(map[string] chan string) | |||
//var EtcdUrl="127.0.0.1:2379" | |||
func InitConfig(){ | |||
tszdir :=os.Getenv("TSZDIR") | |||
@@ -1,7 +1,6 @@ | |||
package main | |||
import ( | |||
"fmt" | |||
_ "fmt" | |||
"fts/config" | |||
"fts/websocket" | |||
@@ -55,7 +54,7 @@ func main() { | |||
select { | |||
case ev := <-config.GobalWatch.Events: | |||
{ | |||
log.Println(ev.Op.String()+":"+ev.Name) | |||
//log.Println(ev.Op.String()+":"+ev.Name) | |||
if filepath.Ext(ev.Name)==".commit"{ | |||
continue | |||
} | |||
@@ -64,29 +63,29 @@ func main() { | |||
} | |||
if ev.Op&fsnotify.Create == fsnotify.Create { | |||
fmt.Println("创建文件 : ", ev.Name); | |||
//fmt.Println("创建文件 : ", ev.Name); | |||
//这里获取新创建文件的信息,如果是目录,则加入监控中 | |||
fi, err := os.Stat(ev.Name); | |||
if err == nil && fi.IsDir() { | |||
config.GobalWatch.Add(ev.Name); | |||
fmt.Println("添加监控 : ", ev.Name); | |||
//fmt.Println("添加监控 : ", ev.Name); | |||
}else{ | |||
filePath :=ev.Name | |||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | |||
param :=strings.Split(abs,"\\") | |||
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | |||
log.Println("chan-->"+key) | |||
//log.Println("chan-->"+key) | |||
ch :=config.GobalWatchChannelMap[key] | |||
if ch==nil{ | |||
continue | |||
} | |||
log.Println("chan-->"+("create"+";"+ev.Name)) | |||
//log.Println("chan-->"+("create"+";"+ev.Name)) | |||
ch <- ("create"+";"+ev.Name) | |||
} | |||
} | |||
if ev.Op&fsnotify.Write == fsnotify.Write { | |||
fmt.Println("写入文件 : ", ev.Name); | |||
//fmt.Println("写入文件 : ", ev.Name); | |||
//判断文件,发送事件 | |||
fi, err := os.Stat(ev.Name); | |||
if err == nil && !fi.IsDir() { | |||
@@ -94,23 +93,23 @@ func main() { | |||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | |||
param :=strings.Split(abs,"\\") | |||
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1] | |||
log.Println("chan-->"+key) | |||
//log.Println("chan-->"+key) | |||
ch :=config.GobalWatchChannelMap[key] | |||
if ch==nil{ | |||
continue | |||
} | |||
log.Println("chan-->"+("write"+";"+ev.Name)) | |||
log.Println(ch) | |||
//log.Println("chan-->"+("write"+";"+ev.Name)) | |||
//log.Println(ch) | |||
ch <- ("write"+";"+ev.Name) | |||
} | |||
} | |||
if ev.Op&fsnotify.Remove == fsnotify.Remove { | |||
fmt.Println("删除文件 : ", ev.Name); | |||
//fmt.Println("删除文件 : ", ev.Name); | |||
//如果删除文件是目录,则移除监控 | |||
fi, err := os.Stat(ev.Name); | |||
if err == nil && fi.IsDir() { | |||
config.GobalWatch.Remove(ev.Name); | |||
fmt.Println("删除监控 : ", ev.Name); | |||
//fmt.Println("删除监控 : ", ev.Name); | |||
}else{ | |||
filePath :=ev.Name | |||
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1) | |||
@@ -120,12 +119,12 @@ func main() { | |||
if ch==nil{ | |||
continue | |||
} | |||
log.Println("chan-->"+("remove"+";"+ev.Name)) | |||
//log.Println("chan-->"+("remove"+";"+ev.Name)) | |||
ch <- ("remove"+";"+ev.Name) | |||
} | |||
} | |||
if ev.Op&fsnotify.Rename == fsnotify.Rename { | |||
fmt.Println("重命名文件 : ", ev.Name); | |||
//fmt.Println("重命名文件 : ", ev.Name); | |||
//如果重命名文件是目录,则移除监控 | |||
//注意这里无法使用os.Stat来判断是否是目录了 | |||
//因为重命名后,go已经无法找到原文件来获取信息了 | |||
@@ -133,12 +132,12 @@ func main() { | |||
config.GobalWatch.Remove(ev.Name); | |||
} | |||
if ev.Op&fsnotify.Chmod == fsnotify.Chmod { | |||
fmt.Println("修改权限 : ", ev.Name); | |||
//fmt.Println("修改权限 : ", ev.Name); | |||
} | |||
} | |||
case err := <-config.GobalWatch.Errors: | |||
{ | |||
fmt.Println("error : ", err); | |||
log.Printf("error : %v", err); | |||
return; | |||
} | |||
} | |||
@@ -181,6 +180,10 @@ func main() { | |||
http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler) | |||
//http.HandleFunc("/watchFile", websocket.WatchFileHandler) | |||
http.HandleFunc("/keeplive", websocket.KeepliveHandler) | |||
http.HandleFunc("/messageNotify", websocket.MessageNotifyHandler) | |||
http.HandleFunc("/messageMarkRead", websocket.MessageMarkReadHandler) | |||
//TODO 消息已读 | |||
http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler) | |||
http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler) | |||
@@ -0,0 +1,47 @@ | |||
package nsqclient | |||
import ( | |||
"github.com/nsqio/go-nsq" | |||
"log" | |||
) | |||
/** | |||
Locking通知消息 | |||
*/ | |||
type LockingMsg struct{ | |||
Id int64 `json:"id,string"` | |||
UserIds []int64 `json:"userIds"` | |||
PushChannel int `json:"pushChannel"` | |||
Title string `json:"title"` | |||
Body string `json:"body"` | |||
Type string `json:"type"` | |||
PushTimeUnix int64 `json:"pushTimeUnix,string"` | |||
Parameter map[string] string `json:"parameter"` | |||
} | |||
var MsgQueue = make(chan string,100000) | |||
type NewHandler struct{} | |||
func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) { | |||
//addr := msg.NSQDAddress | |||
message := string(msg.Body) | |||
log.Println(message) | |||
MsgQueue <- message | |||
return | |||
} | |||
func Consumers(topic, channel, addr string) { | |||
conf := nsq.NewConfig() | |||
new_consumer, err := nsq.NewConsumer(topic, channel, conf) | |||
if err != nil { | |||
} | |||
// 接收消息 | |||
new_handler := &NewHandler{} | |||
new_consumer.AddHandler(new_handler) | |||
err = new_consumer.ConnectToNSQD(addr) | |||
if err != nil { | |||
} | |||
} |
@@ -0,0 +1,30 @@ | |||
package nsqclient | |||
import ( | |||
"fmt" | |||
"github.com/nsqio/go-nsq" | |||
"time" | |||
) | |||
func main() { | |||
nsqAddr := "www.lockingos.org:4150" | |||
conf :=nsq.NewConfig() | |||
p ,err := nsq.NewProducer(nsqAddr,conf) | |||
if err != nil { | |||
fmt.Println(err) | |||
return | |||
} | |||
for { | |||
message := "message :"+ time.Now().Format("2006-01-02 15:04:05") | |||
fmt.Println(message) | |||
// 发送消息 | |||
p.Publish("topic-demo1",[]byte(message)) | |||
time.Sleep(2*time.Second) | |||
} | |||
} |
@@ -42,7 +42,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param uploadHandler:"+string(data)) | |||
//log.Println("param uploadHandler:"+string(data)) | |||
params :=strings.Split(string(data),"|") | |||
mil,_:=strconv.ParseBool(params[7]) | |||
@@ -81,7 +81,7 @@ func DownloadHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param downloadHandler:"+string(data)) | |||
//log.Println("param downloadHandler:"+string(data)) | |||
params :=strings.Split(string(data),"|") | |||
err := handle.DownCommand(conn,params[0],params[1],params[2],params[3]) | |||
@@ -120,10 +120,11 @@ func InitLocalWorkSpaceHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||
params :=strings.Split(string(data),"|") | |||
err := handle.InitLocalWorkSpace(conn,params[0],params[1]) | |||
//userId = "367294106252087297" | |||
err := handle.InitLocalWorkSpace(conn,params[0],params[1],params[2]) | |||
if err!=nil{ | |||
log.Println(err) | |||
goto ERR | |||
@@ -159,7 +160,7 @@ func OpenFileWithHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param OpenFileWithHandler:"+string(data)) | |||
//log.Println("param OpenFileWithHandler:"+string(data)) | |||
err := handle.OpenFileWith(string(data)) | |||
if err!=nil{ | |||
@@ -201,7 +202,7 @@ func CheckForUpdatesHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param CheckForUpdatesHandler:"+string(data)) | |||
//log.Println("param CheckForUpdatesHandler:"+string(data)) | |||
err := handle.CheckForUpdates(string(data)) | |||
if err!=nil{ | |||
@@ -243,7 +244,7 @@ func InitClientConfigHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param InitClientConfigHandler:"+string(data)) | |||
//log.Println("param InitClientConfigHandler:"+string(data)) | |||
params :=strings.Split(string(data),"|") | |||
err := handle.InitClientConfig(params[0],params[1]) | |||
if err!=nil{ | |||
@@ -285,7 +286,7 @@ func GetFolderFileInfoHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param GetFolderFileInfo:"+string(data)) | |||
//log.Println("param GetFolderFileInfo:"+string(data)) | |||
err := handle.GetFolderFileInfo(conn,string(data)) | |||
if err!=nil{ | |||
@@ -323,7 +324,7 @@ func EditCommitHistoryMilestoneHandler(w http.ResponseWriter, r *http.Request) { | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param EditCommitHistoryMilestoneHandler:"+string(data)) | |||
//log.Println("param EditCommitHistoryMilestoneHandler:"+string(data)) | |||
params := strings.Split(string(data),"|") | |||
mil,_:=strconv.ParseBool(params[3]) | |||
hash,err := handle.EditCommitHistoryMilestoneHandler(params[0],params[1],params[2],mil) | |||
@@ -370,7 +371,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
log.Println("param subscriptionFileChangeHandler:"+string(data)) | |||
//log.Println("param subscriptionFileChangeHandler:"+string(data)) | |||
err = handle.SubscriptionFileChange(conn,string(data)) | |||
if err != nil { | |||
@@ -422,6 +423,91 @@ ERR: | |||
} | |||
//消息通知 | |||
func MessageNotifyHandler(w http.ResponseWriter, r *http.Request){ | |||
//w.Write([]byte("hello")) | |||
//收到http请求(upgrade),完成websocket协议转换 | |||
//在应答的header中放上upgrade:websoket | |||
var ( | |||
conn *websocket.Conn | |||
err error | |||
//msgType int | |||
data []byte | |||
) | |||
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { | |||
//报错了,直接返回底层的websocket链接就会终断掉 | |||
return | |||
} | |||
//得到了websocket.Conn长连接的对象,实现数据的收发 | |||
for { | |||
//Text(json), Binary | |||
//if _, data, err = conn.ReadMessage(); err != nil { | |||
if _, data, err = conn.ReadMessage(); err != nil { | |||
//报错关闭websocket | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||
err := handle.MessageNotify(conn,string(data)) | |||
if err!=nil{ | |||
log.Println(err) | |||
goto ERR | |||
} | |||
goto ERR | |||
} | |||
//error的标签 | |||
ERR: | |||
conn.Close() | |||
} | |||
//消息已读 | |||
func MessageMarkReadHandler(w http.ResponseWriter, r *http.Request){ | |||
//w.Write([]byte("hello")) | |||
//收到http请求(upgrade),完成websocket协议转换 | |||
//在应答的header中放上upgrade:websoket | |||
var ( | |||
conn *websocket.Conn | |||
err error | |||
//msgType int | |||
data []byte | |||
) | |||
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { | |||
//报错了,直接返回底层的websocket链接就会终断掉 | |||
return | |||
} | |||
//得到了websocket.Conn长连接的对象,实现数据的收发 | |||
for { | |||
//Text(json), Binary | |||
//if _, data, err = conn.ReadMessage(); err != nil { | |||
if _, data, err = conn.ReadMessage(); err != nil { | |||
//报错关闭websocket | |||
goto ERR | |||
} | |||
//发送数据,判断返回值是否报错 | |||
//log.Println("param initLocalWorkSpaceHandler:"+string(data)) | |||
params := strings.Split(string(data),"|") | |||
err := handle.MessageMarkReadHandler(conn,params[0],params[1]) | |||
if err!=nil{ | |||
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { | |||
log.Println(err) | |||
goto ERR | |||
} | |||
log.Println(err) | |||
goto ERR | |||
} | |||
goto ERR | |||
} | |||
//error的标签 | |||
ERR: | |||
conn.Close() | |||
} | |||
//查询历史版本记录 | |||
func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) { | |||
//w.Write([]byte("hello")) | |||
@@ -448,7 +534,7 @@ func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) { | |||
//发送数据,判断返回值是否报错 | |||
//eg: 文件路径|历史版本文件Hash | |||
dataString := string(data) | |||
log.Println("param QueryCommitHistoryHandler:"+dataString) | |||
//log.Println("param QueryCommitHistoryHandler:"+dataString) | |||
params := strings.Split(dataString,"|") | |||
dataList,err := handle.QueryCommitHistory(params[0],params[1]) | |||