Compare commits

...

5 Commits

Author SHA1 Message Date
  yuan_rh 2fc72a688e feat:v1.0.5 4 years ago
  yuan_rh 38f9ee907e feat:更为正式环境 4 years ago
  yuan_rh d31af8958e fixbug 4 years ago
  yuan_rh 6ea48b2551 feat;增加消息通知接口 4 years ago
  yuan_rh 57f72096ab feat:完成保存即协同 4 years ago
6 changed files with 745 additions and 227 deletions
Split View
  1. +8
    -0
      config/config.go
  2. +544
    -200
      handle/handle.go
  3. +19
    -16
      main.go
  4. +47
    -0
      nsqclient/nsq_single_consumer.go
  5. +30
    -0
      nsqclient/nsq_single_product.go
  6. +97
    -11
      websocket/websocket.go

+ 8
- 0
config/config.go View File

@@ -24,6 +24,12 @@ var ServerIpfsUrl = "10.240.10.238:5001"
var UpdaterName ="LOCKING更新.exe"
var EtcdUrl="127.0.0.1:2379"

var ServerUrl = "http://www.lockingos.org:9000"

var NsqAddr = "www.lockingos.org:4150"
var NsqTopic = "locking-topic-dev"
var NsqChanelPrefix = "channel-userId-"

var GobalWatch *fsnotify.Watcher

//全局的chanel map
@@ -31,6 +37,8 @@ var GobalWatchChannelMap = make(map[string] chan string)
//var EtcdUrl="127.0.0.1:2379"




