yuan_rh пре 4 година
родитељ
комит
e4890b3c32
5 измењених фајлова са 517 додато и 20 уклоњено
  1. +7
    -2
      config/config.go
  2. +255
    -0
      etcdclient/etcdclient.go
  3. +153
    -16
      handle/handle.go
  4. +63
    -1
      main.go
  5. +39
    -1
      websocket/websocket.go

+ 7
- 2
config/config.go Прегледај датотеку

@@ -2,6 +2,7 @@ package config

import (
"github.com/clod-moon/goconf"
"github.com/fsnotify/fsnotify"
"log"
"os"
)
@@ -20,7 +21,10 @@ var GobalIpfsUrl ="localhost:5001"
//ipfs引导节点默认网关,安装目录下 bin/fts.ini 可配
var ServerIpfsUrl = "10.240.10.238:5001"

var UpdaterName ="探索者更新.exe"
var UpdaterName ="LOCKING更新.exe"
var EtcdUrl="127.0.0.1:2379"

var GobalWatch *fsnotify.Watcher

//var EtcdUrl="127.0.0.1:2379"

@@ -40,6 +44,7 @@ func InitConfig(){
log.Println("配置引导节点:"+ServerIpfsUrl)
}

LocalWorkSpaceDir=os.Getenv("USERPROFILE")+"\\easycloud"
ServerIpfsUrl = conf.GetValue("database", "username")
LocalWorkSpaceDir=os.Getenv("TSZDATADIR")

}

+ 255
- 0
etcdclient/etcdclient.go Прегледај датотеку

@@ -0,0 +1,255 @@
package etcdclient

import (
"context"
"errors"
"fmt"
"fts/config"
shell "github.com/ipfs/go-ipfs-api"
"go.etcd.io/etcd/clientv3"
"log"
"os"
"path/filepath"
"strings"
"time"
)




func main() {
//deleteAndAdd("C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11")
//getFileHash()
err :=filepath.Walk("C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11",walkfunc)
if err!=nil{
log.Println(err)
return
}

err = BatchAdd(gobalFileMap)
if err!=nil{
log.Println(err)
return
}
}

func getFileHash() error{

cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{config.EtcdUrl},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
log.Printf("connect to etcd failed, err:%v\n", err)
return err
}
log.Println("connect to etcd success")
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
resp, err := cli.Get(ctx, "key:330031270501289985:2020双11:file")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return err
}

for _, ev := range resp.Kvs {
log.Print(string(ev.Value))
}

return nil
}

/**
获取客户端连接
*/
func GetClient() (*clientv3.Client,error){

cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{config.EtcdUrl},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
log.Printf("connect to etcd failed, err:%v\n", err)
return nil,err
}

log.Println("connect to etcd success")
return cli,nil
}

/**
删除包含该前缀的所有Key
*/
func DeleteWithPrefix(prefix string) error{

if len(prefix)==0{
return errors.New("参数prefix不能为空")
}

client,err := GetClient()
if err!=nil{
return err
}
defer client.Close()

_, err = client.KV.Delete(context.Background(),prefix, clientv3.WithPrefix())
if err!=nil{
log.Println(err)
return err
}

return err
}

/**
批量增加
*/
func BatchAdd(dataMap map[string] string) error{

if dataMap==nil || len(dataMap)==0{
return errors.New("dataMap为空")
}

client,err := GetClient()
if err!=nil{
return err
}
defer client.Close()

for k, v := range dataMap {

_, err = client.KV.Put(context.Background(), k, v)

if err!=nil{
log.Println(err)
return err
}
}

return nil
}

/**
替换k,v
*/
func ReplaceInto(k, v string) error{

client,err := GetClient()
if err!=nil{
return err
}
defer client.Close()

_,err =client.KV.Put(context.Background(), k, v)
if err!=nil{
log.Println(err)
return err
}

return nil
}

