Переглянути джерело

feat:junit 下载功能

master
yuan_rh 3 роки тому
джерело
коміт
e1fb0b0421
4 змінених файлів з 297 додано та 16 видалено
  1. +18
    -0
      consts/gobal_const.go
  2. +4
    -3
      env/env.go
  3. +8
    -6
      handler/http_handler.go
  4. +267
    -7
      handler/websocket_handler.go

+ 18
- 0
consts/gobal_const.go Переглянути файл

@@ -8,11 +8,17 @@ package consts

const (
LOG_PREFIX = "[locking-kit-server] "

LOG_PATH = "../logs"

IPFS_PATH = "IPFS-PATH"

WORKSPACE = "TSZDATADIR"

SERVER_IP_PORT = "http://127.0.0.1:8092"

CONTENT_TYPE = "application/json"

ETCD_URL = "127.0.0.1:2379"

//等待同步
@@ -21,6 +27,9 @@ const (
//同步中
TASK_SYNC_STATUS_ING = "TASK_SYNC_STATUS_ING"

//同步暂停
TASK_SYNC_STATUS_PAUSE = "TASK_SYNC_STATUS_PAUSE"

//同步成功
TASK_SYNC_STATUS_FINISH = "TASK_SYNC_STATUS_FINISH"

@@ -28,4 +37,13 @@ const (
TASK_SYNC_STATUS_FAIL = "TASK_SYNC_STATUS_FAIL"

ETCD_DIRECTOR_TASK_SYNC = "/TASK_SYNC/"

//同时最大下载数
MAX_DOWNLOADING_NUM = 3

TASK_ID = "TaskId"

TASK_SYNC_STATUS = "TaskSyncStatus"

TASK_SYNC_PROGRESS= "TaskSyncProgress"
)

+ 4
- 3
env/env.go Переглянути файл

@@ -3,6 +3,7 @@ package env
import (
"locking-kit-server/consts"
"os"
"strings"
)

/**
@@ -13,13 +14,13 @@ import (
*/

//IFPS.exe 路径
var IpfsPath = os.Getenv(consts.IPFS_PATH)
var IpfsPath = strings.Replace(os.Getenv(consts.IPFS_PATH),"\"","",1)+"\\ipfs.exe"

//工作目录 TODO
//var WorkSpace = os.Getenv(consts.WORKSPACE)
var WorkSpace = "D:\\easycloud\\"

//当前用户
var CurrentUserId string
//当前用户 TODO
var CurrentUserPhone = "16666666666"



+ 8
- 6
handler/http_handler.go Переглянути файл

@@ -24,8 +24,6 @@ import (
* @date 2021/6/28 11:11
**/

var currentLoginUserId string

//@title 同步文件夹至工作空间
//@param ids 项目id,逗号分隔
func SyncFolderToWorkSpace(w http.ResponseWriter, r *http.Request){
@@ -70,19 +68,23 @@ func batchInsertTask(task []interface{}) error{
if err !=nil{
return err
}
key := fmt.Sprintf("%v%v:%v:%v",currentLoginUserId, consts.ETCD_DIRECTOR_TASK_SYNC, consts.TASK_SYNC_STATUS_WAIT,taskId)
// key -> /userid/TASK_SYNC/417367746536689664
key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId)
err = db.ReplaceInto(key,string(archiveByte))
if err !=nil{
return err
}

//添加到下载队列
TaskToDownloadChanel <- archiveObj.(map[string] interface{})
}
return nil
}

func addAttribute(archiveObj map[string] interface{}, taskId int64, taskSyncStatus string, progress float64){
archiveObj["TaskId"] = taskId
archiveObj["TaskSyncStatus"] = taskSyncStatus
archiveObj["TaskSyncProgress"] = progress
archiveObj[consts.TASK_ID] = taskId
archiveObj[consts.TASK_SYNC_STATUS] = taskSyncStatus
archiveObj[consts.TASK_SYNC_PROGRESS] = progress
}




+ 267
- 7
handler/websocket_handler.go Переглянути файл

@@ -1,11 +1,20 @@
package handler

import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"io"
"locking-kit-server/consts"
"locking-kit-server/db"
"locking-kit-server/env"
"log"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
)

/**
@@ -14,13 +23,15 @@ import (
* @date 2021/6/28 11:11
**/

var TaskSyncChanel = make(chan string, 10000)
var TaskToWebChanel = make(chan map[string] interface{}, 100000)

var TaskToDownloadChanel = make(chan map[string] interface{}, 100000)

//监听任务同步情况
func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){

//获取全部任务,包括 同步中、已同步、同步异常
prefix := fmt.Sprintf("%v%v:%v",currentLoginUserId, consts.ETCD_DIRECTOR_TASK_SYNC, consts.TASK_SYNC_STATUS_WAIT)
prefix := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC)
task,err := db.QueryWithPrefix(prefix)
if err != nil{
log.Printf("监听同步任务失败,%v", err)
@@ -28,15 +39,35 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){
}