func InitConfig(){

tszdir :=os.Getenv("TSZDIR")


+ 544
- 200
handle/handle.go
File diff suppressed because it is too large
View File


+ 19
- 16
main.go View File

@@ -1,7 +1,6 @@
package main

import (
"fmt"
_ "fmt"
"fts/config"
"fts/websocket"
@@ -55,7 +54,7 @@ func main() {
select {
case ev := <-config.GobalWatch.Events:
{
log.Println(ev.Op.String()+":"+ev.Name)
//log.Println(ev.Op.String()+":"+ev.Name)
if filepath.Ext(ev.Name)==".commit"{
continue
}
@@ -64,29 +63,29 @@ func main() {
}

if ev.Op&fsnotify.Create == fsnotify.Create {
fmt.Println("创建文件 : ", ev.Name);
//fmt.Println("创建文件 : ", ev.Name);
//这里获取新创建文件的信息,如果是目录,则加入监控中
fi, err := os.Stat(ev.Name);
if err == nil && fi.IsDir() {
config.GobalWatch.Add(ev.Name);
fmt.Println("添加监控 : ", ev.Name);
//fmt.Println("添加监控 : ", ev.Name);
}else{
filePath :=ev.Name
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1)
param :=strings.Split(abs,"\\")
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1]
log.Println("chan-->"+key)
//log.Println("chan-->"+key)
ch :=config.GobalWatchChannelMap[key]
if ch==nil{
continue
}
log.Println("chan-->"+("create"+";"+ev.Name))
//log.Println("chan-->"+("create"+";"+ev.Name))
ch <- ("create"+";"+ev.Name)

}
}
if ev.Op&fsnotify.Write == fsnotify.Write {
fmt.Println("写入文件 : ", ev.Name);
//fmt.Println("写入文件 : ", ev.Name);
//判断文件,发送事件
fi, err := os.Stat(ev.Name);
if err == nil && !fi.IsDir() {
@@ -94,23 +93,23 @@ func main() {
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1)
param :=strings.Split(abs,"\\")
key :=config.LocalWorkSpaceDir+param[0]+"\\"+param[1]
log.Println("chan-->"+key)
//log.Println("chan-->"+key)
ch :=config.GobalWatchChannelMap[key]
if ch==nil{
continue
}
log.Println("chan-->"+("write"+";"+ev.Name))
log.Println(ch)
//log.Println("chan-->"+("write"+";"+ev.Name))
//log.Println(ch)
ch <- ("write"+";"+ev.Name)
}
}
if ev.Op&fsnotify.Remove == fsnotify.Remove {
fmt.Println("删除文件 : ", ev.Name);
//fmt.Println("删除文件 : ", ev.Name);
//如果删除文件是目录,则移除监控
fi, err := os.Stat(ev.Name);
if err == nil && fi.IsDir() {
config.GobalWatch.Remove(ev.Name);
fmt.Println("删除监控 : ", ev.Name);
//fmt.Println("删除监控 : ", ev.Name);
}else{
filePath :=ev.Name
abs := strings.Replace(filePath,config.LocalWorkSpaceDir,"",1)
@@ -120,12 +119,12 @@ func main() {
if ch==nil{
continue
}
log.Println("chan-->"+("remove"+";"+ev.Name))
//log.Println("chan-->"+("remove"+";"+ev.Name))
ch <- ("remove"+";"+ev.Name)
}
}
if ev.Op&fsnotify.Rename == fsnotify.Rename {
fmt.Println("重命名文件 : ", ev.Name);
//fmt.Println("重命名文件 : ", ev.Name);
//如果重命名文件是目录,则移除监控
//注意这里无法使用os.Stat来判断是否是目录了
//因为重命名后,go已经无法找到原文件来获取信息了
@@ -133,12 +132,12 @@ func main() {
config.GobalWatch.Remove(ev.Name);
}
if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
fmt.Println("修改权限 : ", ev.Name);
//fmt.Println("修改权限 : ", ev.Name);
}
}
case err := <-config.GobalWatch.Errors:
{
fmt.Println("error : ", err);
log.Printf("error : %v", err);
return;
}
}
@@ -181,6 +180,10 @@ func main() {
http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler)
//http.HandleFunc("/watchFile", websocket.WatchFileHandler)
http.HandleFunc("/keeplive", websocket.KeepliveHandler)
http.HandleFunc("/messageNotify", websocket.MessageNotifyHandler)
http.HandleFunc("/messageMarkRead", websocket.MessageMarkReadHandler)

//TODO 消息已读

http.HandleFunc("/queryCommitHistory", websocket.QueryCommitHistoryHandler)
http.HandleFunc("/editCommitHistoryMilestone", websocket.EditCommitHistoryMilestoneHandler)


+ 47
- 0
nsqclient/nsq_single_consumer.go View File

@@ -0,0 +1,47 @@
package nsqclient

import (
"github.com/nsqio/go-nsq"
"log"
)

/**
Locking通知消息
*/
type LockingMsg struct{
Id int64 `json:"id,string"`
UserIds []int64 `json:"userIds"`
PushChannel int `json:"pushChannel"`
Title string `json:"title"`
Body string `json:"body"`
Type string `json:"type"`
PushTimeUnix int64 `json:"pushTimeUnix,string"`
Parameter map[string] string `json:"parameter"`
}

var MsgQueue = make(chan string,100000)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
//addr := msg.NSQDAddress
message := string(msg.Body)
log.Println(message)
MsgQueue <- message
return
}
func Consumers(topic, channel, addr string) {
conf := nsq.NewConfig()
new_consumer, err := nsq.NewConsumer(topic, channel, conf)
if err != nil {

}

// 接收消息
new_handler := &NewHandler{}
new_consumer.AddHandler(new_handler)
err = new_consumer.ConnectToNSQD(addr)
if err != nil {

}
}

+ 30
- 0
nsqclient/nsq_single_product.go View File

@@ -0,0 +1,30 @@
package nsqclient

import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)




func main() {

nsqAddr := "www.lockingos.org:4150"
conf :=nsq.NewConfig()
p ,err := nsq.NewProducer(nsqAddr,conf)
if err != nil {
fmt.Println(err)
return
}
for {
message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
fmt.Println(message)
// 发送消息
p.Publish("topic-demo1",[]byte(message))

time.Sleep(2*time.Second)
}
}

+ 97
- 11
websocket/websocket.go View File

@@ -42,7 +42,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param uploadHandler:"+string(data))
//log.Println("param uploadHandler:"+string(data))

params :=strings.Split(string(data),"|")
mil,_:=strconv.ParseBool(params[7])
@@ -81,7 +81,7 @@ func DownloadHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param downloadHandler:"+string(data))
//log.Println("param downloadHandler:"+string(data))

