Explorar el Código

update

pull/1/head
yuan_rh hace 4 años
padre
commit
c652a256ab
Se han modificado 3 ficheros con 89 adiciones y 124 borrados
  1. +65
    -66
      handle/handle.go
  2. +17
    -50
      main.go
  3. +7
    -8
      websocket/websocket.go

+ 65
- 66
handle/handle.go Ver fichero

@@ -5,8 +5,8 @@ import (
"fmt" "fmt"
"fts/config" "fts/config"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
shell "github.com/ipfs/go-ipfs-api"
_ "github.com/ipfs/go-ipfs-api" _ "github.com/ipfs/go-ipfs-api"
shell "github.com/ipfs/go-ipfs-api"
"io" "io"
"log" "log"
"os" "os"
@@ -57,7 +57,7 @@ func main() {
@param userId 用户ID @param userId 用户ID
@param projectName 项目名称 @param projectName 项目名称
*/ */
func InitLocalWorkSpace(userId,projectName string) (error){
func InitLocalWorkSpace(conn *websocket.Conn,userId,projectName string) (error){


//初始化当前登陆用户 //初始化当前登陆用户
gobalLoginUserId = userId gobalLoginUserId = userId
@@ -73,6 +73,14 @@ func InitLocalWorkSpace(userId,projectName string) (error){
os.MkdirAll(projectPath+"\\协作文件", os.ModePerm) os.MkdirAll(projectPath+"\\协作文件", os.ModePerm)
os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/ os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/
} }

log.Println("切换本地工作目录至 "+projectPath)

if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
log.Println(err)
return err
}

return nil return nil
} }


@@ -92,6 +100,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string)
//创建文件目录 //创建文件目录
err = os.MkdirAll(absoluteDir, os.ModePerm) err = os.MkdirAll(absoluteDir, os.ModePerm)
if err!=nil{ if err!=nil{
log.Println(err)
return err return err
} }
} }
@@ -122,9 +131,10 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string)
} }


if err != nil { if err != nil {
log.Fatalf("json.Marshal error %s\n", err)
log.Printf("json.Marshal error %s\n", err)
} }
if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
log.Println(err)
break break
} }
seconds = current seconds = current
@@ -137,7 +147,7 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string)
} }


if err != nil { if err != nil {
log.Fatalf("json.Marshal error %s\n", err)
log.Printf("json.Marshal error %s\n", err)
} }
if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
panic(err) panic(err)
@@ -147,21 +157,44 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string)
} }
} }
}() }()
log.Println("下载资源连接中...")

//设置30秒连接超时
go func() {
index :=0
for true{
if cmd.ProcessState==nil{
index++
time.Sleep(time.Duration(1)*time.Second)
if index==30{
err = cmd.Process.Kill()
log.Println("进程连接超时30s已被Kill")
if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
return
}
return
}
}else {
return
}
}
}()


err = cmd.Wait() err = cmd.Wait()
if err != nil { if err != nil {
log.Fatalf("cmd.Run() failed with %s\n", err)
log.Panicf("cmd.Run() failed with %s\n", err)
} }
if errStdout != nil || errStderr != nil { if errStdout != nil || errStderr != nil {
log.Fatalf("failed to capture stdout or stderr\n")
log.Printf("failed to capture stdout or stderr\n")
} }
outStr, errStr := string(stdout), string(stderr) outStr, errStr := string(stdout), string(stderr)
fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
log.Printf("out:%s ,err:%s", outStr, errStr)


defer close(progress) defer close(progress)


fmt.Println("下载成功")

if err==nil{
log.Println("下载成功")
}
return nil return nil


} }
@@ -198,7 +231,6 @@ func contentToJSONByte(content string) ([]byte,error){
} }





