diff --git a/config/config.go b/config/config.go index 9c532d5..04e4da5 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "github.com/clod-moon/goconf" + "github.com/fsnotify/fsnotify" "log" "os" ) @@ -20,7 +21,10 @@ var GobalIpfsUrl ="localhost:5001" //ipfs引导节点默认网关,安装目录下 bin/fts.ini 可配 var ServerIpfsUrl = "10.240.10.238:5001" -var UpdaterName ="探索者更新.exe" +var UpdaterName ="LOCKING更新.exe" +var EtcdUrl="127.0.0.1:2379" + +var GobalWatch *fsnotify.Watcher //var EtcdUrl="127.0.0.1:2379" @@ -40,6 +44,7 @@ func InitConfig(){ log.Println("配置引导节点:"+ServerIpfsUrl) } - LocalWorkSpaceDir=os.Getenv("USERPROFILE")+"\\easycloud" + ServerIpfsUrl = conf.GetValue("database", "username") + LocalWorkSpaceDir=os.Getenv("TSZDATADIR") } \ No newline at end of file diff --git a/etcdclient/etcdclient.go b/etcdclient/etcdclient.go new file mode 100644 index 0000000..49b204b --- /dev/null +++ b/etcdclient/etcdclient.go @@ -0,0 +1,255 @@ +package etcdclient + +import ( + "context" + "errors" + "fmt" + "fts/config" + shell "github.com/ipfs/go-ipfs-api" + "go.etcd.io/etcd/clientv3" + "log" + "os" + "path/filepath" + "strings" + "time" +) + + + + +func main() { + //deleteAndAdd("C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11") + //getFileHash() + err :=filepath.Walk("C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11",walkfunc) + if err!=nil{ + log.Println(err) + return + } + + err = BatchAdd(gobalFileMap) + if err!=nil{ + log.Println(err) + return + } +} + +func getFileHash() error{ + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{config.EtcdUrl}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + // handle error! + log.Printf("connect to etcd failed, err:%v\n", err) + return err + } + log.Println("connect to etcd success") + defer cli.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) + resp, err := cli.Get(ctx, "key:330031270501289985:2020双11:file") + cancel() + if err != nil { + fmt.Printf("get from etcd failed, err:%v\n", err) + return err + } + + for _, ev := range resp.Kvs { + log.Print(string(ev.Value)) + } + + return nil +} + +/** +获取客户端连接 + */ +func GetClient() (*clientv3.Client,error){ + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{config.EtcdUrl}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + // handle error! + log.Printf("connect to etcd failed, err:%v\n", err) + return nil,err + } + + log.Println("connect to etcd success") + return cli,nil +} + +/** +删除包含该前缀的所有Key +*/ +func DeleteWithPrefix(prefix string) error{ + + if len(prefix)==0{ + return errors.New("参数prefix不能为空") + } + + client,err := GetClient() + if err!=nil{ + return err + } + defer client.Close() + + _, err = client.KV.Delete(context.Background(),prefix, clientv3.WithPrefix()) + if err!=nil{ + log.Println(err) + return err + } + + return err +} + +/** +批量增加 + */ +func BatchAdd(dataMap map[string] string) error{ + + if dataMap==nil || len(dataMap)==0{ + return errors.New("dataMap为空") + } + + client,err := GetClient() + if err!=nil{ + return err + } + defer client.Close() + + for k, v := range dataMap { + + _, err = client.KV.Put(context.Background(), k, v) + + if err!=nil{ + log.Println(err) + return err + } + } + + return nil +} + +/** + 替换k,v + */ +func ReplaceInto(k, v string) error{ + + client,err := GetClient() + if err!=nil{ + return err + } + defer client.Close() + + _,err =client.KV.Put(context.Background(), k, v) + if err!=nil{ + log.Println(err) + return err + } + + return nil +} + +/** +根据后缀查询 + */ +func QueryWithPrefix(prefix string) (map[string] string,error){ + client,err := GetClient() + if err!=nil{ + log.Println(err) + return nil,err + } + defer client.Close() + + resp,err :=client.KV.Get(context.Background(),prefix, clientv3.WithPrefix()) + if err!=nil{ + log.Println(err) + return nil,err + } + var gobalFileMap = make(map[string] string) + for _,v := range resp.Kvs{ + gobalFileMap[string(v.Key)]=string(v.Value) + } + return gobalFileMap,nil; +} + + +func deleteAndAdd(path string) error{ + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{config.EtcdUrl}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + // handle error! + log.Printf("connect to etcd failed, err:%v\n", err) + return err + } + log.Println("connect to etcd success") + defer cli.Close() + + err =filepath.Walk(path,walkfunc) + if err!=nil{ + log.Println(err) + return err + } + + //jsonByte,err :=json.Marshal(gobalFileMap) + //jsonString :=string(jsonByte) + + //ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) + + _,err =cli.Delete(context.Background(),"进入采购",clientv3.WithPrefix()) + log.Println(err) + /*for k, v := range gobalFileMap { + //log.Print(string(v)) + _, err = cli.Put(ctx, k, v) + + if err!=nil{ + log.Println(err) + } + }*/ + //_, err = cli.Put(ctx, "key:330031270501289985:2020双11:file", jsonString) + + //cancel() + if err != nil { + log.Printf("put to etcd failed, err:%v\n", err) + return err + } + return nil +} + +var gobalFileMap = make(map[string] string) +var getLocalFileListDir string = "C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11\\" + + +func walkfunc(path string, info os.FileInfo, err error) error { + + if info.IsDir()==false{ + + sh := shell.NewShell(config.GobalIpfsUrl) + file,err :=os.Open(path) + + if err != nil{ + log.Println(err) + return err + } + + defer file.Close() + + hash,err :=sh.Add(file) + if err != nil { + log.Println(err) + return err + } + + dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1) + + gobalFileMap[dir]=hash + + } + return nil +} diff --git a/handle/handle.go b/handle/handle.go index 9668ad0..665f1ce 100644 --- a/handle/handle.go +++ b/handle/handle.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "fts/config" + "fts/etcdclient" "github.com/gorilla/websocket" _ "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api" @@ -18,7 +19,7 @@ import ( "time" ) -var gobalLoginUserId string +var gobalLoginUserName string //key:filepath,value:hash var gobalFileMap = make(map[string] string) @@ -64,24 +65,20 @@ func main() { @param userId 用户ID @param projectName 项目名称 */ -func InitLocalWorkSpace(conn *websocket.Conn,userId,projectName string) (error){ +func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){ //空格路径处理 ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe" //初始化当前登陆用户 - gobalLoginUserId = userId + gobalLoginUserName = userName // 检查本地目录是否存在 - var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName + var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName _,err := os.Stat(projectPath) if err != nil { //创建文件目录 os.MkdirAll(projectPath, os.ModePerm) - /*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm) - os.MkdirAll(projectPath+"\\工作文件", os.ModePerm) - os.MkdirAll(projectPath+"\\协作文件", os.ModePerm) - os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/ } log.Println("切换本地工作目录至 "+projectPath) @@ -126,7 +123,7 @@ func InitClientConfig(ipfsApi,ipfsBootstrap string) error{ */ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{ - absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir + absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir //检查目录 _,err := os.Stat(absoluteDir) if err != nil { @@ -252,6 +249,14 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) outStr := string(stdout) log.Printf("out:%s", outStr) + //TODO test 更新数据库hash + key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName + err = etcdclient.ReplaceInto(key,hash) + if err != nil { + log.Println(err) + return err + } + if err==nil{ log.Println("下载成功") } @@ -306,7 +311,7 @@ func contentToJSONByte(content string) ([]byte,error){ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{ //本地拷贝文件 - absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir + absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir //检查目录 _,err := os.Stat(absoluteDir) if err != nil { @@ -475,6 +480,14 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st return err } + //TODO test 更新数据库hash + key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName + err = etcdclient.ReplaceInto(key,prog.Hash) + if err != nil { + log.Println(err) + return err + } + log.Println("上传成功") return nil @@ -588,21 +601,81 @@ func myWalkfunc(path string, info os.FileInfo, err error) error { return nil } +/** +本地文件是否存在 + */ +func fileExist(path string) bool { + _, err := os.Lstat(path) + return !os.IsNotExist(err) +} + /** 获取本地文件列表 */ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ - getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\") + getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\") + keyPrefix := gobalLoginUserName+"\\"+projectName+"\\" + + //定期校验缓存的本地文件状态 + go func() { + for true { + time.Sleep(time.Duration(1)*time.Minute) + dataMap,err := etcdclient.QueryWithPrefix(keyPrefix) + if err != nil { + log.Println(err) + continue + } + if dataMap!=nil && len(dataMap)>0{ + for k,_ := range dataMap { + if !fileExist(config.LocalWorkSpaceDir+k){ + err = etcdclient.DeleteWithPrefix(k) + if err != nil { + log.Println(err) + } + } + } + } + } + }() + + for true { - for { - err :=filepath.Walk(getLocalFileListDir,walkfunc) + //优先etcd查询 + dataMap,err := etcdclient.QueryWithPrefix(keyPrefix) if err != nil { log.Println(err) + return err + } + if dataMap!=nil && len(dataMap)>0{ + + cacheMap := make(map[string] string) + for k,v := range dataMap { + cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v + } + + mapByte,err:=json.Marshal(cacheMap) + if err != nil { + log.Println(err) + return err + } + + if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { + log.Println(err) + return err + } time.Sleep(time.Duration(1)*time.Minute) continue } + // 不存在则初始化进etcd + + err =filepath.Walk(getLocalFileListDir,walkfunc) + if err != nil { + log.Println(err) + //time.Sleep(time.Duration(1)*time.Minute) + } + mapByte,err:=json.Marshal(gobalFileMap) if err != nil { log.Println(err) @@ -614,17 +687,60 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ return err } - log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟") + cacheMap := make(map[string] string) + for k,v := range gobalFileMap { + k := strings.Replace(k,config.LocalWorkSpaceDir,"",1) + cacheMap[k]=v + + } + + //异步缓存 + //go func() { + err = etcdclient.BatchAdd(cacheMap) + if err != nil { + log.Println(err) + } + //}() + + //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟") //清空gobalFileMap gobalFileMap = make(map[string] string) - time.Sleep(time.Duration(1)*time.Minute) } return nil } + + +/** +TODO 文件变更通知 + */ +func FileChangeNotify(){ + +} + +/** +TODO 监视文件变动 + */ +func WatchFile(filePaths string) error{ + if len(filePaths)==0{ + //TODO + return nil + } + files := strings.Split(filePaths,";") + for _,file := range files{ + err :=config.GobalWatch.Add(file) + if err != nil { + log.Println(err) + } + log.Println("文件[ "+file+" ]添加监听事件成功") + } + + return nil +} + /** 打开方式 */ @@ -657,13 +773,34 @@ func CheckForUpdates(forceUpdate string) error{ tszdir :=os.Getenv("TSZDIR") + //空格路径处理 + ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs" + + //判断文件有效性 _,err := os.Stat(tszdir+config.UpdaterName) if err!=nil{ return err } - cmd := exec.Command(tszdir+config.UpdaterName,"/checknow"); + + cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck"); + err =cmd.Run() + if err!=nil{ + log.Println(err) + return err + } + + cmd = exec.Command(tszdir+config.UpdaterName,"/checknow"); + err =cmd.Run() + if err!=nil{ + log.Println(err) + return err + } + + //检测到更新 kill所有客户端进程 + log.Println("close all process") + cmd = exec.Command("cmd.exe","/c",ipfsPath); err =cmd.Run() if err!=nil{ log.Println(err) diff --git a/main.go b/main.go index 701316b..c9381f3 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,8 @@ import ( _ "strings" ) + + func main() { config.InitConfig() @@ -39,6 +41,61 @@ func main() { log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) //handle.InitLocalWorkSpace(nil,"324523676458291200","11.4") + //文件监控 + /*config.GobalWatch, err = fsnotify.NewWatcher() + if err != nil { + log.Println(err) + } + + go func() { + for { + select { + case ev := <-config.GobalWatch.Events: + { + log.Println(ev.Op.String()+":"+ev.Name) + if ev.Op&fsnotify.Create == fsnotify.Create { + fmt.Println("创建文件 : ", ev.Name); + //这里获取新创建文件的信息,如果是目录,则加入监控中 + fi, err := os.Stat(ev.Name); + if err == nil && fi.IsDir() { + config.GobalWatch.Add(ev.Name); + fmt.Println("添加监控 : ", ev.Name); + } + } + if ev.Op&fsnotify.Write == fsnotify.Write { + fmt.Println("写入文件 : ", ev.Name); + } + if ev.Op&fsnotify.Remove == fsnotify.Remove { + fmt.Println("删除文件 : ", ev.Name); + //如果删除文件是目录,则移除监控 + fi, err := os.Stat(ev.Name); + if err == nil && fi.IsDir() { + config.GobalWatch.Remove(ev.Name); + fmt.Println("删除监控 : ", ev.Name); + } + } + if ev.Op&fsnotify.Rename == fsnotify.Rename { + fmt.Println("重命名文件 : ", ev.Name); + //如果重命名文件是目录,则移除监控 + //注意这里无法使用os.Stat来判断是否是目录了 + //因为重命名后,go已经无法找到原文件来获取信息了 + //所以这里就简单粗爆的直接remove好了 + config.GobalWatch.Remove(ev.Name); + } + if ev.Op&fsnotify.Chmod == fsnotify.Chmod { + fmt.Println("修改权限 : ", ev.Name); + } + } + case err := <-config.GobalWatch.Errors: + { + fmt.Println("error : ", err); + return; + } + } + } + }();*/ + + //defer config.GobalWatch.Close() //http://localhost:7777/ws http.HandleFunc("/upload", websocket.UploadHandler) @@ -49,11 +106,15 @@ func main() { http.HandleFunc("/openFileWith", websocket.OpenFileWithHandler) http.HandleFunc("/checkForUpdates", websocket.CheckForUpdatesHandler) http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler) + http.HandleFunc("/watchFile", websocket.WatchFileHandler) //服务端启动 log.Println("服务启动成功,监听端口7777,等待连接。") http.ListenAndServe("0.0.0.0:7777", nil) + + + } //func main() { @@ -75,6 +136,7 @@ func (w *Watch) watchDir(dir string) { filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { //这里判断是否为目录,只需监控目录即可 //目录下的文件也在监控范围内,不需要我们一个一个加 + if info.IsDir() { path, err := filepath.Abs(path); if err != nil { @@ -84,7 +146,7 @@ func (w *Watch) watchDir(dir string) { if err != nil { return err; } - fmt.Println("监控 : ", path); + fmt.Println("监控目录 : ", path); } return nil; }); diff --git a/websocket/websocket.go b/websocket/websocket.go index a85f163..443620f 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -297,6 +297,44 @@ ERR: conn.Close() } +func WatchFileHandler(w http.ResponseWriter, r *http.Request) { + //w.Write([]byte("hello")) + //收到http请求(upgrade),完成websocket协议转换 + //在应答的header中放上upgrade:websoket + var ( + conn *websocket.Conn + err error + //msgType int + data []byte + ) + if conn, err = upgrader.Upgrade(w, r, nil); err !=nil { + //报错了,直接返回底层的websocket链接就会终断掉 + return + } + //得到了websocket.Conn长连接的对象,实现数据的收发 + for { + //Text(json), Binary + //if _, data, err = conn.ReadMessage(); err != nil { + if _, data, err = conn.ReadMessage(); err != nil { + //报错关闭websocket + goto ERR + } + //发送数据,判断返回值是否报错 + log.Println("param WatchFile:"+string(data)) + + err := handle.WatchFile(string(data)) + if err!=nil{ + log.Println(err) + goto ERR + } + + goto ERR + } + //error的标签 +ERR: + conn.Close() +} + func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ //w.Write([]byte("hello")) //收到http请求(upgrade),完成websocket协议转换 @@ -328,7 +366,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){ log.Println(err) goto ERR } - + goto ERR } //error的标签 ERR: