|
- package handle
-
- import (
- "bufio"
- "bytes"
- "context"
- "crypto/md5"
- "encoding/json"
- "errors"
- "fmt"
- "fts/config"
- "fts/etcdclient"
- "fts/nsqclient"
- "github.com/gorilla/websocket"
- _ "github.com/ipfs/go-ipfs-api"
- shell "github.com/ipfs/go-ipfs-api"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "os/exec"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- )
-
// Package-level session state shared by the handlers in this file.
var (
	// gobalLoginUserName is the account name of the logged-in user.
	gobalLoginUserName string
	// gobalLoginUserId is the ID of the logged-in user.
	gobalLoginUserId string

	// gobalFileMap maps a file path (key) to its hash (value).
	gobalFileMap = make(map[string]string)
	// gobalFileDownLoadingMap is keyed by local file path; DownCommand sets
	// the entry to 1 while a download is in progress.
	gobalFileDownLoadingMap = make(map[string]int)

	// goabalAddFileMap records files that were uploaded manually rather than
	// automatically.
	goabalAddFileMap = make(map[string]int)

	// gobalLocalProjectDir is the local project workspace directory.
	gobalLocalProjectDir string

	// gobalSubscriptionFileChangeSwitch is the file-change subscription switch.
	gobalSubscriptionFileChangeSwitch int = 0

	// GobalMessageNotify is the global message-notification channel.
	GobalMessageNotify = make(chan string, 1000)

	// ipfsPath starts as the raw IPFS-PATH environment value; the Init*
	// functions replace it with the full path of the ipfs executable.
	ipfsPath = os.Getenv("IPFS-PATH")
)
-
// processStruct is the upload/download progress message that is serialized
// to JSON and written to the websocket.
//
// contentToJSONByte fills Size, CurrentSize, Unit, CurrentUnit and Process
// from an ipfs progress line; UploadCommand fills Hash, CommitHistoryHash and
// Version on the final 100% message.
type processStruct struct {
	Size              string  `json:"size"`
	CurrentSize       string  `json:"currentSize"`
	Unit              string  `json:"unit"`
	CurrentUnit       string  `json:"currentUnit"`
	Process           float64 `json:"process"` // completion percentage
	Hash              string  `json:"hash"`
	CommitHistoryHash string  `json:"commitHistoryHash"`
	Version           int     `json:"version"`
}
-
-
- /**
- 初始化本地客户端配置,包括ipfs网关、引导节点
- @param ipfsApi ipfs网关 例如:http://192.168.1.1:5001
- @param ipfsBootstrap ipfs引导节点,多个用;分割 例如:/dns/www.lockingos.org/tcp/4001/p2p/12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP
- */
- func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
-
- config.ServerIpfsUrl = ipfsApi
- log.Println("配置客户端网关:"+config.ServerIpfsUrl)
- ipfsBootstraps := strings.Split(ipfsBootstrap,";")
- for _, simpleBootstrap := range ipfsBootstraps {
- log.Println("配置引导节点:"+simpleBootstrap)
- cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
- err :=cmd.Run()
- if err!=nil{
- return err
- }
- }
- return nil
- }
-
- /**
- 初始化本地工作目录
- @param userName 用户登陆账号
- @param userId 用户ID
- @param projectName 项目名称
- */
- func InitLocalWorkSpace(conn *websocket.Conn, userName, userId, projectName string) (error){
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
-
- //初始化当前登陆用户信息
- gobalLoginUserName = userName
- gobalLoginUserId = userId
-
- //初始化本地工作空间绝对路径
- gobalLocalProjectDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
-
- // 检查本地目录是否存在
- _,err := os.Stat(gobalLocalProjectDir)
- if err != nil {
- //创建文件目录
- os.MkdirAll(gobalLocalProjectDir, os.ModePerm)
- }
- log.Println("进入项目空间:"+gobalLocalProjectDir)
- if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(gobalLocalProjectDir))); err != nil {
- log.Println(err)
- return err
- }
- return nil
- }
-
- //工作空间增加文件监听事件
- func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
-
- if info == nil{
- return nil
- }
- if info.IsDir()==true{
- //config.GobalWatch.Remove(filePath)
- err = config.GobalWatch.Add(filePath)
- if err != nil {
- log.Println(err)
- return err
- }
- }
- return nil
- }
-
-
-
- /**
- 下载指令
- @param hash ipfs哈希值
- @param projectName 项目名称
- @para fileName 文件名称
- @param dir 云文件目录
- */
- func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir string) error{
-
- //检查文件目录是否存在,不存在则创建
- fileDir := gobalLocalProjectDir+"\\"+nodeDir
- _,err := os.Stat(fileDir)
- if err != nil {
- //创建文件目录
- err = os.MkdirAll(fileDir, os.ModePerm)
- if err!=nil{
- log.Println(err)
- return err
- }
- }
-
- //下载启动标识,有下载进度则设置为true
- var downloading bool = false
-
- //检测文件打开状态
- tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1)
-
- if err != nil && (!os.IsNotExist(err)) {
-
- log.Println("文件被占用,请关闭打开的软件")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
- return err
- }
- return err
- }
- defer tfile.Close()
-
- //正在下载标识,标识用于不监测更新改动
- gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1
-
- //构建本地cmd执行 ipfs get
- progress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(fileDir+"\\"+fileName))
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
- }()
-
- log.Println("资源连接中...")
- //异步定时读取进度反馈给前端,每500ms返回一次进度
- go func(){
- millSeconds := time.Now().UnixNano() / 1e6
- for content := range progress { // 通道关闭后会退出for range循环
- log.Println(">>>"+content)
- current :=time.Now().UnixNano() / 1e6
- if !downloading{
- log.Println("资源连接成功,下载中...")
- }
- downloading = true
- if current-millSeconds>500{
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
- if err != nil {
- log.Printf("json.Marshal error %s\n", err)
- }
-
- //设置下载启动标识和下载时间戳
- millSeconds = current
-
- //反馈前端
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- log.Println(err)
- break
- }
- }
-
- if strings.Index(content,"100.00%")!=-1{
-
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
- if err != nil {
- log.Printf("json.Marshal error %s\n", err)
- }
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- log.Println(err)
- }
- break
-
- }
- }
- }()
-
- //设置30秒连接超时,30秒未启动下载则下载失败
- go func() {
- index :=0
- for true{
- //启动下载则不做超时判断
- if downloading==true{
- return
- }
- index++
- time.Sleep(time.Duration(1)*time.Second)
- if downloading==false && index==30{
- err = cmd.Process.Kill()
- log.Println("资源连接超时(30s),下载进程被终止")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return
- }
- return
- }
- }
- }()
-
- //等待下载执行完成
- err = cmd.Wait()
- if err != nil {
- log.Printf("cmd.Run() failed with %s\n", err)
- }
- if errStdout != nil || errStderr != nil {
- log.Printf("failed to capture stdout or stderr\n")
- }
- outStr := string(stdout)
- log.Printf("out:%s", outStr)
-
- //更新Etcd数据库的文件key对应hash值
- time.Sleep(200*time.Millisecond)
- key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName
- err = etcdclient.ReplaceInto(key,hash+";0")
- if err != nil {
- log.Println(err)
- return err
- }
-
- //发送消息至文件变更订阅
- config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
-
- //下载完成反馈
-
-
- log.Printf("叮,资源文件[ %v ]下载完成",fileName)
- defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0
- defer close(progress)
- return nil
- }
-
- //读取解析cmd返回文件上传或下载进度信息
- func contentToJSONByte(content string) ([]byte,error){
- sts :=strings.Split(content," ")
- if len(sts)<8{
- log.Println("字符长度小于8")
- return nil,nil
- }
- var processFloat float64
- if (len(sts)==9 || len(sts)==8){
- processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
- }else{
- processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
- }
-
- if processFloat==0{
- //log.Println("当前进度0")
- return nil,nil
- }
-
- pro :=&processStruct{
- Size:sts[4],
- CurrentSize: sts[1],
- Unit: sts[2],
- CurrentUnit: sts[5],
- Process: processFloat,
- Hash: "",
-
- }
- projson,err :=json.Marshal(pro)
- return projson,err
- }
-
-
- /**
- 上传本地文件
- @param absolutePath 文件本地绝对路径
- @param fileName 文件名称
- @param projectName 项目名称
- @param dir 云文件目录
- @param currentHistoryHash 当前文件的历史版本管理文件hash
- @param note 备注
- @param creator 创建人
- @param milestone 是否事里程碑
- */
- func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
-
- //本地文件目录
- fileDir := gobalLocalProjectDir+"\\"+dir
- //检查目录
- _,err := os.Stat(fileDir)
- if err != nil {
- //创建文件目录
- err = os.MkdirAll(fileDir, os.ModePerm)
- if err!=nil{
- return err
- }
- }
-
- //检测文件打开状态
- tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
- if err != nil {
- log.Println("文件被占用,请关闭打开的软件")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
- return err
- }
- return err
- }
- defer tfile.Close()
-
- serverSh := shell.NewShell(config.ServerIpfsUrl)
- //serverSh.SetTimeout(time.Duration(30)*time.Second)
- //log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
- //检测引导节点是否连接成功
- isUp := serverSh.IsUp()
- if !isUp {
- log.Println("备份节点网络连接不通!")
- if conn==nil{
- return errors.New("备份节点连失联")
- }else{
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- }
- return nil
- }
-
- //上传启动标识,有上传进度则设置为true
- var uploading bool=false
-
- //cmd执行ipfs add
- cmd := exec.Command(ipfsPath, "add",absolutePath)
- uploadProgress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
- }()
-
- //异步给前端反馈上传进度
- go func(){
- first := true
- millSeconds := time.Now().UnixNano() / 1e6
- current :=time.Now().UnixNano() / 1e6
- for content := range uploadProgress { // 通道关闭后会退出for range循环
- current =time.Now().UnixNano() / 1e6
- if first {
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
- if err != nil {
- log.Println("json.Marshal error %s\n", err)
- }
- //设置上传启动标识为true
- uploading=true
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- break
- }
- }
- millSeconds = current
- first=false
- }
-
- if current-millSeconds>500{
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
- if err != nil {
- log.Println("json.Marshal error %s\n", err)
- }
- uploading=true
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- break
- }
- }
- millSeconds = current
- }
- if (strings.Index(content,"90.00%")!=-1){
-
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
-
- if err != nil {
- log.Println("json.Marshal error %s\n", err)
- }
- uploading=true
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- break
- }
- }
- break
- }
- }
- }()
-
- log.Println("资源上传中...")
-
- //上传未启动超时时间30s
- go func() {
- index :=0
- for true{
- if uploading==true{
- return
- }
- index++
- time.Sleep(time.Duration(1)*time.Second)
- if uploading==false && index==30{
- err = cmd.Process.Kill()
- log.Println("资源连接超时(30s),上传进程被终止")
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return
- }
- }
- return
- }
- }
- }()
-
- //等待执行完成
- err = cmd.Wait()
- if err != nil {
- log.Println("cmd.Run() failed with %s\n", err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- return err
- }
- if errStdout != nil || errStderr != nil {
- log.Println("failed to capture stdout or stderr\n")
- }
- outStr := string(stdout)
- fileHash := strings.Split(outStr," ")[1]
- log.Printf("out:%s", outStr)
- defer close(uploadProgress)
-
- //ipfs provide
- cmd = exec.Command(ipfsPath,"dht","provide",fileHash)
- err = cmd.Run()
- if err != nil {
- log.Println(err)
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- }
- return err
- }
-
- //判断备份节点对等节点是否已添加连接
- sh := shell.NewShell(config.GobalIpfsUrl)
- idOut,err :=sh.ID()
- localId :=idOut.ID
- swarmConnInfos,err :=serverSh.SwarmPeers(context.Background())
- hasConnect := false
- for _,swarmconn := range swarmConnInfos.Peers {
- if swarmconn.Peer==localId{
- hasConnect=true
- break
- }
- }
-
- if !hasConnect{
- log.Println("中继处理")
- swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
- errT := serverSh.SwarmConnect(context.Background(),swarmConnectAddr)
- if errT!=nil{
- log.Println("中继失败,引导节点备份失败")
- log.Println(err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- }
- }
-
- //后台备份
- go func() {
- err = serverSh.Pin(fileHash)
- if err!=nil{
- log.Printf("资源[ %v ]备份节点备份失败", fileName)
- log.Println(err)
- }
- log.Printf("资源[ %v ]备份节点备份成功", fileName)
- }()
-
- //文件不存在则进行本地文件夹拷贝
- if !fileExist(fmt.Sprint((fileDir+"\\"+fileName))) {
- //记录新增文件,新增文件不做post请求,前端自行post
- goabalAddFileMap[fileName] = 1
- err = sh.Get(fileHash,fmt.Sprint((fileDir+"\\"+fileName)))
- if err != nil {
- log.Println(err)
- return err
- }
- }
-
- //构建历史版本记录、写本地历史版本管理文件,上传至ipfs
- filenameall := path.Base(fileName)
- filesuffix := path.Ext(fileName)
- fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
- commitFilePath := fileDir+"\\"+fileprefix+".commit"
- commitVersion,commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,fileHash,note,creator,milestone)
- if err != nil {
- log.Printf("资源[ %v ]历史版本记录失败", fileName)
- log.Println(err)
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- }
- return err
- }
-
- //读取文件属性,构建100%进度对象
- objectStat,err :=sh.ObjectStat(fileHash)
- if err != nil {
- log.Println(err)
- return err
- }
- prog := new(processStruct)
- prog.Hash=fileHash
- prog.Process=100.00
- prog.Size=strconv.Itoa(objectStat.CumulativeSize)
- prog.CommitHistoryHash=commitHistoryHash
- prog.Version=commitVersion
- projson,err :=json.Marshal(prog)
-
- if conn!=nil{
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- log.Println(err)
- return err
- }
- }
-
- //更新Etcd数据库hash
- key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
- err = etcdclient.ReplaceInto(key,prog.Hash+";0")
- if err != nil {
- log.Println(err)
- return err
- }
-
- //自动协同逻辑
- //if conn==nil{
- // folderName := strings.Split(dir,"\\")[0]
- // var relativePath string
- // if len(strings.Split(dir, "\\"))==1{
- // relativePath = ""
- // }else{
- // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
- // }
- // size,_ := strconv.ParseInt(prog.Size,10,64)
- // err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion)
- // if err!=nil{
- // return err
- // }
- //}
-
- //发送文件至文件变更订阅
- config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
-
- log.Printf("叮,资源文件[ %v ]上传完成",fileName)
- return nil
- }
-
// FileParam is the JSON request body sent to the file-record endpoints.
// FileSize and UserId are encoded as JSON strings.
type FileParam struct {
	// Digest is the hex MD5 of the '|'-joined request fields; see
	// postGetHistoryCurrentHash and postUpdateFile for the exact field order.
	Digest                string `json:"digest"`
	ProjectName           string `json:"projectName"`
	FolderName            string `json:"folderName"`
	RelativePath          string `json:"relativePath"`
	IpfsCid               string `json:"ipfsCid"`
	FileName              string `json:"fileName"`
	FileSize              int64  `json:"fileSize,string"`
	FileVersion           int    `json:"fileVersion"`
	UserId                int64  `json:"userId,string"`
	HistoryCurrentIpfsCid string `json:"historyCurrentIpfsCid"`
	HistoryPreIpfsCid     string `json:"historyPreIpfsCid"`
}
-
- //获取文件的历史版本管理文件最新hash值
- func postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath string)(string,error){
- url:=config.ServerUrl+"/api/pms/sdk/queryFileHistoryCurrentCid"
- contentType := "application/json"
-
- fileParam :=FileParam{
- ProjectName: projectName,
- FolderName: folderName,
- RelativePath: relativePath,
- FileName: fileName,
- IpfsCid: "",
- FileSize: 0,
- FileVersion: 0,
- UserId: 0,
- HistoryPreIpfsCid: "",
- HistoryCurrentIpfsCid: "",
- }
- text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,fileParam.IpfsCid,fileName,fileParam.FileSize,fileParam.FileVersion,fileParam.UserId,fileParam.HistoryCurrentIpfsCid,fileParam.HistoryPreIpfsCid)
- textByte := []byte(text)
- md5Byte := md5.Sum(textByte)
- digest := fmt.Sprintf("%x", md5Byte)
- fileParam.Digest=digest
- jsonData,err :=json.Marshal(fileParam)
- if err!=nil{
- log.Printf("json序列化化错误!")
- return "",err
- }
-
- //log.Print(string(jsonData[:]))
-
- resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
- if err != nil {
- fmt.Println("post failed, err:%v\n", err)
- return "",err
- }
-
- defer resp.Body.Close()
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- fmt.Println("get resp failed,err:%v\n", err)
- return "",err
-
- }
- //log.Printf("post response:%v", string(b))
- result := make(map[string] interface{})
- err=json.Unmarshal(b,&result)
- if err!=nil{
- log.Printf("字符串%v反序列化出错", string(b[:]))
- }
-
- if result["Msg"].(string)==""{
- log.Printf("资源[ %v ]历史版本管理文件Hash:%v",fileName,result["Data"].(string))
- return result["Data"].(string),nil
- }
-
- return "",errors.New(result["Msg"].(string))
- }
-
- //更新文件记录
- func postUpdateFile(projectName, folderName, relativePath, ipfsCid, fileName, historyCurrentIpfsCid, historyPreIpfsCid,userId string, fileSize int64, fileVersion int)(err error){
-
- url:=config.ServerUrl+"/api/pms/sdk/updateFile"
- contentType := "application/json"
- intUserId,_:=strconv.ParseInt(userId,10,64)
-
- fileParam :=FileParam{
- ProjectName: projectName,
- FolderName: folderName,
- RelativePath: relativePath,
- IpfsCid: ipfsCid,
- FileName: fileName,
- FileSize: fileSize,
- FileVersion: fileVersion,
- UserId: intUserId,
- HistoryCurrentIpfsCid: historyCurrentIpfsCid,
- HistoryPreIpfsCid: historyPreIpfsCid,
- }
- text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,ipfsCid,fileName,fileSize,fileVersion,userId,historyCurrentIpfsCid,historyPreIpfsCid)
- textByte := []byte(text)
- md5Byte := md5.Sum(textByte)
- digest := fmt.Sprintf("%x", md5Byte)
- fileParam.Digest=digest
- jsonData,err :=json.Marshal(fileParam)
- if err!=nil{
- log.Printf("json序列化化错误!")
- return err
- }
-
- resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
- if err != nil {
- fmt.Println("post failed, err:%v\n", err)
- return err
- }
-
- defer resp.Body.Close()
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- fmt.Println("get resp failed,err:%v\n", err)
- return err
-
- }
- //log.Printf("post response:%v", string(b))
- result := make(map[string] interface{})
- err=json.Unmarshal(b,&result)
- if err!=nil{
- log.Printf("字符串%v反序列化出错", string(b[:]))
- return err
- }
- log.Printf("资源[ %v ]服务记录更新成功",fileName)
- return nil
- }
-
-
- /**
- 记录提交记录
- */
- func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (int,string,error){
-
- commitHistory := new(commitHistory)
-
- //历史文件不存在则创建
- localSh :=shell.NewShell(config.GobalIpfsUrl)
- localSh.SetTimeout(10*time.Second)
- if len(currentHistoryHash)!=0 {
- os.Remove(path)
- err := localSh.Get(currentHistoryHash,path)
- if err != nil {
- log.Println("历史版本管理文件下载失败")
- return -1,"",err
- }
- }
-
- //初始化历史管理文件
- exist := fileExist(path)
- if !exist {
- commitFile,err := os.Create(path)
- if err != nil {
- log.Println("历史版本管理文件创建失败")
- return -1,"",err
- }
- commitFile.Close()
- }
-
- //设置文件隐藏属性
- attribCmd :=exec.Command("attrib","+h",path)
- err :=attribCmd.Run()
- if err != nil {
- log.Println("设置文件隐藏属性失败")
- return -1,"",err
- }
-
- //读取历史管理文件
- content ,err :=ioutil.ReadFile(path)
-
- if len(content)!=0{
- rows :=strings.Split(string(content),"\n")
- endRow :=rows[len(rows)-2]
- columns :=strings.Split(endRow,"\t")
- commitHistory.Version,_=strconv.Atoi(columns[6])
- commitHistory.Version++
- commitHistory.ParentHash=columns[1]
- }else{
- commitHistory.Version=1
- commitHistory.ParentHash="0000000000000000000000000000000000000000"
- }
- commitHistory.CurrentHash=hash
- commitHistory.Milestone = milestone
- commitHistory.Creator = creator
- commitHistory.Note = note
- commitHistory.CreateTime=time.Now().Unix()
-
- if commitHistory.ParentHash==commitHistory.CurrentHash{
- if commitHistory.Version>1{
- commitHistory.Version--
- }
- return commitHistory.Version,currentHistoryHash,nil
- }
-
- file,err :=os.OpenFile(path,os.O_APPEND,0666)
- if err != nil{
- log.Println(err)
- return -1,"",err
- }
- //写入历史管理文件
- w :=bufio.NewWriter(file)
- writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
- fmt.Fprintln(w,writeContent)
- err = w.Flush()
- if err != nil {
- log.Println("历史版本管理文件写入失败")
- return -1,"",err
- }
- file.Close()
-
- addFile,err :=os.Open(path)
- if err != nil{
- log.Println(err)
- return -1,"",err
- }
- defer addFile.Close()
- //add 历史管理文件
- historyHash,err:=localSh.Add(addFile)
- if err != nil {
- log.Println("历史版本管理文件上传失败")
- return -1,"",err
- }
-
- serverSh :=shell.NewShell(config.ServerIpfsUrl)
- serverSh.SetTimeout(30*time.Second)
- err = serverSh.Pin(historyHash)
- if err != nil {
- log.Println("历史版本管理文件备份失败")
- return -1,"",err
- }
- return commitHistory.Version,historyHash,nil
- }
-
// copyAndCapture reads r until it ends, accumulating everything that was
// read and sending each chunk (at most 1024 bytes) as a string on progress.
//
// The send blocks when progress is full. w is accepted for signature
// compatibility but nothing is written to it. io.EOF is treated as a normal
// end and reported as a nil error; any other read error is returned together
// with the bytes captured so far.
func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
	var captured []byte
	chunk := make([]byte, 1024)
	for {
		n, readErr := r.Read(chunk)
		if n > 0 {
			captured = append(captured, chunk[:n]...)
			progress <- string(chunk[:n])
		}
		switch readErr {
		case nil:
			continue
		case io.EOF:
			return captured, nil
		default:
			return captured, readErr
		}
	}
}
-
// simpleFileInfo describes a single file in the JSON listing produced by
// GetFolderFileInfo.
type simpleFileInfo struct {
	Name         string `json:"name" `
	Extension    string `json:"extension"`
	RelativePath string `json:"relativePath"`
	AbsolutePath string `json:"absolutePath"`
}

