文件同步
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
4年前
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637
  1. package handle
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "crypto/md5"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "fts/config"
  11. "fts/etcdclient"
  12. "fts/nsqclient"
  13. "github.com/gorilla/websocket"
  14. _ "github.com/ipfs/go-ipfs-api"
  15. shell "github.com/ipfs/go-ipfs-api"
  16. "io"
  17. "io/ioutil"
  18. "log"
  19. "net/http"
  20. "os"
  21. "os/exec"
  22. "path"
  23. "path/filepath"
  24. "strconv"
  25. "strings"
  26. "time"
  27. )
  28. //登陆账号
  29. var gobalLoginUserName string
  30. //登陆账号Id
  31. var gobalLoginUserId string
  32. //key:filepath,value:hash
  33. var gobalFileMap = make(map[string] string)
  34. var gobalFileDownLoadingMap = make(map[string] int)
  35. //手动上传文件,非自动上传
  36. var goabalAddFileMap = make(map[string] int)
  37. //本地项目空间目录
  38. var gobalLocalProjectDir string
  39. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  40. //全局消息通知管道
  41. var GobalMessageNotify = make(chan string,1000)
  42. var ipfsPath=os.Getenv("IPFS-PATH")
  43. /**
  44. 文件上传下载进度
  45. */
  46. type processStruct struct {
  47. Size string `json:"size"`
  48. CurrentSize string `json:"currentSize"`
  49. Unit string `json:"unit"`
  50. CurrentUnit string `json:"currentUnit"`
  51. Process float64 `json:"process"`
  52. Hash string `json:"hash"`
  53. CommitHistoryHash string `json:"commitHistoryHash"`
  54. Version int `json:"version"`
  55. }
  56. /**
  57. 初始化本地客户端配置,包括ipfs网关、引导节点
  58. @param ipfsApi ipfs网关 例如:http://192.168.1.1:5001
  59. @param ipfsBootstrap ipfs引导节点,多个用;分割 例如:/dns/www.lockingos.org/tcp/4001/p2p/12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP
  60. */
  61. func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
  62. //空格路径处理
  63. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  64. config.ServerIpfsUrl = ipfsApi
  65. log.Println("配置客户端网关:"+config.ServerIpfsUrl)
  66. ipfsBootstraps := strings.Split(ipfsBootstrap,";")
  67. for _, simpleBootstrap := range ipfsBootstraps {
  68. log.Println("配置引导节点:"+simpleBootstrap)
  69. cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
  70. err :=cmd.Run()
  71. if err!=nil{
  72. return err
  73. }
  74. }
  75. return nil
  76. }
  77. /**
  78. 初始化本地工作目录
  79. @param userName 用户登陆账号
  80. @param userId 用户ID
  81. @param projectName 项目名称
  82. */
  83. func InitLocalWorkSpace(conn *websocket.Conn, userName, userId, projectName string) (error){
  84. //空格路径处理
  85. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  86. //初始化当前登陆用户信息
  87. gobalLoginUserName = userName
  88. gobalLoginUserId = userId
  89. //初始化本地工作空间绝对路径
  90. gobalLocalProjectDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  91. // 检查本地目录是否存在
  92. _,err := os.Stat(gobalLocalProjectDir)
  93. if err != nil {
  94. //创建文件目录
  95. os.MkdirAll(gobalLocalProjectDir, os.ModePerm)
  96. }
  97. log.Println("进入项目空间:"+gobalLocalProjectDir)
  98. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(gobalLocalProjectDir))); err != nil {
  99. log.Println(err)
  100. return err
  101. }
  102. return nil
  103. }
  104. //工作空间增加文件监听事件
  105. func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
  106. if info == nil{
  107. return nil
  108. }
  109. if info.IsDir()==true{
  110. //config.GobalWatch.Remove(filePath)
  111. err = config.GobalWatch.Add(filePath)
  112. if err != nil {
  113. log.Println(err)
  114. return err
  115. }
  116. }
  117. return nil
  118. }
  119. /**
  120. 下载指令
  121. @param hash ipfs哈希值
  122. @param projectName 项目名称
  123. @para fileName 文件名称
  124. @param dir 云文件目录
  125. */
  126. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir string) error{
  127. //检查文件目录是否存在,不存在则创建
  128. fileDir := gobalLocalProjectDir+"\\"+nodeDir
  129. _,err := os.Stat(fileDir)
  130. if err != nil {
  131. //创建文件目录
  132. err = os.MkdirAll(fileDir, os.ModePerm)
  133. if err!=nil{
  134. log.Println(err)
  135. return err
  136. }
  137. }
  138. //下载启动标识,有下载进度则设置为true
  139. var downloading bool = false
  140. //检测文件打开状态
  141. tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1)
  142. if err != nil && (!os.IsNotExist(err)) {
  143. log.Println("文件被占用,请关闭打开的软件")
  144. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  145. return err
  146. }
  147. return err
  148. }
  149. defer tfile.Close()
  150. //正在下载标识,标识用于不监测更新改动
  151. gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1
  152. //构建本地cmd执行 ipfs get
  153. progress := make(chan string,10000)
  154. var stdout, stderr []byte
  155. var errStdout, errStderr error
  156. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(fileDir+"\\"+fileName))
  157. stdoutIn, _ := cmd.StdoutPipe()
  158. stderrIn, _ := cmd.StderrPipe()
  159. cmd.Start()
  160. go func() {
  161. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  162. }()
  163. go func() {
  164. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  165. }()
  166. log.Println("资源连接中...")
  167. //异步定时读取进度反馈给前端,每500ms返回一次进度
  168. go func(){
  169. millSeconds := time.Now().UnixNano() / 1e6
  170. for content := range progress { // 通道关闭后会退出for range循环
  171. log.Println(">>>"+content)
  172. current :=time.Now().UnixNano() / 1e6
  173. if !downloading{
  174. log.Println("资源连接成功,下载中...")
  175. }
  176. downloading = true
  177. if current-millSeconds>500{
  178. projson,err := contentToJSONByte(content)
  179. if projson==nil && err==nil{
  180. continue
  181. }
  182. if err != nil {
  183. log.Printf("json.Marshal error %s\n", err)
  184. }
  185. //设置下载启动标识和下载时间戳
  186. millSeconds = current
  187. //反馈前端
  188. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  189. log.Println(err)
  190. break
  191. }
  192. }
  193. if strings.Index(content,"100.00%")!=-1{
  194. projson,err := contentToJSONByte(content)
  195. if projson==nil && err==nil{
  196. continue
  197. }
  198. if err != nil {
  199. log.Printf("json.Marshal error %s\n", err)
  200. }
  201. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  202. log.Println(err)
  203. }
  204. break
  205. }
  206. }
  207. }()
  208. //设置30秒连接超时,30秒未启动下载则下载失败
  209. go func() {
  210. index :=0
  211. for true{
  212. //启动下载则不做超时判断
  213. if downloading==true{
  214. return
  215. }
  216. index++
  217. time.Sleep(time.Duration(1)*time.Second)
  218. if downloading==false && index==30{
  219. err = cmd.Process.Kill()
  220. log.Println("资源连接超时(30s),下载进程被终止")
  221. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  222. return
  223. }
  224. return
  225. }
  226. }
  227. }()
  228. //等待下载执行完成
  229. err = cmd.Wait()
  230. if err != nil {
  231. log.Printf("cmd.Run() failed with %s\n", err)
  232. }
  233. if errStdout != nil || errStderr != nil {
  234. log.Printf("failed to capture stdout or stderr\n")
  235. }
  236. outStr := string(stdout)
  237. log.Printf("out:%s", outStr)
  238. //更新Etcd数据库的文件key对应hash值
  239. time.Sleep(200*time.Millisecond)
  240. key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName
  241. err = etcdclient.ReplaceInto(key,hash+";0")
  242. if err != nil {
  243. log.Println(err)
  244. return err
  245. }
  246. //发送消息至文件变更订阅
  247. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  248. //下载完成反馈
  249. log.Printf("叮,资源文件[ %v ]下载完成",fileName)
  250. defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0
  251. defer close(progress)
  252. return nil
  253. }
  254. //读取解析cmd返回文件上传或下载进度信息
  255. func contentToJSONByte(content string) ([]byte,error){
  256. sts :=strings.Split(content," ")
  257. if len(sts)<8{
  258. log.Println("字符长度小于8")
  259. return nil,nil
  260. }
  261. var processFloat float64
  262. if (len(sts)==9 || len(sts)==8){
  263. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  264. }else{
  265. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  266. }
  267. if processFloat==0{
  268. //log.Println("当前进度0")
  269. return nil,nil
  270. }
  271. pro :=&processStruct{
  272. Size:sts[4],
  273. CurrentSize: sts[1],
  274. Unit: sts[2],
  275. CurrentUnit: sts[5],
  276. Process: processFloat,
  277. Hash: "",
  278. }
  279. projson,err :=json.Marshal(pro)
  280. return projson,err
  281. }
  282. /**
  283. 上传本地文件
  284. @param absolutePath 文件本地绝对路径
  285. @param fileName 文件名称
  286. @param projectName 项目名称
  287. @param dir 云文件目录
  288. @param currentHistoryHash 当前文件的历史版本管理文件hash
  289. @param note 备注
  290. @param creator 创建人
  291. @param milestone 是否事里程碑
  292. */
  293. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
  294. //本地文件目录
  295. fileDir := gobalLocalProjectDir+"\\"+dir
  296. //检查目录
  297. _,err := os.Stat(fileDir)
  298. if err != nil {
  299. //创建文件目录
  300. err = os.MkdirAll(fileDir, os.ModePerm)
  301. if err!=nil{
  302. return err
  303. }
  304. }
  305. //检测文件打开状态
  306. tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  307. if err != nil {
  308. log.Println("文件被占用,请关闭打开的软件")
  309. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  310. return err
  311. }
  312. return err
  313. }
  314. defer tfile.Close()
  315. serverSh := shell.NewShell(config.ServerIpfsUrl)
  316. //serverSh.SetTimeout(time.Duration(30)*time.Second)
  317. //log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  318. //检测引导节点是否连接成功
  319. isUp := serverSh.IsUp()
  320. if !isUp {
  321. log.Println("备份节点网络连接不通!")
  322. if conn==nil{
  323. return errors.New("备份节点连失联")
  324. }else{
  325. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  326. return err
  327. }
  328. }
  329. return nil
  330. }
  331. //上传启动标识,有上传进度则设置为true
  332. var uploading bool=false
  333. //cmd执行ipfs add
  334. cmd := exec.Command(ipfsPath, "add",absolutePath)
  335. uploadProgress := make(chan string,10000)
  336. var stdout, stderr []byte
  337. var errStdout, errStderr error
  338. stdoutIn, _ := cmd.StdoutPipe()
  339. stderrIn, _ := cmd.StderrPipe()
  340. cmd.Start()
  341. go func() {
  342. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  343. }()
  344. go func() {
  345. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  346. }()
  347. //异步给前端反馈上传进度
  348. go func(){
  349. first := true
  350. millSeconds := time.Now().UnixNano() / 1e6
  351. current :=time.Now().UnixNano() / 1e6
  352. for content := range uploadProgress { // 通道关闭后会退出for range循环
  353. current =time.Now().UnixNano() / 1e6
  354. if first {
  355. projson,err := contentToJSONByte(content)
  356. if projson==nil && err==nil{
  357. continue
  358. }
  359. if err != nil {
  360. log.Println("json.Marshal error %s\n", err)
  361. }
  362. //设置上传启动标识为true
  363. uploading=true
  364. if conn!=nil{
  365. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  366. break
  367. }
  368. }
  369. millSeconds = current
  370. first=false
  371. }
  372. if current-millSeconds>500{
  373. projson,err := contentToJSONByte(content)
  374. if projson==nil && err==nil{
  375. continue
  376. }
  377. if err != nil {
  378. log.Println("json.Marshal error %s\n", err)
  379. }
  380. uploading=true
  381. if conn!=nil{
  382. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  383. break
  384. }
  385. }
  386. millSeconds = current
  387. }
  388. if (strings.Index(content,"90.00%")!=-1){
  389. projson,err := contentToJSONByte(content)
  390. if projson==nil && err==nil{
  391. continue
  392. }
  393. if err != nil {
  394. log.Println("json.Marshal error %s\n", err)
  395. }
  396. uploading=true
  397. if conn!=nil{
  398. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  399. break
  400. }
  401. }
  402. break
  403. }
  404. }
  405. }()
  406. log.Println("资源上传中...")
  407. //上传未启动超时时间30s
  408. go func() {
  409. index :=0
  410. for true{
  411. if uploading==true{
  412. return
  413. }
  414. index++
  415. time.Sleep(time.Duration(1)*time.Second)
  416. if uploading==false && index==30{
  417. err = cmd.Process.Kill()
  418. log.Println("资源连接超时(30s),上传进程被终止")
  419. if conn!=nil{
  420. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  421. return
  422. }
  423. }
  424. return
  425. }
  426. }
  427. }()
  428. //等待执行完成
  429. err = cmd.Wait()
  430. if err != nil {
  431. log.Println("cmd.Run() failed with %s\n", err)
  432. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  433. return err
  434. }
  435. return err
  436. }
  437. if errStdout != nil || errStderr != nil {
  438. log.Println("failed to capture stdout or stderr\n")
  439. }
  440. outStr := string(stdout)
  441. fileHash := strings.Split(outStr," ")[1]
  442. log.Printf("out:%s", outStr)
  443. defer close(uploadProgress)
  444. //ipfs provide
  445. cmd = exec.Command(ipfsPath,"dht","provide",fileHash)
  446. err = cmd.Run()
  447. if err != nil {
  448. log.Println(err)
  449. if conn!=nil{
  450. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  451. return err
  452. }
  453. }
  454. return err
  455. }
  456. //判断备份节点对等节点是否已添加连接
  457. sh := shell.NewShell(config.GobalIpfsUrl)
  458. idOut,err :=sh.ID()
  459. localId :=idOut.ID
  460. swarmConnInfos,err :=serverSh.SwarmPeers(context.Background())
  461. hasConnect := false
  462. for _,swarmconn := range swarmConnInfos.Peers {
  463. if swarmconn.Peer==localId{
  464. hasConnect=true
  465. break
  466. }
  467. }
  468. if !hasConnect{
  469. log.Println("中继处理")
  470. swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
  471. errT := serverSh.SwarmConnect(context.Background(),swarmConnectAddr)
  472. if errT!=nil{
  473. log.Println("中继失败,引导节点备份失败")
  474. log.Println(err)
  475. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  476. return err
  477. }
  478. }
  479. }
  480. //后台备份
  481. go func() {
  482. err = serverSh.Pin(fileHash)
  483. if err!=nil{
  484. log.Printf("资源[ %v ]备份节点备份失败", fileName)
  485. log.Println(err)
  486. }
  487. log.Printf("资源[ %v ]备份节点备份成功", fileName)
  488. }()
  489. //文件不存在则进行本地文件夹拷贝
  490. if !fileExist(fmt.Sprint((fileDir+"\\"+fileName))) {
  491. //记录新增文件,新增文件不做post请求,前端自行post
  492. goabalAddFileMap[fileName] = 1
  493. err = sh.Get(fileHash,fmt.Sprint((fileDir+"\\"+fileName)))
  494. if err != nil {
  495. log.Println(err)
  496. return err
  497. }
  498. }
  499. //构建历史版本记录、写本地历史版本管理文件,上传至ipfs
  500. filenameall := path.Base(fileName)
  501. filesuffix := path.Ext(fileName)
  502. fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
  503. commitFilePath := fileDir+"\\"+fileprefix+".commit"
  504. commitVersion,commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,fileHash,note,creator,milestone)
  505. if err != nil {
  506. log.Printf("资源[ %v ]历史版本记录失败", fileName)
  507. log.Println(err)
  508. if conn!=nil{
  509. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  510. return err
  511. }
  512. }
  513. return err
  514. }
  515. //读取文件属性,构建100%进度对象
  516. objectStat,err :=sh.ObjectStat(fileHash)
  517. if err != nil {
  518. log.Println(err)
  519. return err
  520. }
  521. prog := new(processStruct)
  522. prog.Hash=fileHash
  523. prog.Process=100.00
  524. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  525. prog.CommitHistoryHash=commitHistoryHash
  526. prog.Version=commitVersion
  527. projson,err :=json.Marshal(prog)
  528. if conn!=nil{
  529. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  530. log.Println(err)
  531. return err
  532. }
  533. }
  534. //更新Etcd数据库hash
  535. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  536. err = etcdclient.ReplaceInto(key,prog.Hash+";0")
  537. if err != nil {
  538. log.Println(err)
  539. return err
  540. }
  541. //自动协同逻辑
  542. //if conn==nil{
  543. // folderName := strings.Split(dir,"\\")[0]
  544. // var relativePath string
  545. // if len(strings.Split(dir, "\\"))==1{
  546. // relativePath = ""
  547. // }else{
  548. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  549. // }
  550. // size,_ := strconv.ParseInt(prog.Size,10,64)
  551. // err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion)
  552. // if err!=nil{
  553. // return err
  554. // }
  555. //}
  556. //发送文件至文件变更订阅
  557. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  558. log.Printf("叮,资源文件[ %v ]上传完成",fileName)
  559. return nil
  560. }
  561. //文件信息入参
  562. type FileParam struct {
  563. Digest string `json:"digest"` //md5(ProjectName|FolderName|RelativePath|FileName|FileVersion|UserName)
  564. ProjectName string `json:"projectName"`
  565. FolderName string `json:"folderName"`
  566. RelativePath string `json:"relativePath"`
  567. IpfsCid string `json:"ipfsCid"`
  568. FileName string `json:"fileName"`
  569. FileSize int64 `json:"fileSize,string"`
  570. FileVersion int `json:"fileVersion"`
  571. UserId int64 `json:"userId,string"`
  572. HistoryCurrentIpfsCid string `json:"historyCurrentIpfsCid"`
  573. HistoryPreIpfsCid string `json:"historyPreIpfsCid"`
  574. }
  575. //获取文件的历史版本管理文件最新hash值
  576. func postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath string)(string,error){
  577. url:=config.ServerUrl+"/api/pms/sdk/queryFileHistoryCurrentCid"
  578. contentType := "application/json"
  579. fileParam :=FileParam{
  580. ProjectName: projectName,
  581. FolderName: folderName,
  582. RelativePath: relativePath,
  583. FileName: fileName,
  584. IpfsCid: "",
  585. FileSize: 0,
  586. FileVersion: 0,
  587. UserId: 0,
  588. HistoryPreIpfsCid: "",
  589. HistoryCurrentIpfsCid: "",
  590. }
  591. text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,fileParam.IpfsCid,fileName,fileParam.FileSize,fileParam.FileVersion,fileParam.UserId,fileParam.HistoryCurrentIpfsCid,fileParam.HistoryPreIpfsCid)
  592. textByte := []byte(text)
  593. md5Byte := md5.Sum(textByte)
  594. digest := fmt.Sprintf("%x", md5Byte)
  595. fileParam.Digest=digest
  596. jsonData,err :=json.Marshal(fileParam)
  597. if err!=nil{
  598. log.Printf("json序列化化错误!")
  599. return "",err
  600. }
  601. //log.Print(string(jsonData[:]))
  602. resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
  603. if err != nil {
  604. fmt.Println("post failed, err:%v\n", err)
  605. return "",err
  606. }
  607. defer resp.Body.Close()
  608. b, err := ioutil.ReadAll(resp.Body)
  609. if err != nil {
  610. fmt.Println("get resp failed,err:%v\n", err)
  611. return "",err
  612. }
  613. //log.Printf("post response:%v", string(b))
  614. result := make(map[string] interface{})
  615. err=json.Unmarshal(b,&result)
  616. if err!=nil{
  617. log.Printf("字符串%v反序列化出错", string(b[:]))
  618. }
  619. if result["Msg"].(string)==""{
  620. log.Printf("资源[ %v ]历史版本管理文件Hash:%v",fileName,result["Data"].(string))
  621. return result["Data"].(string),nil
  622. }
  623. return "",errors.New(result["Msg"].(string))
  624. }
  625. //更新文件记录
  626. func postUpdateFile(projectName, folderName, relativePath, ipfsCid, fileName, historyCurrentIpfsCid, historyPreIpfsCid,userId string, fileSize int64, fileVersion int)(err error){
  627. url:=config.ServerUrl+"/api/pms/sdk/updateFile"
  628. contentType := "application/json"
  629. intUserId,_:=strconv.ParseInt(userId,10,64)
  630. fileParam :=FileParam{
  631. ProjectName: projectName,
  632. FolderName: folderName,
  633. RelativePath: relativePath,
  634. IpfsCid: ipfsCid,
  635. FileName: fileName,
  636. FileSize: fileSize,
  637. FileVersion: fileVersion,
  638. UserId: intUserId,
  639. HistoryCurrentIpfsCid: historyCurrentIpfsCid,
  640. HistoryPreIpfsCid: historyPreIpfsCid,
  641. }
  642. text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,ipfsCid,fileName,fileSize,fileVersion,userId,historyCurrentIpfsCid,historyPreIpfsCid)
  643. textByte := []byte(text)
  644. md5Byte := md5.Sum(textByte)
  645. digest := fmt.Sprintf("%x", md5Byte)
  646. fileParam.Digest=digest
  647. jsonData,err :=json.Marshal(fileParam)
  648. if err!=nil{
  649. log.Printf("json序列化化错误!")
  650. return err
  651. }
  652. resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
  653. if err != nil {
  654. fmt.Println("post failed, err:%v\n", err)
  655. return err
  656. }
  657. defer resp.Body.Close()
  658. b, err := ioutil.ReadAll(resp.Body)
  659. if err != nil {
  660. fmt.Println("get resp failed,err:%v\n", err)
  661. return err
  662. }
  663. //log.Printf("post response:%v", string(b))
  664. result := make(map[string] interface{})
  665. err=json.Unmarshal(b,&result)
  666. if err!=nil{
  667. log.Printf("字符串%v反序列化出错", string(b[:]))
  668. return err
  669. }
  670. log.Printf("资源[ %v ]服务记录更新成功",fileName)
  671. return nil
  672. }
  673. /**
  674. 记录提交记录
  675. */
  676. func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (int,string,error){
  677. commitHistory := new(commitHistory)
  678. //历史文件不存在则创建
  679. localSh :=shell.NewShell(config.GobalIpfsUrl)
  680. localSh.SetTimeout(10*time.Second)
  681. if len(currentHistoryHash)!=0 {
  682. os.Remove(path)
  683. err := localSh.Get(currentHistoryHash,path)
  684. if err != nil {
  685. log.Println("历史版本管理文件下载失败")
  686. return -1,"",err
  687. }
  688. }
  689. //初始化历史管理文件
  690. exist := fileExist(path)
  691. if !exist {
  692. commitFile,err := os.Create(path)
  693. if err != nil {
  694. log.Println("历史版本管理文件创建失败")
  695. return -1,"",err
  696. }
  697. commitFile.Close()
  698. }
  699. //设置文件隐藏属性
  700. attribCmd :=exec.Command("attrib","+h",path)
  701. err :=attribCmd.Run()
  702. if err != nil {
  703. log.Println("设置文件隐藏属性失败")
  704. return -1,"",err
  705. }
  706. //读取历史管理文件
  707. content ,err :=ioutil.ReadFile(path)
  708. if len(content)!=0{
  709. rows :=strings.Split(string(content),"\n")
  710. endRow :=rows[len(rows)-2]
  711. columns :=strings.Split(endRow,"\t")
  712. commitHistory.Version,_=strconv.Atoi(columns[6])
  713. commitHistory.Version++
  714. commitHistory.ParentHash=columns[1]
  715. }else{
  716. commitHistory.Version=1
  717. commitHistory.ParentHash="0000000000000000000000000000000000000000"
  718. }
  719. commitHistory.CurrentHash=hash
  720. commitHistory.Milestone = milestone
  721. commitHistory.Creator = creator
  722. commitHistory.Note = note
  723. commitHistory.CreateTime=time.Now().Unix()
  724. if commitHistory.ParentHash==commitHistory.CurrentHash{
  725. if commitHistory.Version>1{
  726. commitHistory.Version--
  727. }
  728. return commitHistory.Version,currentHistoryHash,nil
  729. }
  730. file,err :=os.OpenFile(path,os.O_APPEND,0666)
  731. if err != nil{
  732. log.Println(err)
  733. return -1,"",err
  734. }
  735. //写入历史管理文件
  736. w :=bufio.NewWriter(file)
  737. writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
  738. fmt.Fprintln(w,writeContent)
  739. err = w.Flush()
  740. if err != nil {
  741. log.Println("历史版本管理文件写入失败")
  742. return -1,"",err
  743. }
  744. file.Close()
  745. addFile,err :=os.Open(path)
  746. if err != nil{
  747. log.Println(err)
  748. return -1,"",err
  749. }
  750. defer addFile.Close()
  751. //add 历史管理文件
  752. historyHash,err:=localSh.Add(addFile)
  753. if err != nil {
  754. log.Println("历史版本管理文件上传失败")
  755. return -1,"",err
  756. }
  757. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  758. serverSh.SetTimeout(30*time.Second)
  759. err = serverSh.Pin(historyHash)
  760. if err != nil {
  761. log.Println("历史版本管理文件备份失败")
  762. return -1,"",err
  763. }
  764. return commitHistory.Version,historyHash,nil
  765. }
  766. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  767. var out []byte
  768. buf := make([]byte, 1024, 1024)
  769. for {
  770. n, err := r.Read(buf[:])
  771. if n > 0 {
  772. d := buf[:n]
  773. out = append(out, d...)
  774. progress <- string(d)
  775. }
  776. if err != nil {
  777. // Read returns io.EOF at the end of file, which is not an error for us
  778. if err == io.EOF {
  779. err = nil
  780. }
  781. return out, err
  782. }
  783. }
  784. // never reached
  785. panic(true)
  786. return nil, nil
  787. }
  788. /**
  789. 单个文件信息
  790. */
  791. type simpleFileInfo struct {
  792. Name string `json:"name" `
  793. Extension string `json:"extension"`
  794. RelativePath string `json:"relativePath"`
  795. AbsolutePath string `json:"absolutePath"`
  796. }
  797. var gobalFolderFileMap map[string] *simpleFileInfo
  798. var gobalRelativePath string
  799. /**
  800. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  801. @param id 文件id
  802. */
  803. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  804. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  805. fileInfo,err :=os.Stat(absolutePath)
  806. if err!=nil{
  807. log.Println(err)
  808. return err
  809. }
  810. log.Println(filepath.Dir(absolutePath))
  811. //单个文件处理
  812. if !fileInfo.IsDir() {
  813. simpleFileInfo := new(simpleFileInfo)
  814. simpleFileInfo.Name=fileInfo.Name()
  815. simpleFileInfo.Extension=path.Ext(absolutePath)
  816. simpleFileInfo.RelativePath=""
  817. simpleFileInfo.AbsolutePath=absolutePath
  818. gobalFolderFileMap[absolutePath]=simpleFileInfo
  819. bytes,err :=json.Marshal(gobalFolderFileMap)
  820. if err != nil {
  821. log.Println(err)
  822. return err
  823. }
  824. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  825. log.Println(err)
  826. return err
  827. }
  828. return nil
  829. }
  830. //文件目录处理
  831. gobalRelativePath = filepath.Dir(absolutePath)
  832. err =filepath.Walk(absolutePath, myWalkfunc)
  833. if err != nil {
  834. log.Println(err)
  835. return err
  836. }
  837. bytes,err :=json.Marshal(gobalFolderFileMap)
  838. if err != nil {
  839. log.Println(err)
  840. return err
  841. }
  842. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  843. log.Println(err)
  844. return err
  845. }
  846. return nil
  847. }
  848. func myWalkfunc(path string, info os.FileInfo, err error) error {
  849. if info.IsDir()==false{
  850. simpleFileInfo := new(simpleFileInfo)
  851. simpleFileInfo.Name=info.Name()
  852. simpleFileInfo.Extension=filepath.Ext(path)
  853. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  854. simpleFileInfo.AbsolutePath=path
  855. gobalFolderFileMap[path]=simpleFileInfo
  856. return nil
  857. }
  858. return nil
  859. }
  860. /**
  861. 本地文件是否存在
  862. */
  863. func fileExist(path string) bool {
  864. _, err := os.Lstat(path)
  865. return !os.IsNotExist(err)
  866. }
  867. /**
  868. 获取本地文件列表
  869. */
  870. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  871. projectPath := gobalLocalProjectDir
  872. //log.Println("切换文件列表:"+projectPath)
  873. keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
  874. //添加监控
  875. err := filepath.Walk(projectPath,watchWalkfunc)
  876. if err != nil {
  877. log.Println(err)
  878. return err
  879. }
  880. //初始化通道
  881. if config.GobalWatchChannelMap[projectPath] != nil {
  882. close(config.GobalWatchChannelMap[projectPath])
  883. }
  884. config.GobalWatchChannelMap[projectPath]=make(chan string,100)
  885. log.Println("添加文件监控:"+projectPath)
  886. //定期校验缓存的本地文件状态
  887. dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix)
  888. if err != nil {
  889. log.Println(err)
  890. }
  891. if dataMapa!=nil && len(dataMapa)>0{
  892. for k,_ := range dataMapa {
  893. if !fileExist(config.LocalWorkSpaceDir+k){
  894. err = etcdclient.DeleteWithPrefix(k)
  895. if err != nil {
  896. log.Println(err)
  897. }
  898. }
  899. }
  900. }
  901. //优先etcd查询
  902. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  903. if err != nil {
  904. log.Println(err)
  905. return err
  906. }
  907. if dataMap==nil || len(dataMap)==0{
  908. // 不存在则初始化进etcd
  909. err =filepath.Walk(gobalLocalProjectDir,walkfunc)
  910. //路径错误
  911. if err != nil {
  912. log.Println(err)
  913. if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
  914. log.Println(err)
  915. return err
  916. }
  917. }
  918. mapByte,err:=json.Marshal(gobalFileMap)
  919. if err != nil {
  920. log.Println(err)
  921. return err
  922. }
  923. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  924. log.Println(err)
  925. return err
  926. }
  927. cacheMap := make(map[string] string)
  928. for k,v := range gobalFileMap {
  929. k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
  930. cacheMap[k]=v
  931. }
  932. //异步缓存
  933. //go func() {
  934. err = etcdclient.BatchAdd(cacheMap)
  935. if err != nil {
  936. log.Println(err)
  937. }
  938. //}()
  939. //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  940. //清空gobalFileMap
  941. gobalFileMap = make(map[string] string)
  942. }
  943. err=sendFileListFromEtcd(keyPrefix,projectName,conn)
  944. if err != nil {
  945. log.Println(err)
  946. return err
  947. }
  948. for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[gobalLocalProjectDir] {
  949. //log.Println(actionAndModifyFilePathStr)
  950. actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
  951. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  952. //当前登陆用户判断
  953. if gobalLoginUserName != strings.Split(queryKey,"\\")[0] && actionAndModifyFilePathStr!=";"{
  954. log.Printf("非法用户修改%v", actionAndModifyFilePathStr)
  955. continue
  956. }
  957. if actionAndModifyFilePath[0]=="remove"{
  958. queryMap,err :=etcdclient.QueryWithPrefix(queryKey)
  959. if len(queryMap)==0{
  960. continue
  961. }
  962. err = etcdclient.DeleteWithPrefix(queryKey)
  963. if err != nil {
  964. log.Println(err)
  965. }
  966. }else if actionAndModifyFilePath[0]=="write"{
  967. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  968. if err != nil {
  969. log.Println(err)
  970. continue
  971. }
  972. if len(querymap)==0{
  973. continue
  974. }
  975. //更新判断
  976. if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
  977. continue
  978. }
  979. oldValue := strings.Split(querymap[queryKey],";")
  980. newValue := oldValue[0]+";" +"1"
  981. err = etcdclient.ReplaceInto(queryKey,newValue)
  982. if err!=nil{
  983. log.Println(err)
  984. continue
  985. }
  986. log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr)
  987. //保存即同步逻辑,如果非新增文件则自动post
  988. //获取文件的历史版本管理文件hash
  989. //filePath := actionAndModifyFilePath[1]
  990. //fileName :=filepath.Base(filePath)
  991. //folderName := strings.Split(queryKey,"\\")[2]
  992. //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
  993. //var relativePath string
  994. //if len(strings.Split(dir, "\\"))==1{
  995. // relativePath = ""
  996. //}else{
  997. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  998. //}
  999. //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
  1000. //if err!=nil{
  1001. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1002. // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
  1003. // continue
  1004. //}
  1005. //
  1006. ////自动更新文件
  1007. //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
  1008. //if err!=nil{
  1009. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1010. // log.Printf("UploadCommand 返回失败,%v",err.Error())
  1011. // //记录修改状态
  1012. // newValue := oldValue[0]+";" +"1"
  1013. // err = etcdclient.ReplaceInto(queryKey,newValue)
  1014. // if err!=nil{
  1015. // log.Println(err)
  1016. // continue
  1017. // }
  1018. // continue
  1019. //}
  1020. //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
  1021. //continue
  1022. }else if actionAndModifyFilePath[0]=="create"{
  1023. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  1024. if err != nil {
  1025. log.Println(err)
  1026. continue
  1027. }
  1028. if len(querymap)==0{
  1029. continue
  1030. }
  1031. //更新判断
  1032. if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
  1033. continue
  1034. }
  1035. oldValue := strings.Split(querymap[queryKey],";")
  1036. newValue := oldValue[0]+";" +"1"
  1037. err = etcdclient.ReplaceInto(queryKey,newValue)
  1038. if err!=nil{
  1039. log.Println(err)
  1040. continue
  1041. }
  1042. log.Printf("文件变更 [ %v ] create", actionAndModifyFilePathStr)
  1043. //如果非新增文件则自动post
  1044. //if goabalAddFileMap[]
  1045. //获取文件的历史版本管理文件hash
  1046. //filePath := actionAndModifyFilePath[1]
  1047. //fileName :=filepath.Base(filePath)
  1048. //folderName := strings.Split(queryKey,"\\")[2]
  1049. //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
  1050. //var relativePath string
  1051. //if len(strings.Split(dir, "\\"))==1{
  1052. // relativePath = ""
  1053. //}else{
  1054. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  1055. //}
  1056. //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
  1057. //if err!=nil{
  1058. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1059. // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
  1060. // continue
  1061. //}
  1062. //
  1063. ////自动更新文件
  1064. //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
  1065. //if err!=nil{
  1066. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1067. // log.Printf("UploadCommand 返回失败,%v",err.Error())
  1068. // //记录修改状态
  1069. // newValue := oldValue[0]+";" +"1"
  1070. // err = etcdclient.ReplaceInto(queryKey,newValue)
  1071. // if err!=nil{
  1072. // log.Println(err)
  1073. // continue
  1074. // }
  1075. // continue
  1076. //}
  1077. //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
  1078. //continue
  1079. }
  1080. err = sendFileListFromEtcd(keyPrefix,projectName,conn)
  1081. if err != nil {
  1082. log.Println(err)
  1083. return err
  1084. }
  1085. }
  1086. return nil
  1087. }
  1088. func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
  1089. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  1090. if err != nil {
  1091. log.Println(err)
  1092. return err
  1093. }
  1094. if dataMap!=nil && len(dataMap)>0{
  1095. cacheMap := make(map[string] string)
  1096. for k,v := range dataMap {
  1097. //历史数据加默认值
  1098. if len(strings.Split(v, ";"))==1{
  1099. v=v+";0"
  1100. }
  1101. cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
  1102. }
  1103. mapByte,err:=json.Marshal(cacheMap)
  1104. if err != nil {
  1105. log.Println(err)
  1106. return err
  1107. }
  1108. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  1109. log.Println(err)
  1110. return err
  1111. }
  1112. return nil
  1113. }else{
  1114. log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName)
  1115. }
  1116. return nil
  1117. }
  1118. /**
  1119. 打开方式
  1120. */
  1121. func OpenFileWith(filePath string) error{
  1122. //判断文件有效性
  1123. _,err := os.Stat(filePath)
  1124. if err!=nil{
  1125. return err
  1126. }
  1127. //filePath = strings.Replace(filePath," ","~1",1)
  1128. cmd := exec.Command("rundll32.exe","shell32.dll,OpenAs_RunDLL",filePath);
  1129. err =cmd.Run()
  1130. if err!=nil{
  1131. log.Println(err)
  1132. return err
  1133. }
  1134. return nil
  1135. }
  1136. /**
  1137. 手动检查软件更新
  1138. 0:不强制更新
  1139. 1:强制更新
  1140. */
  1141. func CheckForUpdates(forceUpdate string) error{
  1142. tszdir :=os.Getenv("TSZDIR")
  1143. //空格路径处理
  1144. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
  1145. //判断文件有效性
  1146. _,err := os.Stat(tszdir+config.UpdaterName)
  1147. if err!=nil{
  1148. return err
  1149. }
  1150. cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
  1151. err =cmd.Run()
  1152. if err!=nil{
  1153. log.Println(err)
  1154. return err
  1155. }
  1156. cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
  1157. err =cmd.Run()
  1158. if err!=nil{
  1159. log.Println(err)
  1160. return err
  1161. }
  1162. //检测到更新 kill所有客户端进程
  1163. log.Println("close all process")
  1164. cmd = exec.Command("cmd.exe","/c",ipfsPath);
  1165. err =cmd.Run()
  1166. if err!=nil{
  1167. log.Println(err)
  1168. return err
  1169. }
  1170. return nil
  1171. }
  1172. func walkfunc(filePath string, info os.FileInfo, err error) error {
  1173. if info == nil{
  1174. return nil
  1175. }
  1176. if info.IsDir()==false{
  1177. //历史文件不扫描
  1178. if path.Ext(filePath)==".commit" {
  1179. return nil
  1180. }
  1181. sh := shell.NewShell(config.GobalIpfsUrl)
  1182. file,err :=os.Open(filePath)
  1183. if err != nil{
  1184. log.Println(err)
  1185. return err
  1186. }
  1187. defer file.Close()
  1188. hash,err :=sh.Add(file)
  1189. if err != nil {
  1190. log.Println(err)
  1191. return err
  1192. }
  1193. dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(gobalLocalProjectDir+"\\"),"",1)
  1194. gobalFileMap[dir]=hash
  1195. }
  1196. return nil
  1197. }
  1198. /**
  1199. 查询历史文件
  1200. path 文件路径
  1201. hash 历史版本文件hash
  1202. */
  1203. func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
  1204. result := make(map[int] *commitHistory)
  1205. //校验文件路径
  1206. _,err :=os.Stat(filePath)
  1207. if err != nil {
  1208. log.Println("文件 "+filePath+"not found")
  1209. return nil,errors.New("参数错误!")
  1210. }
  1211. if len(hash) == 0 {
  1212. return result,nil
  1213. }
  1214. //根据hash更新文件
  1215. localSh := shell.NewShell(config.GobalIpfsUrl)
  1216. //连接失败判断
  1217. localSh.SetTimeout(5*time.Second)
  1218. ext :=path.Ext(filePath)
  1219. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  1220. os.Remove(commitFilePath)
  1221. err = localSh.Get(hash,commitFilePath)
  1222. if err!=nil {
  1223. log.Println("文件"+hash+"下载失败")
  1224. return result,errors.New("历史文件获取失败,请稍后重试")
  1225. }
  1226. //设置文件隐藏属性
  1227. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1228. err =attribCmd.Run()
  1229. if err != nil {
  1230. log.Println("设置文件隐藏属性失败")
  1231. return result,err
  1232. }
  1233. //解析历史版本文件
  1234. contentByte,err := ioutil.ReadFile(commitFilePath)
  1235. content := string(contentByte)
  1236. if content==""{
  1237. return result,nil
  1238. }
  1239. rows :=strings.Split(content,"\n")
  1240. length := len(rows)
  1241. var index int = 0
  1242. for i:=length-2;i>=0;i--{
  1243. columns := strings.Split(rows[i],"\t")
  1244. commitHistoryInstance := new(commitHistory)
  1245. commitHistoryInstance.ParentHash =columns[0]
  1246. commitHistoryInstance.CurrentHash=columns[1]
  1247. commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
  1248. commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
  1249. commitHistoryInstance.Creator=columns[2]
  1250. commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
  1251. commitHistoryInstance.Note = columns[4]
  1252. result[index]=commitHistoryInstance
  1253. index++
  1254. }
  1255. return result,nil
  1256. }
  1257. /**
  1258. 设定某个历史版本为里程碑版本
  1259. @param filePath 文件绝对路径
  1260. @param commitHistoryHash 历史版本管理文件hash
  1261. @param hash 文件hash
  1262. @param milestone 是否是里程碑
  1263. */
  1264. func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
  1265. //result := make(map[int] *commitHistory)
  1266. //校验文件路径
  1267. _,err :=os.Stat(filePath)
  1268. if err != nil {
  1269. log.Println("文件 "+filePath+"not found")
  1270. return "",errors.New("参数错误!")
  1271. }
  1272. if len(commitHistoryHash) == 0 {
  1273. log.Println("参数hash must not empty")
  1274. return "",errors.New("参数错误!")
  1275. }
  1276. //根据hash更新文件
  1277. localSh := shell.NewShell(config.GobalIpfsUrl)
  1278. //连接失败判断
  1279. localSh.SetTimeout(5*time.Second)
  1280. ext :=path.Ext(filePath)
  1281. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  1282. os.Remove(commitFilePath)
  1283. err = localSh.Get(commitHistoryHash,commitFilePath)
  1284. if err!=nil {
  1285. log.Println("文件"+commitHistoryHash+"下载失败")
  1286. return "",errors.New("历史文件获取失败,请稍后重试")
  1287. }
  1288. //设置文件隐藏属性
  1289. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1290. err =attribCmd.Run()
  1291. if err != nil {
  1292. log.Println("设置文件隐藏属性失败")
  1293. return "",err
  1294. }
  1295. //解析历史版本文件
  1296. contentByte,err := ioutil.ReadFile(commitFilePath)
  1297. content := string(contentByte)
  1298. if content==""{
  1299. return "",nil
  1300. }
  1301. rows :=strings.Split(content,"\n")
  1302. length := len(rows)
  1303. resultString :=""
  1304. for i:=0;i<length-1;i++{
  1305. columns := strings.Split(rows[i],"\t")
  1306. if columns[1]==hash{
  1307. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
  1308. continue
  1309. }
  1310. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
  1311. }
  1312. os.Remove(commitFilePath)
  1313. fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
  1314. w := bufio.NewWriter(fw)
  1315. w.WriteString(resultString)
  1316. if err != nil {
  1317. log.Println(err)
  1318. return "",err
  1319. }
  1320. w.Flush()
  1321. fw.Close()
  1322. addFile,err :=os.Open(commitFilePath)
  1323. if err != nil{
  1324. log.Println(err)
  1325. return "",err
  1326. }
  1327. defer addFile.Close()
  1328. //add 历史管理文件
  1329. historyHash,err:=localSh.Add(addFile)
  1330. if err != nil {
  1331. log.Println("历史版本管理文件上传失败")
  1332. return "",err
  1333. }
  1334. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  1335. serverSh.SetTimeout(5*time.Second)
  1336. err = serverSh.Pin(historyHash)
  1337. if err != nil {
  1338. log.Println("历史版本管理文件备份失败")
  1339. return "",err
  1340. }
  1341. return historyHash,nil
  1342. }
  1343. /*
  1344. 提交历史
  1345. */
  1346. type commitHistory struct {
  1347. ParentHash string `json:"parentHash"`
  1348. CurrentHash string `json:"currentHash"`
  1349. Creator string `json:"creator"`
  1350. CreateTime int64 `json:"createTime"`
  1351. Note string `json:"note"`
  1352. Version int `json:"version"`
  1353. Milestone bool `json:"milestone"`
  1354. }
  1355. /**
  1356. 消息通知
  1357. @param userId 用户ID
  1358. */
  1359. func MessageNotify(conn *websocket.Conn, userId string) (error){
  1360. msgKey :=fmt.Sprintf("lockingMsg\\%v",userId)
  1361. //返回全量通知消息列表
  1362. err :=queryEtcdToWebSocket(conn, msgKey)
  1363. if err!=nil{
  1364. log.Println(err)
  1365. return err
  1366. }
  1367. //消费通知消息到本地
  1368. nsqclient.Consumers(config.NsqTopic,(config.NsqChanelPrefix+userId),config.NsqAddr)
  1369. for message := range nsqclient.MsgQueue {
  1370. messOb :=nsqclient.LockingMsg{}
  1371. err:=json.Unmarshal([]byte(message),&messOb)
  1372. if err!=nil{
  1373. log.Println(err)
  1374. continue
  1375. }
  1376. for _, acceptUserId := range messOb.UserIds {
  1377. //log.Println(strconv.FormatInt(acceptUserId,10)+">>>"+userId)
  1378. if strconv.FormatInt(acceptUserId,10)==userId{
  1379. messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10)
  1380. err =etcdclient.ReplaceInto(messagekey,message)
  1381. if err!=nil{
  1382. nsqclient.MsgQueue <- message
  1383. log.Println(err)
  1384. }
  1385. err = queryEtcdToWebSocket(conn, messagekey)
  1386. if err!=nil{
  1387. nsqclient.MsgQueue <- message
  1388. log.Println(err)
  1389. }
  1390. }
  1391. }
  1392. }
  1393. return nil
  1394. }
  1395. /**
  1396. 消息通知
  1397. @param userId 用户ID
  1398. */
  1399. func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){
  1400. msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId)
  1401. err = etcdclient.DeleteWithPrefix(msgKey)
  1402. return err
  1403. }
  1404. /**
  1405. 查询etcd对应key值发送给前端
  1406. */
  1407. func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){
  1408. msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix)
  1409. if err!=nil{
  1410. log.Println(err)
  1411. return err
  1412. }
  1413. mapByte,err :=json.Marshal(msgs)
  1414. if err!=nil{
  1415. log.Println(err)
  1416. return err
  1417. }
  1418. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  1419. log.Println(err)
  1420. return err
  1421. }
  1422. return nil
  1423. }