/** /**
上传本地文件 上传本地文件
@param absolutePath 文件本地绝对路径 @param absolutePath 文件本地绝对路径
@@ -235,7 +267,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
return err return err
} }
} }
fmt.Println(ipfsPath, "add",absolutePath)
cmd := exec.Command(ipfsPath, "add",absolutePath) cmd := exec.Command(ipfsPath, "add",absolutePath)
uploadProgress := make(chan string,10000) uploadProgress := make(chan string,10000)
var stdout, stderr []byte var stdout, stderr []byte
@@ -288,6 +320,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
}() }()


err = cmd.Wait() err = cmd.Wait()

if err != nil { if err != nil {
log.Fatalf("cmd.Run() failed with %s\n", err) log.Fatalf("cmd.Run() failed with %s\n", err)
} }
@@ -295,7 +328,7 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
log.Fatalf("failed to capture stdout or stderr\n") log.Fatalf("failed to capture stdout or stderr\n")
} }
outStr, errStr := string(stdout), string(stderr) outStr, errStr := string(stdout), string(stderr)
fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
log.Printf("out:%s,err:%s", outStr, errStr)
/*sh.Get(hash,absoluteDir+fileName) /*sh.Get(hash,absoluteDir+fileName)
if err != nil { if err != nil {
return "",err return "",err
@@ -309,9 +342,9 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
sh := shell.NewShell(config.GobalIpfsUrl) sh := shell.NewShell(config.GobalIpfsUrl)
objectStat,err :=sh.ObjectStat(prog.Hash) objectStat,err :=sh.ObjectStat(prog.Hash)
if err != nil { if err != nil {
log.Println(err)
return err return err
} }
fmt.Println(objectStat.CumulativeSize)
prog.Size=strconv.Itoa(objectStat.CumulativeSize) prog.Size=strconv.Itoa(objectStat.CumulativeSize)


projson,err :=json.Marshal(prog) projson,err :=json.Marshal(prog)
@@ -324,19 +357,21 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
serverSh := shell.NewShell(config.ServerIpfsUrl) serverSh := shell.NewShell(config.ServerIpfsUrl)
err = serverSh.Pin(prog.Hash) err = serverSh.Pin(prog.Hash)
if err != nil { if err != nil {
fmt.Println(err)
log.Println("引导节点备份失败")
log.Println(err)
} }
log.Println("引导节点文件备份成功")
}() }()


//本地文件夹拷贝 //本地文件夹拷贝
err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName))) err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))


if err != nil { if err != nil {
fmt.Println(err)
log.Println(err)
return err return err
} }


fmt.Println("上传成功")
log.Println("上传成功")


return nil return nil
} }
@@ -373,6 +408,8 @@ func NotifyFileChange(id string, changeType int){


} }




/** /**
获取本地文件列表 获取本地文件列表
*/ */
@@ -383,44 +420,28 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
for { for {
err :=filepath.Walk(getLocalFileListDir,walkfunc) err :=filepath.Walk(getLocalFileListDir,walkfunc)
if err != nil { if err != nil {
fmt.Println(err)
log.Println(err)
continue continue
} }


mapByte,err:=json.Marshal(gobalFileMap) mapByte,err:=json.Marshal(gobalFileMap)
if err != nil { if err != nil {
fmt.Println(err)
log.Println(err)
return err return err
} }


if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
fmt.Println(err)
log.Println(err)
} }
//for k,v := range gobalFileMap {
// if v==1{
// //socket
// if err := conn.WriteMessage(websocket.TextMessage, []byte(k)); err != nil {
//
// fmt.Println(err)
// continue
// }
// //添加 检查删除
// //gobalFileMap[k]=0
// }
//}
fmt.Println("执行睡眠3分钟")

time.Sleep(time.Duration(3)*time.Minute)

log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")

//清空gobalFileMap
gobalFileMap = make(map[string] string)

time.Sleep(time.Duration(1)*time.Minute)
} }


//err :=filepath.Walk(getLocalFileListDir,walkfunc)
//if err != nil {
// return gobalFileMap,err
//}
//fmt.Println(len(gobalFileMap))
//for s := range gobalFileMap {
// fmt.Println(s)
//}
return nil return nil
} }


@@ -431,7 +452,7 @@ func walkfunc(path string, info os.FileInfo, err error) error {
sh := shell.NewShell(config.GobalIpfsUrl) sh := shell.NewShell(config.GobalIpfsUrl)
file,err :=os.Open(path) file,err :=os.Open(path)
if err != nil{ if err != nil{
fmt.Println(err)
log.Println(err)
return err return err
} }


@@ -439,7 +460,7 @@ func walkfunc(path string, info os.FileInfo, err error) error {


hash,err :=sh.Add(file) hash,err :=sh.Add(file)
if err != nil { if err != nil {
fmt.Println(err)
log.Println(err)
return err return err
} }


@@ -447,28 +468,6 @@ func walkfunc(path string, info os.FileInfo, err error) error {


gobalFileMap[dir]=hash gobalFileMap[dir]=hash


//文件已修改状态
//dir :=strings.Replace(path,getLocalFileListDir,"",1)
//if gobalFileMap[dir]==1{
// return nil
//}
//gobalFileMap[dir]=0
//
////判断是否修改
//f,err:=os.Stat(path)
//if err!=nil{
// return err
//}
//secondStr := strconv.FormatInt(f.ModTime().Unix(),10)
//if gobalFileUpdateTimeMap[path] == ""{
// gobalFileUpdateTimeMap[path]=secondStr
//}else{
// if gobalFileUpdateTimeMap[path]!=secondStr{
// gobalFileUpdateTimeMap[path]=secondStr
// gobalFileMap[dir]=1
// }
//}

} }
return nil return nil
} }


+ 17
- 50
main.go Ver fichero

@@ -6,6 +6,7 @@ import (
"fts/config" "fts/config"
"fts/websocket" "fts/websocket"
_ "github.com/ipfs/go-ipfs-api" _ "github.com/ipfs/go-ipfs-api"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
@@ -14,56 +15,24 @@ import (
_ "strings" _ "strings"
) )