params :=strings.Split(string(data),"|")
err := handle.DownCommand(conn,params[0],params[1],params[2],params[3])
@@ -120,10 +120,11 @@ func InitLocalWorkSpaceHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param initLocalWorkSpaceHandler:"+string(data))
//log.Println("param initLocalWorkSpaceHandler:"+string(data))

params :=strings.Split(string(data),"|")
err := handle.InitLocalWorkSpace(conn,params[0],params[1])
//userId = "367294106252087297"
err := handle.InitLocalWorkSpace(conn,params[0],params[1],params[2])
if err!=nil{
log.Println(err)
goto ERR
@@ -159,7 +160,7 @@ func OpenFileWithHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param OpenFileWithHandler:"+string(data))
//log.Println("param OpenFileWithHandler:"+string(data))

err := handle.OpenFileWith(string(data))
if err!=nil{
@@ -201,7 +202,7 @@ func CheckForUpdatesHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param CheckForUpdatesHandler:"+string(data))
//log.Println("param CheckForUpdatesHandler:"+string(data))

err := handle.CheckForUpdates(string(data))
if err!=nil{
@@ -243,7 +244,7 @@ func InitClientConfigHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param InitClientConfigHandler:"+string(data))
//log.Println("param InitClientConfigHandler:"+string(data))
params :=strings.Split(string(data),"|")
err := handle.InitClientConfig(params[0],params[1])
if err!=nil{
@@ -285,7 +286,7 @@ func GetFolderFileInfoHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param GetFolderFileInfo:"+string(data))
//log.Println("param GetFolderFileInfo:"+string(data))

err := handle.GetFolderFileInfo(conn,string(data))
if err!=nil{
@@ -323,7 +324,7 @@ func EditCommitHistoryMilestoneHandler(w http.ResponseWriter, r *http.Request) {
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param EditCommitHistoryMilestoneHandler:"+string(data))
//log.Println("param EditCommitHistoryMilestoneHandler:"+string(data))
params := strings.Split(string(data),"|")
mil,_:=strconv.ParseBool(params[3])
hash,err := handle.EditCommitHistoryMilestoneHandler(params[0],params[1],params[2],mil)
@@ -370,7 +371,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param subscriptionFileChangeHandler:"+string(data))
//log.Println("param subscriptionFileChangeHandler:"+string(data))
err = handle.SubscriptionFileChange(conn,string(data))

if err != nil {
@@ -422,6 +423,91 @@ ERR:

}

//消息通知
func MessageNotifyHandler(w http.ResponseWriter, r *http.Request){
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
//在应答的header中放上upgrade:websoket
var (
conn *websocket.Conn
err error
//msgType int
data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
return
}
//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, data, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
goto ERR
}
//发送数据,判断返回值是否报错
//log.Println("param initLocalWorkSpaceHandler:"+string(data))

err := handle.MessageNotify(conn,string(data))

if err!=nil{
log.Println(err)
goto ERR
}

goto ERR
}
//error的标签
ERR:
conn.Close()
}


//消息已读
func MessageMarkReadHandler(w http.ResponseWriter, r *http.Request){
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
//在应答的header中放上upgrade:websoket
var (
conn *websocket.Conn
err error
//msgType int
data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
return
}
//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, data, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
goto ERR
}
//发送数据,判断返回值是否报错
//log.Println("param initLocalWorkSpaceHandler:"+string(data))
params := strings.Split(string(data),"|")
err := handle.MessageMarkReadHandler(conn,params[0],params[1])

if err!=nil{
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
log.Println(err)
goto ERR
}
log.Println(err)
goto ERR
}

goto ERR
}
//error的标签
ERR:
conn.Close()
}

//查询历史版本记录
func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) {
//w.Write([]byte("hello"))
@@ -448,7 +534,7 @@ func QueryCommitHistoryHandler(w http.ResponseWriter, r *http.Request) {
//发送数据,判断返回值是否报错
//eg: 文件路径|历史版本文件Hash
dataString := string(data)
log.Println("param QueryCommitHistoryHandler:"+dataString)
//log.Println("param QueryCommitHistoryHandler:"+dataString)
params := strings.Split(dataString,"|")

dataList,err := handle.QueryCommitHistory(params[0],params[1])


Loading…
Cancel
Save