diff --git a/handle/handle.go b/handle/handle.go index 665f1ce..56cba26 100644 --- a/handle/handle.go +++ b/handle/handle.go @@ -334,7 +334,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st defer tfile.Close() serverSh := shell.NewShell(config.ServerIpfsUrl) - serverSh.SetTimeout(time.Duration(30)*time.Second) + //serverSh.SetTimeout(time.Duration(30)*time.Second) log.Println("检测引导节点存活情况"+config.ServerIpfsUrl) //检测引导节点是否连接成功 isUp := serverSh.IsUp() @@ -427,6 +427,10 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st if err != nil { log.Println("cmd.Run() failed with %s\n", err) + if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { + return err + } + return err } if errStdout != nil || errStderr != nil { log.Println("failed to capture stdout or stderr\n") @@ -444,7 +448,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st prog.Process=100.00 sh := shell.NewShell(config.GobalIpfsUrl) - sh.SetTimeout(time.Duration(30)*time.Second) + //sh.SetTimeout(time.Duration(30)*time.Second) objectStat,err :=sh.ObjectStat(prog.Hash) if err != nil { log.Println(err) @@ -453,10 +457,11 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st prog.Size=strconv.Itoa(objectStat.CumulativeSize) projson,err :=json.Marshal(prog) - serverSh.SetTimeout(time.Duration(30)*time.Second) - err = serverSh.Pin(prog.Hash) + + // + cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash) + err = cmd.Run() if err != nil { - log.Println("引导节点备份失败") log.Println(err) if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err @@ -464,17 +469,21 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st return err } - if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + //serverSh.SetTimeout(time.Duration(600)*time.Second) + + err = serverSh.Pin(prog.Hash) + if err != nil { + log.Println("引导节点备份失败") log.Println(err) + if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { + return err + } + return err } - log.Println("引导节点文件备份成功") - //本地文件夹拷贝 err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName))) - - if err != nil { log.Println(err) return err @@ -488,6 +497,9 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st return err } + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + log.Println(err) + } log.Println("上传成功") return nil @@ -620,7 +632,7 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ //定期校验缓存的本地文件状态 go func() { for true { - time.Sleep(time.Duration(1)*time.Minute) + time.Sleep(time.Duration(5)*time.Minute) dataMap,err := etcdclient.QueryWithPrefix(keyPrefix) if err != nil { log.Println(err) @@ -671,9 +683,15 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ // 不存在则初始化进etcd err =filepath.Walk(getLocalFileListDir,walkfunc) + //路径错误 if err != nil { log.Println(err) - //time.Sleep(time.Duration(1)*time.Minute) + if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil { + log.Println(err) + return err + } + time.Sleep(time.Duration(1)*time.Minute) + continue } mapByte,err:=json.Marshal(gobalFileMap) @@ -812,6 +830,10 @@ func CheckForUpdates(forceUpdate string) error{ func walkfunc(path string, info os.FileInfo, err error) error { + if info == nil{ + return nil + } + if info.IsDir()==false{ sh := shell.NewShell(config.GobalIpfsUrl) diff --git a/main.go b/main.go index c9381f3..0615224 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,9 @@ import ( "os" _ "os" "path/filepath" + "strconv" _ "strings" + "time" ) @@ -28,8 +30,8 @@ func main() { //创建文件目录 os.MkdirAll(config.LocalWorkSpaceDir, os.ModePerm) } - - logpath :=config.LocalWorkSpaceDir+"\\"+"fts.log" + time.Now().Month().String() + logpath :=config.LocalWorkSpaceDir+"\\"+"fts_"+strconv.Itoa(time.Now().Year())+time.Now().Month().String()+".log" logFile, err := os.OpenFile(logpath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { log.Printf("open log file failed, err:", err) @@ -97,6 +99,30 @@ func main() { //defer config.GobalWatch.Close() + + go func() { + //客户端心跳检测 + log.Println("keeplive") + time.Sleep(20*time.Second) + var keepliveTimeOut int64 =5 //20s + var lastAccept int64 = time.Now().Unix() //秒时间戳 + for true{ + //log.Print(!websocket.IsKeeplive) + //log.Println((time.Now().Unix()-lastAccept)>keepliveTimeOut) + if !websocket.IsKeeplive { + if (time.Now().Unix()-lastAccept)>keepliveTimeOut{ + log.Println("长时间未检测到心跳 Exit") + os.Exit(1) + } + continue + } + + lastAccept = time.Now().Unix() + websocket.IsKeeplive = false + //time.Sleep(100*time.Millisecond) + } + }() + //http://localhost:7777/ws http.HandleFunc("/upload", websocket.UploadHandler) http.HandleFunc("/subscriptionFileChange", websocket.SubscriptionFileChangeHandler) @@ -107,6 +133,7 @@ func main() { http.HandleFunc("/checkForUpdates", websocket.CheckForUpdatesHandler) http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler) http.HandleFunc("/watchFile", websocket.WatchFileHandler) + http.HandleFunc("/keeplive", websocket.KeepliveHandler) //服务端启动 log.Println("服务启动成功,监听端口7777,等待连接。") @@ -114,7 +141,6 @@ func main() { - } //func main() { diff --git a/websocket/websocket.go b/websocket/websocket.go index 443620f..8c2d2bc 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -372,6 +372,42 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ ERR: conn.Close()} + + +var IsKeeplive bool = false //客户端是否存活 +func KeepliveHandler(w http.ResponseWriter, r *http.Request){ + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + IsKeeplive = true + var ( + conn *websocket.Conn + err error + //msgType int + //data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + log.Println(err) + return + } + + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, _, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + log.Println(err) + goto ERR + } + IsKeeplive = true + //goto ERR + } + //error的标签 +ERR: + conn.Close()} + func main() {