for _, v := range task {
if err := conn.WriteMessage(websocket.TextMessage, []byte(v)); err != nil {
log.Printf("监听同步任务失败,%v", err)
return err

data,err := taskStringToMap(v)
if err !=nil{
log.Println(err)
continue
}
TaskToWebChanel <- data

//解析开机下载任务
//优先下载中
if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_ING {
TaskToDownloadChanel <- data
continue
}

if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_WAIT {
TaskToDownloadChanel <- data
}
}

//开启异步下载任务
syncDownload()

//任务同步信道
for v := range TaskSyncChanel {
if err := conn.WriteMessage(websocket.TextMessage, []byte(v)); err != nil {
for v := range TaskToWebChanel {

data,_ := json.Marshal(v)

if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Printf("监听同步任务失败,%v", err)
return err
}
@@ -45,4 +76,233 @@ func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){
return nil
}

//任务对象字符串转map对象
func taskStringToMap(task string)(data map[string] interface{}, err error){
err = json.Unmarshal([]byte(task),&data)
if err != nil{
return data,err
}
return data,err
}

//处理下载队列
func syncDownload(){
go func() {
log.Print("开启异步处理下载队列...")
for v := range TaskToDownloadChanel {
downLoadTask(v)
}
}()
}

//执行单个下载任务
func downLoadTask(task map[string] interface{}){

//检查文件目录是否存在,不存在则创建
directory := fmt.Sprintf("%v%v%v%v%v%v%v%v%v",env.WorkSpace,string(os.PathSeparator),env.CurrentUserPhone,string(os.PathSeparator),task["ProjName"],string(os.PathSeparator),task["NodeName"],string(os.PathSeparator),task["RelativePath"])
_,err := os.Stat(directory)
if err != nil {
//创建文件目录
err = os.MkdirAll(directory, os.ModePerm)
if err!=nil{
log.Println(err)
updateTaskStatusToFail(task)
return
}
}

//下载启动标识,有下载进度则设置为true
var downloading bool = false

//cmd ipfs get
progress := make(chan string,10000)
var stdout, stderr []byte
var errStdout, errStderr error
cmd := exec.Command(env.IpfsPath,"get", task["IpfsCid"].(string),"-o",filepath.Clean(fmt.Sprintf("%v%v%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string),task["Extension"].(string))))
stdoutIn, _ := cmd.StdoutPipe()
stderrIn, _ := cmd.StderrPipe()
cmd.Start()
go func() {
stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
}()
go func() {
stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
}()

//异步读取进度
//异步定时读取进度反馈给前端,每500ms返回一次进度
go func(){
millSeconds := time.Now().UnixNano() / 1e6
for content := range progress { // 通道关闭后会退出for range循环
current :=time.Now().UnixNano() / 1e6
if !downloading{
log.Printf("%v.%v %v 资源连接成功,下载中...",task["ArchName"],task["Extension"], task["IpfsCid"])
}
downloading = true
if current-millSeconds>500 && strings.Index(content,"100.00%")==-1{
pro := contentToJSONByte(content)
if pro.Process == 0{
continue
}

//设置下载启动标识和下载时间戳
millSeconds = current

//更新进度、状态
updateTaskProgress(task, pro.Process)
break
}

}
}()

//设置30秒连接超时,30秒未启动下载则下载失败
go func() {
index :=0
for true{
//启动下载则不做超时判断
if downloading==true{
return
}
index++
time.Sleep(time.Duration(1)*time.Second)
if downloading==false && index==30{
err = cmd.Process.Kill()
log.Println("资源连接超时(30s),下载进程被终止")
updateTaskStatusToFail(task)
return
}
}
}()

//等待下载执行完成
err = cmd.Wait()
if err != nil {
log.Printf("cmd.Run() failed with %s\n", err)
}
if errStdout != nil || errStderr != nil {
log.Printf("failed to capture stdout or stderr\n")
}
outStr := string(stdout)
log.Printf("下载成功,:%s", outStr)

//完成下载
updateTaskProgress(task, 100.00)

defer close(progress)
}

//更新任务进度
func updateTaskProgress(task map[string] interface{}, progress float64){

task[consts.TASK_SYNC_PROGRESS]= progress
if progress==100.00{
task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_FINISH
}else{
task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_ING
}

taskByte,err := json.Marshal(task)
if err !=nil{
log.Printf("序列化失败 %v \n", err)
}
// key -> /userid/TASK_SYNC/417367746536689664
key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
err = db.ReplaceInto(key,string(taskByte))
if err !=nil{
log.Printf("etcd put 失败 %v \n", err)
}

TaskToWebChanel <- task
}

//更新任务状态为失败
func updateTaskStatusToFail(task map[string] interface{}){

task[consts.TASK_SYNC_STATUS] = consts.TASK_SYNC_STATUS_FAIL

taskByte,err := json.Marshal(task)
if err !=nil{
log.Printf("序列化失败 %v \n", err)
}
// key -> /userid/TASK_SYNC/417367746536689664
key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
err = db.ReplaceInto(key,string(taskByte))
if err !=nil{
log.Printf("etcd put 失败 %v \n", err)
}
TaskToWebChanel <- task
}

/**
文件上传下载进度
*/
type processStruct struct {
Size string `json:"size"`
CurrentSize string `json:"currentSize"`
Unit string `json:"unit"`
CurrentUnit string `json:"currentUnit"`
Process float64 `json:"process"`
Hash string `json:"hash"`
CommitHistoryHash string `json:"commitHistoryHash"`
Version int `json:"version"`
}

//读取解析cmd返回文件上传或下载进度信息
func contentToJSONByte(content string) (pro processStruct){

sts :=strings.Split(content," ")

if len(sts)<8{
//log.Println("字符长度小于8")
return pro
}
var processFloat float64
if (len(sts)==9 || len(sts)==8){
processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
}else{
processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
}

if processFloat==0{
//log.Println("当前进度0")
return pro
}

pro.Size = sts[4]
pro.CurrentSize = sts[1]
pro.Unit = sts[2]
pro.CurrentUnit = sts[5]
pro.Process = processFloat

return pro
}

func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
var out []byte
buf := make([]byte, 1024, 1024)
for {
n, err := r.Read(buf[:])
if n > 0 {
d := buf[:n]
out = append(out, d...)
if strings.Index(string(d),"Saving")>-1{
continue
}
progress <- string(d)

}
if err != nil {
// Read returns io.EOF at the end of file, which is not an error for us
if err == io.EOF {
err = nil
}
return out, err
}
}
// never reached
panic(true)
return nil, nil
}



Завантаження…
Відмінити
Зберегти