/**
根据后缀查询
*/
func QueryWithPrefix(prefix string) (map[string] string,error){
client,err := GetClient()
if err!=nil{
log.Println(err)
return nil,err
}
defer client.Close()

resp,err :=client.KV.Get(context.Background(),prefix, clientv3.WithPrefix())
if err!=nil{
log.Println(err)
return nil,err
}
var gobalFileMap = make(map[string] string)
for _,v := range resp.Kvs{
gobalFileMap[string(v.Key)]=string(v.Value)
}
return gobalFileMap,nil;
}


func deleteAndAdd(path string) error{

cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{config.EtcdUrl},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
log.Printf("connect to etcd failed, err:%v\n", err)
return err
}
log.Println("connect to etcd success")
defer cli.Close()

err =filepath.Walk(path,walkfunc)
if err!=nil{
log.Println(err)
return err
}

//jsonByte,err :=json.Marshal(gobalFileMap)
//jsonString :=string(jsonByte)

//ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)

_,err =cli.Delete(context.Background(),"进入采购",clientv3.WithPrefix())
log.Println(err)
/*for k, v := range gobalFileMap {
//log.Print(string(v))
_, err = cli.Put(ctx, k, v)

if err!=nil{
log.Println(err)
}
}*/
//_, err = cli.Put(ctx, "key:330031270501289985:2020双11:file", jsonString)

//cancel()
if err != nil {
log.Printf("put to etcd failed, err:%v\n", err)
return err
}
return nil
}

var gobalFileMap = make(map[string] string)
var getLocalFileListDir string = "C:\\Users\\yuan_rh\\easycloud\\330031270501289985\\2020双11\\"


func walkfunc(path string, info os.FileInfo, err error) error {

if info.IsDir()==false{

sh := shell.NewShell(config.GobalIpfsUrl)
file,err :=os.Open(path)

if err != nil{
log.Println(err)
return err
}

defer file.Close()

hash,err :=sh.Add(file)
if err != nil {
log.Println(err)
return err
}

dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1)

gobalFileMap[dir]=hash

}
return nil
}

+ 153
- 16
handle/handle.go Прегледај датотеку

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"fts/config"
"fts/etcdclient"
"github.com/gorilla/websocket"
_ "github.com/ipfs/go-ipfs-api"
shell "github.com/ipfs/go-ipfs-api"
@@ -18,7 +19,7 @@ import (
"time"
)

var gobalLoginUserId string
var gobalLoginUserName string

