文件同步
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

482 рядки
10 KiB

  1. package handle
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "fts/config"
  6. "github.com/gorilla/websocket"
  7. shell "github.com/ipfs/go-ipfs-api"
  8. _ "github.com/ipfs/go-ipfs-api"
  9. "io"
  10. "log"
  11. "os"
  12. "os/exec"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. var gobalLoginUserId string
  19. //key:filepath,value:hash
  20. var gobalFileMap = make(map[string] string)
  21. var gobalFileUpdateTimeMap = make(map[string] string)
  22. //var gobalFileChangeMap = make(map[string] string)
  23. var getLocalFileListDir string
  24. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  25. var ipfsPath=os.Getenv("IPFS-PATH")+"ipfs.exe"
  26. /**
  27. 文件上传下载进度
  28. */
  29. type processStruct struct {
  30. Size string `json:"size"`
  31. CurrentSize string `json:"currentSize"`
  32. Unit string `json:"unit"`
  33. Process float64 `json:"process"`
  34. Hash string `json:"hash"`
  35. }
  36. func main() {
  37. //config.InitConfig()
  38. //InitLocalWorkSpace("320872793405132801","test1")
  39. //
  40. //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
  41. //
  42. //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
  43. //
  44. //GetLocalFileList("testOne")
  45. }
  46. /**
  47. 初始化本地工作目录
  48. @param userId 用户ID
  49. @param projectName 项目名称
  50. */
  51. func InitLocalWorkSpace(userId,projectName string) (error){
  52. //初始化当前登陆用户
  53. gobalLoginUserId = userId
  54. // 检查本地目录是否存在
  55. var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName
  56. _,err := os.Stat(projectPath)
  57. if err != nil {
  58. //创建文件目录
  59. os.MkdirAll(projectPath, os.ModePerm)
  60. /*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm)
  61. os.MkdirAll(projectPath+"\\工作文件", os.ModePerm)
  62. os.MkdirAll(projectPath+"\\协作文件", os.ModePerm)
  63. os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/
  64. }
  65. return nil
  66. }
  67. /**
  68. 下载指令
  69. @param hash ipfs哈希值
  70. @param projectName 项目名称
  71. @para fileName 文件名称
  72. @param dir 云文件目录
  73. */
  74. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
  75. absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
  76. //检查目录
  77. _,err := os.Stat(absoluteDir)
  78. if err != nil {
  79. //创建文件目录
  80. err = os.MkdirAll(absoluteDir, os.ModePerm)
  81. if err!=nil{
  82. return err
  83. }
  84. }
  85. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
  86. progress := make(chan string,10000)
  87. var stdout, stderr []byte
  88. var errStdout, errStderr error
  89. stdoutIn, _ := cmd.StdoutPipe()
  90. stderrIn, _ := cmd.StderrPipe()
  91. cmd.Start()
  92. go func() {
  93. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  94. }()
  95. go func() {
  96. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  97. }()
  98. go func(){
  99. seconds := time.Now().Unix()
  100. for content := range progress { // 通道关闭后会退出for range循环
  101. current :=time.Now().Unix()
  102. if current-seconds>1{
  103. projson,err := contentToJSONByte(content)
  104. if projson==nil && err==nil{
  105. continue
  106. }
  107. if err != nil {
  108. log.Fatalf("json.Marshal error %s\n", err)
  109. }
  110. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  111. break
  112. }
  113. seconds = current
  114. }
  115. if strings.Index(content,"100.00%")!=-1{
  116. projson,err := contentToJSONByte(content)
  117. if projson==nil && err==nil{
  118. continue
  119. }
  120. if err != nil {
  121. log.Fatalf("json.Marshal error %s\n", err)
  122. }
  123. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  124. panic(err)
  125. }
  126. break
  127. }
  128. }
  129. }()
  130. err = cmd.Wait()
  131. if err != nil {
  132. log.Fatalf("cmd.Run() failed with %s\n", err)
  133. }
  134. if errStdout != nil || errStderr != nil {
  135. log.Fatalf("failed to capture stdout or stderr\n")
  136. }
  137. outStr, errStr := string(stdout), string(stderr)
  138. fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
  139. defer close(progress)
  140. fmt.Println("下载成功")
  141. return nil
  142. }
  143. func contentToJSONByte(content string) ([]byte,error){
  144. sts :=strings.Split(content," ")
  145. if len(sts)<8{
  146. return nil,nil
  147. }
  148. var processFloat float64
  149. if (len(sts)==9 || len(sts)==8){
  150. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  151. }else{
  152. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  153. }
  154. if processFloat==0{
  155. return nil,nil
  156. }
  157. pro :=&processStruct{
  158. Size:sts[4],
  159. CurrentSize: sts[1],
  160. Unit: sts[2],
  161. Process: processFloat,
  162. Hash: "",
  163. }
  164. projson,err :=json.Marshal(pro)
  165. return projson,err
  166. }
  167. /**
  168. 上传本地文件
  169. @param absolutePath 文件本地绝对路径
  170. @param fileName 文件名称
  171. @param projectName 项目名称
  172. @param dir 云文件目录
  173. */
  174. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{
  175. //拷贝文件
  176. //上传文件
  177. //
  178. //file,err :=os.Open(absolutePath)
  179. //if err != nil{
  180. // return "",err
  181. //}
  182. //
  183. //defer file.Close()
  184. //
  185. //hash,err :=sh.Add(file)
  186. //if err != nil {
  187. // return "",err
  188. //}
  189. //本地拷贝文件
  190. absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
  191. //检查目录
  192. _,err := os.Stat(absoluteDir)
  193. if err != nil {
  194. //创建文件目录
  195. err = os.MkdirAll(absoluteDir, os.ModePerm)
  196. if err!=nil{
  197. return err
  198. }
  199. }
  200. fmt.Println(ipfsPath, "add",absolutePath)
  201. cmd := exec.Command(ipfsPath, "add",absolutePath)
  202. uploadProgress := make(chan string,10000)
  203. var stdout, stderr []byte
  204. var errStdout, errStderr error
  205. stdoutIn, _ := cmd.StdoutPipe()
  206. stderrIn, _ := cmd.StderrPipe()
  207. cmd.Start()
  208. go func() {
  209. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  210. }()
  211. go func() {
  212. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  213. }()
  214. go func(){
  215. seconds := time.Now().Unix()
  216. for content := range uploadProgress { // 通道关闭后会退出for range循环
  217. current :=time.Now().Unix()
  218. if current-seconds>1{
  219. projson,err := contentToJSONByte(content)
  220. if projson==nil && err==nil{
  221. continue
  222. }
  223. if err != nil {
  224. log.Fatalf("json.Marshal error %s\n", err)
  225. }
  226. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  227. break
  228. }
  229. seconds = current
  230. }
  231. if strings.Index(content,"100.00%")!=-1{
  232. projson,err := contentToJSONByte(content)
  233. if projson==nil && err==nil{
  234. continue
  235. }
  236. if err != nil {
  237. log.Fatalf("json.Marshal error %s\n", err)
  238. }
  239. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  240. panic(err)
  241. }
  242. break
  243. }
  244. }
  245. }()
  246. err = cmd.Wait()
  247. if err != nil {
  248. log.Fatalf("cmd.Run() failed with %s\n", err)
  249. }
  250. if errStdout != nil || errStderr != nil {
  251. log.Fatalf("failed to capture stdout or stderr\n")
  252. }
  253. outStr, errStr := string(stdout), string(stderr)
  254. fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
  255. /*sh.Get(hash,absoluteDir+fileName)
  256. if err != nil {
  257. return "",err
  258. }*/
  259. defer close(uploadProgress)
  260. prog := new(processStruct)
  261. prog.Hash=strings.Split(outStr," ")[1]
  262. prog.Process=100.00
  263. sh := shell.NewShell(config.GobalIpfsUrl)
  264. objectStat,err :=sh.ObjectStat(prog.Hash)
  265. if err != nil {
  266. return err
  267. }
  268. fmt.Println(objectStat.CumulativeSize)
  269. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  270. projson,err :=json.Marshal(prog)
  271. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  272. panic(err)
  273. }
  274. go func() {
  275. serverSh := shell.NewShell(config.ServerIpfsUrl)
  276. err = serverSh.Pin(prog.Hash)
  277. if err != nil {
  278. fmt.Println(err)
  279. }
  280. }()
  281. //本地文件夹拷贝
  282. err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
  283. if err != nil {
  284. fmt.Println(err)
  285. return err
  286. }
  287. fmt.Println("上传成功")
  288. return nil
  289. }
  290. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  291. var out []byte
  292. buf := make([]byte, 1024, 1024)
  293. for {
  294. n, err := r.Read(buf[:])
  295. if n > 0 {
  296. d := buf[:n]
  297. out = append(out, d...)
  298. progress <- string(d)
  299. }
  300. if err != nil {
  301. // Read returns io.EOF at the end of file, which is not an error for us
  302. if err == io.EOF {
  303. err = nil
  304. }
  305. return out, err
  306. }
  307. }
  308. // never reached
  309. panic(true)
  310. return nil, nil
  311. }
  312. /**
  313. 运行变更生效
  314. @param id 文件id
  315. */
  316. func NotifyFileChange(id string, changeType int){
  317. }
  318. /**
  319. 获取本地文件列表
  320. */
  321. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  322. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\")
  323. for {
  324. err :=filepath.Walk(getLocalFileListDir,walkfunc)
  325. if err != nil {
  326. fmt.Println(err)
  327. continue
  328. }
  329. mapByte,err:=json.Marshal(gobalFileMap)
  330. if err != nil {
  331. fmt.Println(err)
  332. return err
  333. }
  334. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  335. fmt.Println(err)
  336. }
  337. //for k,v := range gobalFileMap {
  338. // if v==1{
  339. // //socket
  340. // if err := conn.WriteMessage(websocket.TextMessage, []byte(k)); err != nil {
  341. //
  342. // fmt.Println(err)
  343. // continue
  344. // }
  345. // //添加 检查删除
  346. // //gobalFileMap[k]=0
  347. // }
  348. //}
  349. fmt.Println("执行睡眠3分钟")
  350. time.Sleep(time.Duration(3)*time.Minute)
  351. }
  352. //err :=filepath.Walk(getLocalFileListDir,walkfunc)
  353. //if err != nil {
  354. // return gobalFileMap,err
  355. //}
  356. //fmt.Println(len(gobalFileMap))
  357. //for s := range gobalFileMap {
  358. // fmt.Println(s)
  359. //}
  360. return nil
  361. }
  362. func walkfunc(path string, info os.FileInfo, err error) error {
  363. if info.IsDir()==false{
  364. sh := shell.NewShell(config.GobalIpfsUrl)
  365. file,err :=os.Open(path)
  366. if err != nil{
  367. fmt.Println(err)
  368. return err
  369. }
  370. defer file.Close()
  371. hash,err :=sh.Add(file)
  372. if err != nil {
  373. fmt.Println(err)
  374. return err
  375. }
  376. dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1)
  377. gobalFileMap[dir]=hash
  378. //文件已修改状态
  379. //dir :=strings.Replace(path,getLocalFileListDir,"",1)
  380. //if gobalFileMap[dir]==1{
  381. // return nil
  382. //}
  383. //gobalFileMap[dir]=0
  384. //
  385. ////判断是否修改
  386. //f,err:=os.Stat(path)
  387. //if err!=nil{
  388. // return err
  389. //}
  390. //secondStr := strconv.FormatInt(f.ModTime().Unix(),10)
  391. //if gobalFileUpdateTimeMap[path] == ""{
  392. // gobalFileUpdateTimeMap[path]=secondStr
  393. //}else{
  394. // if gobalFileUpdateTimeMap[path]!=secondStr{
  395. // gobalFileUpdateTimeMap[path]=secondStr
  396. // gobalFileMap[dir]=1
  397. // }
  398. //}
  399. }
  400. return nil
  401. }