var (
	// gobalFolderFileMap collects the files found by the current
	// GetFolderFileInfo call, keyed by the file's path.
	gobalFolderFileMap map[string]*simpleFileInfo
	// gobalRelativePath is the prefix myWalkfunc strips from a path to
	// compute RelativePath.
	gobalRelativePath string
)
- /**
- 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
- @param id 文件id
- */
- func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
-
- gobalFolderFileMap = make(map[string] *simpleFileInfo)
- fileInfo,err :=os.Stat(absolutePath)
- if err!=nil{
- log.Println(err)
- return err
- }
-
- log.Println(filepath.Dir(absolutePath))
- //单个文件处理
-
- if !fileInfo.IsDir() {
- simpleFileInfo := new(simpleFileInfo)
- simpleFileInfo.Name=fileInfo.Name()
- simpleFileInfo.Extension=path.Ext(absolutePath)
- simpleFileInfo.RelativePath=""
- simpleFileInfo.AbsolutePath=absolutePath
- gobalFolderFileMap[absolutePath]=simpleFileInfo
- bytes,err :=json.Marshal(gobalFolderFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
- if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
- log.Println(err)
- return err
- }
- return nil
- }
-
- //文件目录处理
- gobalRelativePath = filepath.Dir(absolutePath)
- err =filepath.Walk(absolutePath, myWalkfunc)
-
- if err != nil {
- log.Println(err)
- return err
- }
-
- bytes,err :=json.Marshal(gobalFolderFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
- if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
- log.Println(err)
- return err
- }
-
- return nil
- }
-
- func myWalkfunc(path string, info os.FileInfo, err error) error {
-
- if info.IsDir()==false{
- simpleFileInfo := new(simpleFileInfo)
- simpleFileInfo.Name=info.Name()
- simpleFileInfo.Extension=filepath.Ext(path)
- simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
- simpleFileInfo.AbsolutePath=path
- gobalFolderFileMap[path]=simpleFileInfo
- return nil
- }
-
- return nil
- }
-
// fileExist reports whether a local file exists at path. Only a "does not
// exist" result from os.Lstat counts as missing; any other Lstat error is
// treated as existing.
func fileExist(path string) bool {
	if _, statErr := os.Lstat(path); os.IsNotExist(statErr) {
		return false
	}
	return true
}
-
- /**
- 获取本地文件列表
- */
- func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
- projectPath := gobalLocalProjectDir
- //log.Println("切换文件列表:"+projectPath)
- keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
-
- //添加监控
- err := filepath.Walk(projectPath,watchWalkfunc)
- if err != nil {
- log.Println(err)
- return err
- }
- //初始化通道
- if config.GobalWatchChannelMap[projectPath] != nil {
- close(config.GobalWatchChannelMap[projectPath])
- }
- config.GobalWatchChannelMap[projectPath]=make(chan string,100)
- log.Println("添加文件监控:"+projectPath)
-
- //定期校验缓存的本地文件状态
- dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix)
- if err != nil {
- log.Println(err)
- }
- if dataMapa!=nil && len(dataMapa)>0{
- for k,_ := range dataMapa {
- if !fileExist(config.LocalWorkSpaceDir+k){
- err = etcdclient.DeleteWithPrefix(k)
- if err != nil {
- log.Println(err)
- }
- }
- }
- }
-
-
- //优先etcd查询
- dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
- if err != nil {
- log.Println(err)
- return err
- }
- if dataMap==nil || len(dataMap)==0{
- // 不存在则初始化进etcd
-
- err =filepath.Walk(gobalLocalProjectDir,walkfunc)
- //路径错误
- if err != nil {
- log.Println(err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
- log.Println(err)
- return err
- }
- }
-
- mapByte,err:=json.Marshal(gobalFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
- log.Println(err)
- return err
- }
-
- cacheMap := make(map[string] string)
- for k,v := range gobalFileMap {
- k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
- cacheMap[k]=v
-
- }
-
- //异步缓存
- //go func() {
- err = etcdclient.BatchAdd(cacheMap)
- if err != nil {
- log.Println(err)
- }
- //}()
-
- //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
-
- //清空gobalFileMap
- gobalFileMap = make(map[string] string)
- }
-
- err=sendFileListFromEtcd(keyPrefix,projectName,conn)
- if err != nil {
- log.Println(err)
- return err
- }
-
- for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[gobalLocalProjectDir] {
- //log.Println(actionAndModifyFilePathStr)
- actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
- queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
-
- //当前登陆用户判断
- if gobalLoginUserName != strings.Split(queryKey,"\\")[0] && actionAndModifyFilePathStr!=";"{
- log.Printf("非法用户修改%v", actionAndModifyFilePathStr)
- continue
- }
-
- if actionAndModifyFilePath[0]=="remove"{
- queryMap,err :=etcdclient.QueryWithPrefix(queryKey)
- if len(queryMap)==0{
- continue
- }
- err = etcdclient.DeleteWithPrefix(queryKey)
- if err != nil {
- log.Println(err)
- }
- }else if actionAndModifyFilePath[0]=="write"{
- querymap,err := etcdclient.QueryWithPrefix(queryKey)
- if err != nil {
- log.Println(err)
- continue
- }
- if len(querymap)==0{
- continue
- }
-
- //更新判断
- if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
- continue
- }
-
- oldValue := strings.Split(querymap[queryKey],";")
- newValue := oldValue[0]+";" +"1"
- err = etcdclient.ReplaceInto(queryKey,newValue)
- if err!=nil{
- log.Println(err)
- continue
- }
- log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr)
-
- //保存即同步逻辑,如果非新增文件则自动post
- //获取文件的历史版本管理文件hash
- //filePath := actionAndModifyFilePath[1]
- //fileName :=filepath.Base(filePath)
- //folderName := strings.Split(queryKey,"\\")[2]
- //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
- //var relativePath string
- //if len(strings.Split(dir, "\\"))==1{
- // relativePath = ""
- //}else{
- // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
- //}
- //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
- //if err!=nil{
- // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
- // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
- // continue
- //}
- //
- ////自动更新文件
- //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
- //if err!=nil{
- // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
- // log.Printf("UploadCommand 返回失败,%v",err.Error())
- // //记录修改状态
- // newValue := oldValue[0]+";" +"1"
- // err = etcdclient.ReplaceInto(queryKey,newValue)
- // if err!=nil{
- // log.Println(err)
- // continue
- // }
- // continue
- //}
- //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
- //continue
- }else if actionAndModifyFilePath[0]=="create"{
- querymap,err := etcdclient.QueryWithPrefix(queryKey)
- if err != nil {
- log.Println(err)
- continue
- }
- if len(querymap)==0{
- continue
- }
-
- //更新判断
- if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
- continue
- }
-
- oldValue := strings.Split(querymap[queryKey],";")
- newValue := oldValue[0]+";" +"1"
- err = etcdclient.ReplaceInto(queryKey,newValue)
- if err!=nil{
- log.Println(err)
- continue
- }
- log.Printf("文件变更 [ %v ] create", actionAndModifyFilePathStr)
-
- //如果非新增文件则自动post
- //if goabalAddFileMap[]
- //获取文件的历史版本管理文件hash
- //filePath := actionAndModifyFilePath[1]
- //fileName :=filepath.Base(filePath)
- //folderName := strings.Split(queryKey,"\\")[2]
- //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
- //var relativePath string
- //if len(strings.Split(dir, "\\"))==1{
- // relativePath = ""
- //}else{
- // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
- //}
- //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
- //if err!=nil{
- // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
- // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
- // continue
- //}
- //
- ////自动更新文件
- //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
- //if err!=nil{
- // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
- // log.Printf("UploadCommand 返回失败,%v",err.Error())
- // //记录修改状态
- // newValue := oldValue[0]+";" +"1"
- // err = etcdclient.ReplaceInto(queryKey,newValue)
- // if err!=nil{
- // log.Println(err)
- // continue
- // }
- // continue
- //}
- //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
- //continue
- }
- err = sendFileListFromEtcd(keyPrefix,projectName,conn)
- if err != nil {
- log.Println(err)
- return err
- }
- }
-
- return nil
- }
-
-
- func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
- dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
- if err != nil {
- log.Println(err)
- return err
- }
- if dataMap!=nil && len(dataMap)>0{
-
- cacheMap := make(map[string] string)
- for k,v := range dataMap {
- //历史数据加默认值
- if len(strings.Split(v, ";"))==1{
- v=v+";0"
- }
- cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
- }
-
- mapByte,err:=json.Marshal(cacheMap)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
- log.Println(err)
- return err
- }
- return nil
- }else{
- log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName)
- }
- return nil
- }
-
-
// OpenFileWith shows the "Open with" dialog for filePath by running
// rundll32.exe shell32.dll,OpenAs_RunDLL.
//
// It returns the os.Stat error when the file cannot be stat'ed, otherwise
// the error from running the command (which is also logged).
func OpenFileWith(filePath string) error {
	// Reject paths that cannot be stat'ed before launching anything.
	if _, statErr := os.Stat(filePath); statErr != nil {
		return statErr
	}

	runErr := exec.Command("rundll32.exe", "shell32.dll,OpenAs_RunDLL", filePath).Run()
	if runErr != nil {
		log.Println(runErr)
		return runErr
	}
	return nil
}
-
- /**
- 手动检查软件更新
- 0:不强制更新
- 1:强制更新
- */
- func CheckForUpdates(forceUpdate string) error{
-
- tszdir :=os.Getenv("TSZDIR")
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
-
- //判断文件有效性
- _,err := os.Stat(tszdir+config.UpdaterName)
- if err!=nil{
- return err
- }
-
- cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
-
- cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
-
- //检测到更新 kill所有客户端进程
- log.Println("close all process")
- cmd = exec.Command("cmd.exe","/c",ipfsPath);
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
- return nil
-
- }
-
- func walkfunc(filePath string, info os.FileInfo, err error) error {
-
- if info == nil{
- return nil
- }
-
- if info.IsDir()==false{
- //历史文件不扫描
- if path.Ext(filePath)==".commit" {
- return nil
- }
-
- sh := shell.NewShell(config.GobalIpfsUrl)
- file,err :=os.Open(filePath)
-
-
- if err != nil{
- log.Println(err)
- return err
- }
-
- defer file.Close()
-
- hash,err :=sh.Add(file)
- if err != nil {
- log.Println(err)
- return err
- }
-
- dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(gobalLocalProjectDir+"\\"),"",1)
-
- gobalFileMap[dir]=hash
-
- }
- return nil
- }
-
- /**
- 查询历史文件
- path 文件路径
- hash 历史版本文件hash
- */
- func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
-
- result := make(map[int] *commitHistory)
- //校验文件路径
- _,err :=os.Stat(filePath)
- if err != nil {
- log.Println("文件 "+filePath+"not found")
- return nil,errors.New("参数错误!")
- }
-
- if len(hash) == 0 {
- return result,nil
- }
-
- //根据hash更新文件
- localSh := shell.NewShell(config.GobalIpfsUrl)
- //连接失败判断
- localSh.SetTimeout(5*time.Second)
- ext :=path.Ext(filePath)
- commitFilePath :=strings.Replace(filePath,ext,".commit",1)
- os.Remove(commitFilePath)
- err = localSh.Get(hash,commitFilePath)
- if err!=nil {
- log.Println("文件"+hash+"下载失败")
-
- return result,errors.New("历史文件获取失败,请稍后重试")
- }
-
- //设置文件隐藏属性
- attribCmd :=exec.Command("attrib","+h",commitFilePath)
- err =attribCmd.Run()
- if err != nil {
- log.Println("设置文件隐藏属性失败")
- return result,err
- }
-
- //解析历史版本文件
- contentByte,err := ioutil.ReadFile(commitFilePath)
- content := string(contentByte)
- if content==""{
- return result,nil
- }
- rows :=strings.Split(content,"\n")
- length := len(rows)
- var index int = 0
- for i:=length-2;i>=0;i--{
- columns := strings.Split(rows[i],"\t")
- commitHistoryInstance := new(commitHistory)
- commitHistoryInstance.ParentHash =columns[0]
- commitHistoryInstance.CurrentHash=columns[1]
- commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
- commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
- commitHistoryInstance.Creator=columns[2]
- commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
- commitHistoryInstance.Note = columns[4]
- result[index]=commitHistoryInstance
- index++
- }
- return result,nil
- }
-
- /**
- 设定某个历史版本为里程碑版本
- @param filePath 文件绝对路径
- @param commitHistoryHash 历史版本管理文件hash
- @param hash 文件hash
- @param milestone 是否是里程碑
-
- */
- func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
-
- //result := make(map[int] *commitHistory)
- //校验文件路径
- _,err :=os.Stat(filePath)
- if err != nil {
- log.Println("文件 "+filePath+"not found")
- return "",errors.New("参数错误!")
- }
-
- if len(commitHistoryHash) == 0 {
- log.Println("参数hash must not empty")
- return "",errors.New("参数错误!")
- }
-
-
- //根据hash更新文件
- localSh := shell.NewShell(config.GobalIpfsUrl)
- //连接失败判断
- localSh.SetTimeout(5*time.Second)
- ext :=path.Ext(filePath)
- commitFilePath :=strings.Replace(filePath,ext,".commit",1)
- os.Remove(commitFilePath)
- err = localSh.Get(commitHistoryHash,commitFilePath)
- if err!=nil {
- log.Println("文件"+commitHistoryHash+"下载失败")
- return "",errors.New("历史文件获取失败,请稍后重试")
- }
-
- //设置文件隐藏属性
- attribCmd :=exec.Command("attrib","+h",commitFilePath)
- err =attribCmd.Run()
- if err != nil {
- log.Println("设置文件隐藏属性失败")
- return "",err
- }
-
- //解析历史版本文件
- contentByte,err := ioutil.ReadFile(commitFilePath)
- content := string(contentByte)
- if content==""{
- return "",nil
- }
- rows :=strings.Split(content,"\n")
- length := len(rows)
- resultString :=""
- for i:=0;i<length-1;i++{
- columns := strings.Split(rows[i],"\t")
- if columns[1]==hash{
- resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
- continue
- }
- resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
-
- }
-
- os.Remove(commitFilePath)
- fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
- w := bufio.NewWriter(fw)
- w.WriteString(resultString)
- if err != nil {
- log.Println(err)
- return "",err
- }
- w.Flush()
- fw.Close()
-
- addFile,err :=os.Open(commitFilePath)
- if err != nil{
- log.Println(err)
- return "",err
- }
- defer addFile.Close()
- //add 历史管理文件
- historyHash,err:=localSh.Add(addFile)
- if err != nil {
- log.Println("历史版本管理文件上传失败")
- return "",err
- }
-
- serverSh :=shell.NewShell(config.ServerIpfsUrl)
- serverSh.SetTimeout(5*time.Second)
- err = serverSh.Pin(historyHash)
- if err != nil {
- log.Println("历史版本管理文件备份失败")
- return "",err
- }
-
- return historyHash,nil
- }
-
// commitHistory is one record of a file's history (".commit") file, in the
// shape in which it is serialised to JSON.
type commitHistory struct {
	// ParentHash is read from column 0 of a history row.
	ParentHash string `json:"parentHash"`
	// CurrentHash is read from column 1 of a history row.
	CurrentHash string `json:"currentHash"`
	// Creator is read from column 2 of a history row.
	Creator string `json:"creator"`
	// CreateTime is column 3 parsed as a base-10 integer.
	// NOTE(review): the time unit is not visible here; confirm with the writer.
	CreateTime int64 `json:"createTime"`
	// Note is read from column 4 of a history row.
	Note string `json:"note"`
	// Version is column 6 parsed as an integer.
	Version int `json:"version"`
	// Milestone is column 5 parsed as a boolean.
	Milestone bool `json:"milestone"`
}
-
- /**
- 消息通知
- @param userId 用户ID
- */
- func MessageNotify(conn *websocket.Conn, userId string) (error){
- msgKey :=fmt.Sprintf("lockingMsg\\%v",userId)
-
- //返回全量通知消息列表
- err :=queryEtcdToWebSocket(conn, msgKey)
- if err!=nil{
- log.Println(err)
- return err
- }
-
- //消费通知消息到本地
- nsqclient.Consumers(config.NsqTopic,(config.NsqChanelPrefix+userId),config.NsqAddr)
-
- for message := range nsqclient.MsgQueue {
- messOb :=nsqclient.LockingMsg{}
- err:=json.Unmarshal([]byte(message),&messOb)
- if err!=nil{
- log.Println(err)
- continue
- }
-
- for _, acceptUserId := range messOb.UserIds {
- //log.Println(strconv.FormatInt(acceptUserId,10)+">>>"+userId)
- if strconv.FormatInt(acceptUserId,10)==userId{
- messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10)
- err =etcdclient.ReplaceInto(messagekey,message)
- if err!=nil{
- nsqclient.MsgQueue <- message
- log.Println(err)
- }
-
- err = queryEtcdToWebSocket(conn, messagekey)
- if err!=nil{
- nsqclient.MsgQueue <- message
- log.Println(err)
- }
- }
- }
- }
- return nil
- }
-
- /**
- 消息通知
- @param userId 用户ID
- */
- func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){
- msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId)
- err = etcdclient.DeleteWithPrefix(msgKey)
- return err
- }
-
- /**
- 查询etcd对应key值发送给前端
- */
- func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){
- msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix)
- if err!=nil{
- log.Println(err)
- return err
- }
- mapByte,err :=json.Marshal(msgs)
- if err!=nil{
- log.Println(err)
- return err
-
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
- log.Println(err)
- return err
-
- }
-
- return nil
- }
-
-
-
-
-
-
|