//key:filepath,value:hash
var gobalFileMap = make(map[string] string)
@@ -64,24 +65,20 @@ func main() {
@param userId 用户ID
@param projectName 项目名称
*/
func InitLocalWorkSpace(conn *websocket.Conn,userId,projectName string) (error){
func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){

//空格路径处理
ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"

//初始化当前登陆用户
gobalLoginUserId = userId
gobalLoginUserName = userName

// 检查本地目录是否存在
var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName
var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName
_,err := os.Stat(projectPath)
if err != nil {
//创建文件目录
os.MkdirAll(projectPath, os.ModePerm)
/*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm)
os.MkdirAll(projectPath+"\\工作文件", os.ModePerm)
os.MkdirAll(projectPath+"\\协作文件", os.ModePerm)
os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/
}

log.Println("切换本地工作目录至 "+projectPath)
@@ -126,7 +123,7 @@ func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
*/
func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{

absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
//检查目录
_,err := os.Stat(absoluteDir)
if err != nil {
@@ -252,6 +249,14 @@ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string)
outStr := string(stdout)
log.Printf("out:%s", outStr)

//TODO test 更新数据库hash
key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
err = etcdclient.ReplaceInto(key,hash)
if err != nil {
log.Println(err)
return err
}

if err==nil{
log.Println("下载成功")
}
@@ -306,7 +311,7 @@ func contentToJSONByte(content string) ([]byte,error){
func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{

//本地拷贝文件
absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
//检查目录
_,err := os.Stat(absoluteDir)
if err != nil {
@@ -475,6 +480,14 @@ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir st
return err
}

//TODO test 更新数据库hash
key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
err = etcdclient.ReplaceInto(key,prog.Hash)
if err != nil {
log.Println(err)
return err
}

log.Println("上传成功")

return nil
@@ -588,21 +601,81 @@ func myWalkfunc(path string, info os.FileInfo, err error) error {
return nil
}

/**
本地文件是否存在
*/
func fileExist(path string) bool {
_, err := os.Lstat(path)
return !os.IsNotExist(err)
}

/**
获取本地文件列表
*/
func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{

getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\")
getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\")
keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"

//定期校验缓存的本地文件状态
go func() {
for true {
time.Sleep(time.Duration(1)*time.Minute)
dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
if err != nil {
log.Println(err)
continue
}
if dataMap!=nil && len(dataMap)>0{
for k,_ := range dataMap {
if !fileExist(config.LocalWorkSpaceDir+k){
err = etcdclient.DeleteWithPrefix(k)
if err != nil {
log.Println(err)
}
}
}
}
}
}()

for true {

for {
err :=filepath.Walk(getLocalFileListDir,walkfunc)
//优先etcd查询
dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
if err != nil {
log.Println(err)
return err
}
if dataMap!=nil && len(dataMap)>0{

cacheMap := make(map[string] string)
for k,v := range dataMap {
cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
}

mapByte,err:=json.Marshal(cacheMap)
if err != nil {
log.Println(err)
return err
}

if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
log.Println(err)
return err
}
time.Sleep(time.Duration(1)*time.Minute)
continue
}

// 不存在则初始化进etcd

err =filepath.Walk(getLocalFileListDir,walkfunc)
if err != nil {
log.Println(err)
//time.Sleep(time.Duration(1)*time.Minute)
}

mapByte,err:=json.Marshal(gobalFileMap)
if err != nil {
log.Println(err)
@@ -614,17 +687,60 @@ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
return err
}

log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
cacheMap := make(map[string] string)
for k,v := range gobalFileMap {
k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
cacheMap[k]=v

}

//异步缓存
//go func() {
err = etcdclient.BatchAdd(cacheMap)
if err != nil {
log.Println(err)
}
//}()

//log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")

//清空gobalFileMap
gobalFileMap = make(map[string] string)

time.Sleep(time.Duration(1)*time.Minute)
}

return nil
}



/**
TODO 文件变更通知
*/
func FileChangeNotify(){

}

/**
TODO 监视文件变动
*/
func WatchFile(filePaths string) error{
if len(filePaths)==0{
//TODO
return nil
}
files := strings.Split(filePaths,";")
for _,file := range files{
err :=config.GobalWatch.Add(file)
if err != nil {
log.Println(err)
}
log.Println("文件[ "+file+" ]添加监听事件成功")
}

return nil
}

/**
打开方式
*/
@@ -657,13 +773,34 @@ func CheckForUpdates(forceUpdate string) error{

tszdir :=os.Getenv("TSZDIR")

//空格路径处理
ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"


//判断文件有效性
_,err := os.Stat(tszdir+config.UpdaterName)
if err!=nil{
return err
}

cmd := exec.Command(tszdir+config.UpdaterName,"/checknow");

cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
err =cmd.Run()
if err!=nil{
log.Println(err)
return err
}

cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
err =cmd.Run()
if err!=nil{
log.Println(err)
return err
}

//检测到更新 kill所有客户端进程
log.Println("close all process")
cmd = exec.Command("cmd.exe","/c",ipfsPath);
err =cmd.Run()
if err!=nil{
log.Println(err)


+ 63
- 1
main.go Прегледај датотеку

@@ -16,6 +16,8 @@ import (
_ "strings"
)



func main() {

config.InitConfig()
@@ -39,6 +41,61 @@ func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)

//handle.InitLocalWorkSpace(nil,"324523676458291200","11.4")
//文件监控
/*config.GobalWatch, err = fsnotify.NewWatcher()
if err != nil {
log.Println(err)
}

go func() {
for {
select {
case ev := <-config.GobalWatch.Events:
{
log.Println(ev.Op.String()+":"+ev.Name)
if ev.Op&fsnotify.Create == fsnotify.Create {
fmt.Println("创建文件 : ", ev.Name);
//这里获取新创建文件的信息,如果是目录,则加入监控中
fi, err := os.Stat(ev.Name);
if err == nil && fi.IsDir() {
config.GobalWatch.Add(ev.Name);
fmt.Println("添加监控 : ", ev.Name);
}
}
if ev.Op&fsnotify.Write == fsnotify.Write {
fmt.Println("写入文件 : ", ev.Name);
}
if ev.Op&fsnotify.Remove == fsnotify.Remove {
fmt.Println("删除文件 : ", ev.Name);
//如果删除文件是目录,则移除监控
fi, err := os.Stat(ev.Name);
if err == nil && fi.IsDir() {
config.GobalWatch.Remove(ev.Name);
fmt.Println("删除监控 : ", ev.Name);
}
}
if ev.Op&fsnotify.Rename == fsnotify.Rename {
fmt.Println("重命名文件 : ", ev.Name);
//如果重命名文件是目录,则移除监控
//注意这里无法使用os.Stat来判断是否是目录了
//因为重命名后,go已经无法找到原文件来获取信息了
//所以这里就简单粗爆的直接remove好了
config.GobalWatch.Remove(ev.Name);
}
if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
fmt.Println("修改权限 : ", ev.Name);
}
}
case err := <-config.GobalWatch.Errors:
{
fmt.Println("error : ", err);
return;
}
}
}
}();*/

//defer config.GobalWatch.Close()

//http://localhost:7777/ws
http.HandleFunc("/upload", websocket.UploadHandler)
@@ -49,11 +106,15 @@ func main() {
http.HandleFunc("/openFileWith", websocket.OpenFileWithHandler)
http.HandleFunc("/checkForUpdates", websocket.CheckForUpdatesHandler)
http.HandleFunc("/initClientConfig", websocket.InitClientConfigHandler)
http.HandleFunc("/watchFile", websocket.WatchFileHandler)

//服务端启动
log.Println("服务启动成功,监听端口7777,等待连接。")
http.ListenAndServe("0.0.0.0:7777", nil)




}

//func main() {
@@ -75,6 +136,7 @@ func (w *Watch) watchDir(dir string) {
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
//这里判断是否为目录,只需监控目录即可
//目录下的文件也在监控范围内,不需要我们一个一个加

if info.IsDir() {
path, err := filepath.Abs(path);
if err != nil {
@@ -84,7 +146,7 @@ func (w *Watch) watchDir(dir string) {
if err != nil {
return err;
}
fmt.Println("监控 : ", path);
fmt.Println("监控目录 : ", path);
}
return nil;
});


+ 39
- 1
websocket/websocket.go Прегледај датотеку

@@ -297,6 +297,44 @@ ERR:
conn.Close()
}

func WatchFileHandler(w http.ResponseWriter, r *http.Request) {
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
//在应答的header中放上upgrade:websoket
var (
conn *websocket.Conn
err error
//msgType int
data []byte
)
if conn, err = upgrader.Upgrade(w, r, nil); err !=nil {
//报错了,直接返回底层的websocket链接就会终断掉
return
}
//得到了websocket.Conn长连接的对象,实现数据的收发
for {
//Text(json), Binary
//if _, data, err = conn.ReadMessage(); err != nil {
if _, data, err = conn.ReadMessage(); err != nil {
//报错关闭websocket
goto ERR
}
//发送数据,判断返回值是否报错
log.Println("param WatchFile:"+string(data))

err := handle.WatchFile(string(data))
if err!=nil{
log.Println(err)
goto ERR
}

goto ERR
}
//error的标签
ERR:
conn.Close()
}

func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){
//w.Write([]byte("hello"))
//收到http请求(upgrade),完成websocket协议转换
@@ -328,7 +366,7 @@ func SubscriptionFileChangeHandler(w http.ResponseWriter, r *http.Request){
log.Println(err)
goto ERR
}
goto ERR
}
//error的标签
ERR:


Loading…
Откажи
Сачувај