yuan_rh пре 4 година
родитељ
комит
f153f4927d
3 измењених фајлова са 99 додато и 15 уклоњено
  1. +34
    -12
      handle/handle.go
  2. +29
    -3
      main.go
  3. +36
    -0
      websocket/websocket.go

+ 34
- 12
handle/handle.go Прегледај датотеку

@@ -334,7 +334,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
defer tfile.Close()

serverSh := shell.NewShell(config.ServerIpfsUrl)
serverSh.SetTimeout(time.Duration(30)*time.Second)
//serverSh.SetTimeout(time.Duration(30)*time.Second)
log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
//检测引导节点是否连接成功
isUp := serverSh.IsUp()
@@ -427,6 +427,10 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st

if err != nil {
log.Println("cmd.Run() failed with %s\n", err)
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
return err
}
return err
}
if errStdout != nil || errStderr != nil {
log.Println("failed to capture stdout or stderr\n")
@@ -444,7 +448,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
prog.Process=100.00

sh := shell.NewShell(config.GobalIpfsUrl)
sh.SetTimeout(time.Duration(30)*time.Second)
//sh.SetTimeout(time.Duration(30)*time.Second)
objectStat,err :=sh.ObjectStat(prog.Hash)
if err != nil {
log.Println(err)
@@ -453,10 +457,11 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
prog.Size=strconv.Itoa(objectStat.CumulativeSize)

projson,err :=json.Marshal(prog)
serverSh.SetTimeout(time.Duration(30)*time.Second)
err = serverSh.Pin(prog.Hash)

//
cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash)
err = cmd.Run()
if err != nil {
log.Println("引导节点备份失败")
log.Println(err)
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
return err
@@ -464,17 +469,21 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
return err
}

if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
//serverSh.SetTimeout(time.Duration(600)*time.Second)

err = serverSh.Pin(prog.Hash)
if err != nil {
log.Println("引导节点备份失败")
log.Println(err)
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
return err
}
return err
}

log.Println("引导节点文件备份成功")


//本地文件夹拷贝
err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))


if err != nil {
log.Println(err)
return err
@@ -488,6 +497,9 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
return err
}

if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
log.Println(err)
}
log.Println("上传成功")

return nil
@@ -620,7 +632,7 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
//定期校验缓存的本地文件状态
go func() {
for true {
time.Sleep(time.Duration(1)*time.Minute)
time.Sleep(time.Duration(5)*time.Minute)
dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
if err != nil {
log.Println(err)
@@ -671,9 +683,15 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
// 不存在则初始化进etcd

err =filepath.Walk(getLocalFileListDir,walkfunc)
//路径错误
if err != nil {
log.Println(err)
//time.Sleep(time.Duration(1)*time.Minute)
if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
log.Println(err)
return err
}
time.Sleep(time.Duration(1)*time.Minute)
continue
}

mapByte,err:=json.Marshal(gobalFileMap)
@@ -812,6 +830,10 @@ func CheckForUpdates(forceUpdate string) error{

func walkfunc(path string, info os.FileInfo, err error) error {

if info == nil{
return nil
}

if info.IsDir()==false{

sh := shell.NewShell(config.GobalIpfsUrl)


+ 29
- 3
main.go Прегледај датотеку

@@ -13,7 +13,9 @@ import (
"os"
_ "os"
"path/filepath"
"strconv"
_ "strings"
"time"
)


@@ -28,8 +30,8 @@ func main() {
//创建文件目录
os.MkdirAll(config.LocalWorkSpaceDir, os.ModePerm)
}
logpath :=config.LocalWorkSpaceDir+"\\"+"fts.log"
time.Now().Month().String()
logpath :=config.LocalWorkSpaceDir+"\\"+"fts_"+strconv.Itoa(time.Now().Year())+time.Now().Month().String()+".log"
logFile, err := os.OpenFile(logpath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Printf("open log file failed, err:", err)
@@ -97,6 +99,30 @@ func main() {

//defer config.GobalWatch.Close()


go func() {
//客户端心跳检测
log.Println("keeplive")
time.Sleep(20*time.Second)
var keepliveTimeOut int64 =5 //20s
var lastAccept int64 = time.Now().Unix() //秒时间戳
for true{
//log.Print(!websocket.IsKeeplive)
//log.Println((time.Now().Unix()-lastAccept)>keepliveTimeOut)
if !websocket.IsKeeplive {
if (time.Now().Unix()-lastAccept)>keepliveTimeOut{
log.Println("长时间未检测到心跳 Exit")
os.Exit(1)
}
continue
}

lastAccept = time.Now().Unix()
websocket.IsKeeplive = false
//time.Sleep(100*time.Millisecond)
}
}()

//http://localhost:7777/ws
http.HandleFunc("/upload", websocket.UploadHandler)
http.HandleFunc("/subscriptionFileChange", websocket.SubscriptionFileChangeHandler)
@@ -107,6 +133,7 @@ func main() {
http.HandleFunc("/checkForUpdates", websocket.CheckForUpdatesHandler)
http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler)
http.HandleFunc("/watchFile", websocket.WatchFileHandler)
http.HandleFunc("/keeplive", websocket.KeepliveHandler)

//服务端启动
log.Println("服务启动成功,监听端口7777,等待连接。")
@@ -114,7 +141,6 @@ func main() {




}

//func main() {


+ 36
- 0
websocket/websocket.go Прегледај датотеку

@@ -372,6 +372,42 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){
ERR:
conn.Close()}



var IsKeeplive bool = false //客户端是否存活
func KeepliveHandler(w http.ResponseWriter, r *http.Request){
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
//在应答的header中放上upgrade:websoket
IsKeeplive = true
var (
conn *websocket.Conn
err error
//msgType int
//data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
log.Println(err)
return
}

//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, _, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
log.Println(err)
goto ERR
}
IsKeeplive = true
//goto ERR
}
//error的标签
ERR:
conn.Close()}

func main() {



Loading…
Откажи
Сачувај