diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..84efc43 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +swagger/ +swagger.zip +*.exe* +*.tmp +vendor/ +*.mod +*.sum +.idea \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..9e26b58 --- /dev/null +++ b/config/config.go @@ -0,0 +1,24 @@ +package config + +import "os" + +func main() { + +} + +//本地工作目录 +var LocalWorkSpaceDir string + +//本地ipfs网关 +var GobalIpfsUrl ="localhost:5001" + +//ipfs引导节点网关 +var ServerIpfsUrl = "10.240.10.238:5001" + + +func InitConfig(){ + //TODO 读取本地工作目录路径,默认工作空间 + //LocalWorkSpaceDir = "D:\\easycloud" + LocalWorkSpaceDir=os.Getenv("USERPROFILE")+"\\easycloud" + +} \ No newline at end of file diff --git a/handle/handle.go b/handle/handle.go new file mode 100644 index 0000000..d2c3f87 --- /dev/null +++ b/handle/handle.go @@ -0,0 +1,481 @@ +package handle + +import ( + "encoding/json" + "fmt" + "fts/config" + "github.com/gorilla/websocket" + shell "github.com/ipfs/go-ipfs-api" + _ "github.com/ipfs/go-ipfs-api" + "io" + "log" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" +) + +var gobalLoginUserId string + +//key:filepath,value:hash +var gobalFileMap = make(map[string] string) +var gobalFileUpdateTimeMap = make(map[string] string) + +//var gobalFileChangeMap = make(map[string] string) +var getLocalFileListDir string +var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关 + +var ipfsPath=os.Getenv("IPFS-PATH")+"ipfs.exe" +/** +文件上传下载进度 + */ +type processStruct struct { + Size string `json:"size"` + CurrentSize string `json:"currentSize"` + Unit string `json:"unit"` + Process float64 `json:"process"` + Hash string `json:"hash"` +} + +func main() { + //config.InitConfig() + //InitLocalWorkSpace("320872793405132801","test1") + // + //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/") + // + //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/") + // + //GetLocalFileList("testOne") +} + + + +/** +初始化本地工作目录 +@param userId 用户ID +@param projectName 项目名称 + */ +func InitLocalWorkSpace(userId,projectName string) (error){ + + //初始化当前登陆用户 + gobalLoginUserId = userId + + // 检查本地目录是否存在 + var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName + _,err := os.Stat(projectPath) + if err != nil { + //创建文件目录 + os.MkdirAll(projectPath, os.ModePerm) + /*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm) + os.MkdirAll(projectPath+"\\工作文件", os.ModePerm) + os.MkdirAll(projectPath+"\\协作文件", os.ModePerm) + os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/ + } + return nil +} + +/** +下载指令 +@param hash ipfs哈希值 +@param projectName 项目名称 +@para fileName 文件名称 +@param dir 云文件目录 + */ +func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{ + + absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir + //检查目录 + _,err := os.Stat(absoluteDir) + if err != nil { + //创建文件目录 + err = os.MkdirAll(absoluteDir, os.ModePerm) + if err!=nil{ + return err + } + } + + + cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName)) + progress := make(chan string,10000) + var stdout, stderr []byte + var errStdout, errStderr error + stdoutIn, _ := cmd.StdoutPipe() + stderrIn, _ := cmd.StderrPipe() + cmd.Start() + go func() { + stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress) + }() + go func() { + stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress) + }() + + go func(){ + seconds := time.Now().Unix() + for content := range progress { // 通道关闭后会退出for range循环 + current :=time.Now().Unix() + if current-seconds>1{ + projson,err := contentToJSONByte(content) + if projson==nil && err==nil{ + continue + } + + if err != nil { + log.Fatalf("json.Marshal error %s\n", err) + } + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + break + } + seconds = current + } + if strings.Index(content,"100.00%")!=-1{ + + projson,err := contentToJSONByte(content) + if projson==nil && err==nil{ + continue + } + + if err != nil { + log.Fatalf("json.Marshal error %s\n", err) + } + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + panic(err) + } + break + + } + } + }() + + err = cmd.Wait() + if err != nil { + log.Fatalf("cmd.Run() failed with %s\n", err) + } + if errStdout != nil || errStderr != nil { + log.Fatalf("failed to capture stdout or stderr\n") + } + outStr, errStr := string(stdout), string(stderr) + fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr) + + defer close(progress) + + fmt.Println("下载成功") + + return nil + +} + +func contentToJSONByte(content string) ([]byte,error){ + sts :=strings.Split(content," ") + if len(sts)<8{ + return nil,nil + } + var processFloat float64 + if (len(sts)==9 || len(sts)==8){ + processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64) + }else{ + processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64) + } + + if processFloat==0{ + return nil,nil + + } + + + pro :=&processStruct{ + Size:sts[4], + CurrentSize: sts[1], + Unit: sts[2], + Process: processFloat, + Hash: "", + + } + projson,err :=json.Marshal(pro) + + return projson,err +} + + + +/** +上传本地文件 +@param absolutePath 文件本地绝对路径 +@param fileName 文件名称 +@param projectName 项目名称 +@param dir 云文件目录 + */ +func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{ + //拷贝文件 + + //上传文件 + + // + //file,err :=os.Open(absolutePath) + //if err != nil{ + // return "",err + //} + // + //defer file.Close() + // + //hash,err :=sh.Add(file) + //if err != nil { + // return "",err + //} + + //本地拷贝文件 + absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir + //检查目录 + _,err := os.Stat(absoluteDir) + if err != nil { + //创建文件目录 + err = os.MkdirAll(absoluteDir, os.ModePerm) + if err!=nil{ + return err + } + } + fmt.Println(ipfsPath, "add",absolutePath) + cmd := exec.Command(ipfsPath, "add",absolutePath) + uploadProgress := make(chan string,10000) + var stdout, stderr []byte + var errStdout, errStderr error + stdoutIn, _ := cmd.StdoutPipe() + stderrIn, _ := cmd.StderrPipe() + cmd.Start() + go func() { + stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress) + }() + go func() { + stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress) + }() + + go func(){ + seconds := time.Now().Unix() + for content := range uploadProgress { // 通道关闭后会退出for range循环 + current :=time.Now().Unix() + if current-seconds>1{ + projson,err := contentToJSONByte(content) + if projson==nil && err==nil{ + continue + } + + if err != nil { + log.Fatalf("json.Marshal error %s\n", err) + } + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + break + } + seconds = current + } + if strings.Index(content,"100.00%")!=-1{ + + projson,err := contentToJSONByte(content) + if projson==nil && err==nil{ + continue + } + + + if err != nil { + log.Fatalf("json.Marshal error %s\n", err) + } + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + panic(err) + } + break + } + } + }() + + err = cmd.Wait() + if err != nil { + log.Fatalf("cmd.Run() failed with %s\n", err) + } + if errStdout != nil || errStderr != nil { + log.Fatalf("failed to capture stdout or stderr\n") + } + outStr, errStr := string(stdout), string(stderr) + fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr) + /*sh.Get(hash,absoluteDir+fileName) + if err != nil { + return "",err + }*/ + defer close(uploadProgress) + + prog := new(processStruct) + prog.Hash=strings.Split(outStr," ")[1] + prog.Process=100.00 + + sh := shell.NewShell(config.GobalIpfsUrl) + objectStat,err :=sh.ObjectStat(prog.Hash) + if err != nil { + return err + } + fmt.Println(objectStat.CumulativeSize) + prog.Size=strconv.Itoa(objectStat.CumulativeSize) + + projson,err :=json.Marshal(prog) + + if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { + panic(err) + } + + go func() { + serverSh := shell.NewShell(config.ServerIpfsUrl) + err = serverSh.Pin(prog.Hash) + if err != nil { + fmt.Println(err) + } + }() + + //本地文件夹拷贝 + err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName))) + + if err != nil { + fmt.Println(err) + return err + } + + fmt.Println("上传成功") + + return nil +} + +func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) { + var out []byte + buf := make([]byte, 1024, 1024) + for { + n, err := r.Read(buf[:]) + if n > 0 { + d := buf[:n] + out = append(out, d...) + progress <- string(d) + + } + if err != nil { + // Read returns io.EOF at the end of file, which is not an error for us + if err == io.EOF { + err = nil + } + return out, err + } + } + // never reached + panic(true) + return nil, nil +} + +/** +运行变更生效 +@param id 文件id +*/ +func NotifyFileChange(id string, changeType int){ + +} + +/** +获取本地文件列表 + */ +func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ + + getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\") + + for { + err :=filepath.Walk(getLocalFileListDir,walkfunc) + if err != nil { + fmt.Println(err) + continue + } + + mapByte,err:=json.Marshal(gobalFileMap) + if err != nil { + fmt.Println(err) + return err + } + + if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { + fmt.Println(err) + } + //for k,v := range gobalFileMap { + // if v==1{ + // //socket + // if err := conn.WriteMessage(websocket.TextMessage, []byte(k)); err != nil { + // + // fmt.Println(err) + // continue + // } + // //添加 检查删除 + // //gobalFileMap[k]=0 + // } + //} + fmt.Println("执行睡眠3分钟") + + time.Sleep(time.Duration(3)*time.Minute) + } + + //err :=filepath.Walk(getLocalFileListDir,walkfunc) + //if err != nil { + // return gobalFileMap,err + //} + //fmt.Println(len(gobalFileMap)) + //for s := range gobalFileMap { + // fmt.Println(s) + //} + return nil +} + +func walkfunc(path string, info os.FileInfo, err error) error { + + if info.IsDir()==false{ + + sh := shell.NewShell(config.GobalIpfsUrl) + file,err :=os.Open(path) + if err != nil{ + fmt.Println(err) + return err + } + + defer file.Close() + + hash,err :=sh.Add(file) + if err != nil { + fmt.Println(err) + return err + } + + dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1) + + gobalFileMap[dir]=hash + + //文件已修改状态 + //dir :=strings.Replace(path,getLocalFileListDir,"",1) + //if gobalFileMap[dir]==1{ + // return nil + //} + //gobalFileMap[dir]=0 + // + ////判断是否修改 + //f,err:=os.Stat(path) + //if err!=nil{ + // return err + //} + //secondStr := strconv.FormatInt(f.ModTime().Unix(),10) + //if gobalFileUpdateTimeMap[path] == ""{ + // gobalFileUpdateTimeMap[path]=secondStr + //}else{ + // if gobalFileUpdateTimeMap[path]!=secondStr{ + // gobalFileUpdateTimeMap[path]=secondStr + // gobalFileMap[dir]=1 + // } + //} + + } + return nil +} + + + + + + + diff --git a/main.go b/main.go new file mode 100644 index 0000000..976442b --- /dev/null +++ b/main.go @@ -0,0 +1,97 @@ +package main + +import ( + "fmt" + _ "fmt" + "fts/config" + "fts/websocket" + _ "github.com/ipfs/go-ipfs-api" + "io/ioutil" + "log" + "net/http" + "os" + _ "os" + _ "strings" +) + +//func copyAndCapture(w io.Writer, r io.Reader) ([]byte, error) { +// var out []byte +// buf := make([]byte, 1024, 1024) +// for { +// n, err := r.Read(buf[:]) +// if n > 0 { +// d := buf[:n] +// out = append(out, d...) +// os.Stdout.Write(d) +// } +// if err != nil { +// // Read returns io.EOF at the end of file, which is not an error for us +// if err == io.EOF { +// err = nil +// } +// return out, err +// } +// } +// // never reached +// panic(true) +// return nil, nil +//} + + +func main() { + + //cmd := exec.Command("cmd","/C", "ipfs add C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js") + //var stdout, stderr []byte + //var errStdout, errStderr error + //stdoutIn, _ := cmd.StdoutPipe() + //stderrIn, _ := cmd.StderrPipe() + //cmd.Start() + //go func() { + // stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn) + //}() + //go func() { + // stderr, errStderr = copyAndCapture(os.Stderr, stderrIn) + //}() + // + //err := cmd.Wait() + //if err != nil { + // log.Fatalf("cmd.Run() failed with %s\n", err) + //} + //if errStdout != nil || errStderr != nil { + // log.Fatalf("failed to capture stdout or stderr\n") + //} + //outStr, errStr := string(stdout), string(stderr) + //fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr) + config.InitConfig() + //handle.InitLocalWorkSpace("324523676458291200","11.4") + + //http://localhost:7777/ws + http.HandleFunc("/upload", websocket.UploadHandler) + http.HandleFunc("/subscriptionFileChange", websocket.SubscriptionFileChangeHandler) + http.HandleFunc("/download", websocket.DownloadHandler) + http.HandleFunc("/init", websocket.InitLocalWorkSpaceHandler) + + //服务端启动 + fmt.Println("启动服务,监听端口7777") + http.ListenAndServe("0.0.0.0:7777", nil) + + + +} + +// 递归扫描目录 +func ScanDirs(dirName string) []string { + files, err := ioutil.ReadDir(dirName) + if err != nil { + log.Println(err) + } + var fileList []string + for _, file := range files { + fmt.Println(file.Name()+" "+file.ModTime().String()) + fileList = append(fileList, dirName + string(os.PathSeparator) + file.Name()) + if file.IsDir() { + fileList = append(fileList, ScanDirs(dirName + string(os.PathSeparator) + file.Name())...) + } + } + return fileList +} \ No newline at end of file diff --git a/websocket/websocket.go b/websocket/websocket.go new file mode 100644 index 0000000..0ea4415 --- /dev/null +++ b/websocket/websocket.go @@ -0,0 +1,178 @@ +package websocket + +import ( + "fmt" + "fts/handle" + "github.com/gorilla/websocket" + "net/http" + "strings" +) + +var( + upgrader = websocket.Upgrader{ + //允许跨域访问 + CheckOrigin: func(r *http.Request) bool { + return true + }, + } +) + +func UploadHandler(w http.ResponseWriter, r *http.Request) { + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + fmt.Println("UploadHandler param:"+string(data)) + + params :=strings.Split(string(data),"|") + err := handle.UploadCommand(conn,params[0],params[1],params[2],params[3]) + if err!=nil{ + fmt.Println(err) + goto ERR + } + goto ERR + } + //error的标签 +ERR: + conn.Close() +} + +func DownloadHandler(w http.ResponseWriter, r *http.Request) { + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + fmt.Println("DownloadHandler param:"+string(data)) + + params :=strings.Split(string(data),"|") + err := handle.DownCommand(conn,params[0],params[1],params[2],params[3]) + if err!=nil{ + fmt.Println(err) + goto ERR + } + + goto ERR + } + //error的标签 +ERR: + conn.Close() +} + +func InitLocalWorkSpaceHandler(w http.ResponseWriter, r *http.Request) { + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + fmt.Println("InitLocalWorkSpaceHandler param:"+string(data)) + //var fileMap map[string]int + //fileMap,err = handle.GetLocalFileList("testOne") + //jsonbyte,_ := json.Marshal(fileMap) + params :=strings.Split(string(data),"|") + err := handle.InitLocalWorkSpace(params[0],params[1]) + if err!=nil{ + fmt.Println(err) + goto ERR + } + + goto ERR + } + //error的标签 +ERR: + conn.Close() +} + +func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + fmt.Println("SubscriptionFileChangeHandler param:"+string(data)) + err = handle.SubscriptionFileChange(conn,string(data)) + + if err != nil { + //报错了 + fmt.Println(err) + goto ERR + } + + } + //error的标签 +ERR: + conn.Close()} + +func main() { + + +} \ No newline at end of file