//func copyAndCapture(w io.Writer, r io.Reader) ([]byte, error) {
// var out []byte
// buf := make([]byte, 1024, 1024)
// for {
// n, err := r.Read(buf[:])
// if n > 0 {
// d := buf[:n]
// out = append(out, d...)
// os.Stdout.Write(d)
// }
// if err != nil {
// // Read returns io.EOF at the end of file, which is not an error for us
// if err == io.EOF {
// err = nil
// }
// return out, err
// }
// }
// // never reached
// panic(true)
// return nil, nil
//}


func main() { func main() {


//cmd := exec.Command("cmd","/C", "ipfs add C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js")
//var stdout, stderr []byte
//var errStdout, errStderr error
//stdoutIn, _ := cmd.StdoutPipe()
//stderrIn, _ := cmd.StderrPipe()
//cmd.Start()
//go func() {
// stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn)
//}()
//go func() {
// stderr, errStderr = copyAndCapture(os.Stderr, stderrIn)
//}()
//
//err := cmd.Wait()
//if err != nil {
// log.Fatalf("cmd.Run() failed with %s\n", err)
//}
//if errStdout != nil || errStderr != nil {
// log.Fatalf("failed to capture stdout or stderr\n")
//}
//outStr, errStr := string(stdout), string(stderr)
//fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
config.InitConfig() config.InitConfig()
//handle.InitLocalWorkSpace("324523676458291200","11.4")

//日志设置

logpath :=config.LocalWorkSpaceDir+"\\"+"fts.log"
logFile, err := os.OpenFile(logpath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Printf("open log file failed, err:", err)
return
}
multiWriter := io.MultiWriter(os.Stdout,logFile)
log.SetOutput(multiWriter)
log.SetPrefix("[fts] ")
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)

//handle.InitLocalWorkSpace(nil,"324523676458291200","11.4")


//http://localhost:7777/ws //http://localhost:7777/ws
http.HandleFunc("/upload", websocket.UploadHandler) http.HandleFunc("/upload", websocket.UploadHandler)
@@ -72,11 +41,9 @@ func main() {
http.HandleFunc("/init", websocket.InitLocalWorkSpaceHandler) http.HandleFunc("/init", websocket.InitLocalWorkSpaceHandler)


//服务端启动 //服务端启动
fmt.Println("启动服务,监听端口7777")
log.Println("服务启动成功,监听端口7777,等待连接。")
http.ListenAndServe("0.0.0.0:7777", nil) http.ListenAndServe("0.0.0.0:7777", nil)




} }


// 递归扫描目录 // 递归扫描目录


+ 7
- 8
websocket/websocket.go Ver fichero

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"fts/handle" "fts/handle"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"log"
"net/http" "net/http"
"strings" "strings"
) )
@@ -40,7 +41,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) {
goto ERR goto ERR
} }
//发送数据,判断返回值是否报错 //发送数据,判断返回值是否报错
fmt.Println("UploadHandler param:"+string(data))
log.Println("param uploadHandler:"+string(data))


params :=strings.Split(string(data),"|") params :=strings.Split(string(data),"|")
err := handle.UploadCommand(conn,params[0],params[1],params[2],params[3]) err := handle.UploadCommand(conn,params[0],params[1],params[2],params[3])
@@ -78,7 +79,7 @@ func DownloadHandler(w http.ResponseWriter, r *http.Request) {
goto ERR goto ERR
} }
//发送数据,判断返回值是否报错 //发送数据,判断返回值是否报错
fmt.Println("DownloadHandler param:"+string(data))
log.Println("param downloadHandler:"+string(data))


params :=strings.Split(string(data),"|") params :=strings.Split(string(data),"|")
err := handle.DownCommand(conn,params[0],params[1],params[2],params[3]) err := handle.DownCommand(conn,params[0],params[1],params[2],params[3])
@@ -117,12 +118,10 @@ func InitLocalWorkSpaceHandler(w http.ResponseWriter, r *http.Request) {
goto ERR goto ERR
} }
//发送数据,判断返回值是否报错 //发送数据,判断返回值是否报错
fmt.Println("InitLocalWorkSpaceHandler param:"+string(data))
//var fileMap map[string]int
//fileMap,err = handle.GetLocalFileList("testOne")
//jsonbyte,_ := json.Marshal(fileMap)
log.Println("param initLocalWorkSpaceHandler:"+string(data))

params :=strings.Split(string(data),"|") params :=strings.Split(string(data),"|")
err := handle.InitLocalWorkSpace(params[0],params[1])
err := handle.InitLocalWorkSpace(conn,params[0],params[1])
if err!=nil{ if err!=nil{
fmt.Println(err) fmt.Println(err)
goto ERR goto ERR
@@ -158,7 +157,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){
goto ERR goto ERR
} }
//发送数据,判断返回值是否报错 //发送数据,判断返回值是否报错
fmt.Println("SubscriptionFileChangeHandler param:"+string(data))
log.Println("param subscriptionFileChangeHandler:"+string(data))
err = handle.SubscriptionFileChange(conn,string(data)) err = handle.SubscriptionFileChange(conn,string(data))


if err != nil { if err != nil {


Cargando…
Cancelar
Guardar