diff --git a/go.mod b/go.mod index 24eeae3b26..ebd9b1ecd4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/czs007/suvlim go 1.14 require ( + code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect + github.com/onsi/ginkgo v1.14.0 // indirect github.com/BurntSushi/toml v0.3.1 github.com/golang/protobuf v1.4.2 github.com/minio/minio-go/v7 v7.0.5 diff --git a/go.sum b/go.sum index 9688c1fd55..e950c711f0 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,7 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa v0.0.1 h1:r3ncXbtIiad9owWu22r8ryYogBEV9NbJykk8k6K+u0w= +github.com/cncf/udpa v0.0.1 h1:r3ncXbtIiad9owWu22r8ryYogBEV9NbJykk8k6K+u0w= github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.12+incompatible h1:pAWNwdf7QiT1zfaWyqCtNZQWCLByQyA3JrSQyuYAqnQ= @@ -21,7 +22,10 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbp github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -30,8 +34,12 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/data-plane-api v0.0.0-20200823234036-b215ae4c0e16 h1:GLGI0UaYAaQ3udUVbWNUWyLwp+agjr+++DW0N43HEcU= +github.com/envoyproxy/data-plane-api v0.0.0-20200823234036-b215ae4c0e16 h1:GLGI0UaYAaQ3udUVbWNUWyLwp+agjr+++DW0N43HEcU= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9 h1:EGUd+AQfZoi1OwZAoqekLbl4kq6tafFtKQSiN8nL21Y= @@ -75,28 +83,22 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.1 h1:VNUuLKyFcJ5IektwBKcZU4J5GJKEt+O github.com/grpc-ecosystem/grpc-gateway v1.8.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= -github.com/minio/minio-go v1.0.0 h1:ooSujki+Z1PRGZsYffJw5jnF5eMBvzMVV86TLAlM0UM= -github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= github.com/minio/minio-go/v7 v7.0.5 h1:I2NIJ2ojwJqD/YByemC1M59e1b4FW9kS7NlOar7HPV4= github.com/minio/minio-go/v7 v7.0.5/go.mod h1:TA0CQCjJZHM5SJj9IjqR0NmpmQJ6bCbXifAJ3mUU6Hw= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= @@ -109,51 +111,36 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= -github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo= github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= -github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9 h1:EsTt42btov+tFchxOFKnxBNmXOWyPKiddOwvr/WO90g= github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/pd v2.1.5+incompatible h1:vOLV2tSQdRjjmxaTXtJULoC94dYQOd+6fzn2yChODHc= github.com/pingcap/pd v2.1.5+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E= -github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:2JCf+JCLBs7IUZzYdIrSDN+GWYacKOdToIAt5zcga54= github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:43j3yLP9UiXa0z95/W3hN7yTjoxsQoOll5rrGBgBcnE= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= -github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/protocolbuffers/protobuf v3.13.0+incompatible h1:omZA3Tuq+U2kJ2uMuqMR9c1VO5qLEgZ19m9878fXNtg= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -163,25 +150,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tikv/client-go v0.0.0-20200723074018-095b94dc2430 h1:qrLc3hp4FJfZ+AzQHkf2BmlYYyj+RhSaYC8JeliV1KE= github.com/tikv/client-go v0.0.0-20200723074018-095b94dc2430/go.mod h1:I3o4nCR8z2GtgGnqN/YCK5wgr/9bGtkJvCQgtKkTmo8= -github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/ugorji/go v1.1.2 h1:JON3E2/GPW2iDNGoSAusl1KDf5TRQ8k8q7Tp097pZGs= github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= -github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43 h1:BasDe+IErOQKrMVXab7UayvSlIpiyGwRvuX3EKYY7UA= github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA= -github.com/unrolled/render v1.0.0 h1:XYtvhA3UkpB7PqkvhUFYmpKD55OudoIeygcfus4vcd4= github.com/unrolled/render v1.0.0/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= -github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -196,6 +173,7 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -208,51 +186,48 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= -google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pulsar/schema/message.go b/pulsar/schema/message.go index cdf1c8e964..c7ec414b8f 100644 --- a/pulsar/schema/message.go +++ b/pulsar/schema/message.go @@ -1,5 +1,7 @@ package schema +import "bytes" + type ErrorCode int32 const ( @@ -31,10 +33,11 @@ const ( type Status struct { Error_code ErrorCode - Reason string + Reason string } type DataType int32 + const ( NONE DataType = 0 BOOL DataType = 1 @@ -50,10 +53,10 @@ const ( ) type AttrRecord struct { - Int32Value []int32 - Int64Value []int64 - FloatValue []float32 - DoubleValue []float64 + Int32Value int32 + Int64Value int64 + FloatValue float32 + DoubleValue float64 } type VectorRowRecord struct { @@ -68,7 +71,7 @@ type VectorRecord struct { type FieldValue struct { FieldName string Type DataType - AttrRecord *AttrRecord //what's the diff with VectorRecord + AttrRecord *AttrRecord //what's the diff with VectorRecord VectorRecord *VectorRecord } @@ -78,64 +81,74 @@ type VectorParam struct { } type OpType int + const ( - Insert OpType = 0 - Delete OpType = 1 - Search OpType = 2 - TimeSync OpType = 3 - Key2Seg OpType = 4 - Statistics OpType = 5 + Insert OpType = 0 + Delete OpType = 1 + Search OpType = 2 + TimeSync OpType = 3 + Key2Seg OpType = 4 + Statistics OpType = 5 ) type Message interface { GetType() OpType + Serialization() []byte + Deserialization(serializationData []byte) } type InsertMsg struct { CollectionName string - Fields []*FieldValue - EntityId int64 - PartitionTag string - Timestamp int64 - ClientId int64 - MsgType OpType + Fields []*FieldValue + EntityId int64 + PartitionTag string + Timestamp uint64 + ClientId int64 + MsgType OpType } type DeleteMsg struct { CollectionName string - EntityId int64 - Timestamp int64 - ClientId int64 - MsgType OpType + EntityId int64 + Timestamp int64 + ClientId int64 + MsgType OpType } type SearchMsg struct { CollectionName string - PartitionTag string - VectorParam *VectorParam - Timestamp int64 - ClientId int64 - MsgType OpType + PartitionTag string + VectorParam *VectorParam + Timestamp int64 + ClientId int64 + MsgType OpType } type TimeSyncMsg struct { - ClientId int64 + ClientId int64 Timestamp int64 - MsgType OpType + MsgType OpType } type Key2SegMsg struct { EntityId int64 Segments []string - MsgType OpType + MsgType OpType } - - func (ims *InsertMsg) GetType() OpType { return ims.MsgType } +func (ims *InsertMsg) Serialization() []byte { + var serialization_data bytes.Buffer + return serialization_data.Bytes() +} + +func (ims *InsertMsg) Deserialization(serializationData []byte) { + +} + func (dms *DeleteMsg) GetType() OpType { return dms.MsgType } @@ -151,6 +164,3 @@ func (tms *TimeSyncMsg) GetType() OpType { func (kms *Key2SegMsg) GetType() OpType { return kms.MsgType } - - - diff --git a/reader/index.go b/reader/index.go new file mode 100644 index 0000000000..6e20ba2544 --- /dev/null +++ b/reader/index.go @@ -0,0 +1,13 @@ +package reader + +import "../pulsar/schema" + +type IndexConfig struct {} + +func buildIndex(config IndexConfig) schema.Status { + return schema.Status{Error_code: schema.ErrorCode_SUCCESS} +} + +func dropIndex(fieldName string) schema.Status { + return schema.Status{Error_code: schema.ErrorCode_SUCCESS} +} diff --git a/reader/reader.go b/reader/reader.go index e69de29bb2..f56c7f1ab0 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -0,0 +1,73 @@ +package reader + +import "../pulsar/schema" + +func milvusInsertMock(collectionName string, partitionTag string, entityIds *[]int64, timestamps *[]int64, dataChunk [][]*schema.FieldValue) ResultEntityIds { + return ResultEntityIds{} +} + +func milvusDeleteMock(collectionName string, entityIds *[]int64, timestamps *[]int64) ResultEntityIds { + return ResultEntityIds{} +} + +func milvusSearchMock(collectionName string, queryString string, timestamps *[]int64, vectorRecord *[]schema.VectorRecord) ResultEntityIds { + return ResultEntityIds{} +} + +type dataChunkSchema struct { + FieldName string + DataType schema.DataType + Dim int +} + +func insert(insertMessages []*schema.InsertMsg) schema.Status { + var collectionName = insertMessages[0].CollectionName + var partitionTag = insertMessages[0].PartitionTag + var clientId = insertMessages[0].ClientId + + // TODO: prevent Memory copy + var entityIds []int64 + var timestamps []int64 + var vectorRecords [][]*schema.FieldValue + for _, msg := range insertMessages { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + vectorRecords = append(vectorRecords, msg.Fields) + } + + var result = milvusInsertMock(collectionName, partitionTag, &entityIds, ×tamps, vectorRecords) + return publishResult(&result, clientId) +} + +func delete(deleteMessages []*schema.DeleteMsg) schema.Status { + var collectionName = deleteMessages[0].CollectionName + var clientId = deleteMessages[0].ClientId + + // TODO: prevent Memory copy + var entityIds []int64 + var timestamps []int64 + for _, msg := range deleteMessages { + entityIds = append(entityIds, msg.EntityId) + timestamps = append(timestamps, msg.Timestamp) + } + + var result = milvusDeleteMock(collectionName, &entityIds, ×tamps) + return publishResult(&result, clientId) +} + +func search(searchMessages []*schema.SearchMsg) schema.Status { + var collectionName = searchMessages[0].CollectionName + var clientId int64 = searchMessages[0].ClientId + var queryString = searchMessages[0].VectorParam.Json + + // TODO: prevent Memory copy + var records []schema.VectorRecord + var timestamps []int64 + for _, msg := range searchMessages { + records = append(records, *msg.VectorParam.RowRecord) + timestamps = append(timestamps, msg.Timestamp) + } + + var result = milvusSearchMock(collectionName, queryString, ×tamps, &records) + return publishResult(&result, clientId) +} diff --git a/reader/result.go b/reader/result.go new file mode 100644 index 0000000000..b09b4ba5ed --- /dev/null +++ b/reader/result.go @@ -0,0 +1,28 @@ +package reader + +import ( + "../pulsar/schema" + "fmt" +) + +type ResultEntityIds []int64 + +func getResultTopicByClientId(clientId int64) string { + // TODO: Result topic? + return "result-topic/partition-" + string(clientId) +} + +func publishResult(ids *ResultEntityIds, clientId int64) schema.Status { + // TODO: Pulsar publish + var resultTopic = getResultTopicByClientId(clientId) + fmt.Println(resultTopic) + return schema.Status{Error_code: schema.ErrorCode_SUCCESS} +} + +func publicStatistic(statisticTopic string) schema.Status { + // TODO: get statistic info + // getStatisticInfo() + // var info = getStatisticInfo() + // TODO: Pulsar publish + return schema.Status{Error_code: schema.ErrorCode_SUCCESS} +} diff --git a/storage/internal/minio/codec/codec.go b/storage/internal/minio/codec/codec.go index 0ed692f2a0..60f777e365 100644 --- a/storage/internal/minio/codec/codec.go +++ b/storage/internal/minio/codec/codec.go @@ -17,7 +17,7 @@ func MvccDecode(key string) (string, uint64, string, error) { suffixIndex := 0 TSIndex := 0 undersCount := 0 - for i := len(key) - 1; i < 0; i++ { + for i := len(key) - 1; i > 0; i-- { if key[i] == '_' { undersCount++ if undersCount == 1 { @@ -35,6 +35,7 @@ func MvccDecode(key string) (string, uint64, string, error) { var TS uint64 _, err := fmt.Sscanf(key[TSIndex:suffixIndex-1], "%x", &TS) + TS = ^TS if err != nil { return "", 0, "", err } @@ -43,12 +44,17 @@ func MvccDecode(key string) (string, uint64, string, error) { } func LogEncode(key []byte, ts uint64, channel int) []byte { - return []byte("log_" + string(ts) + "_" + string(key) + "_" + string(channel)) + suffix := string(key) + "_" + fmt.Sprintf("%d", channel) + logKey, err := MvccEncode([]byte("log"), ts, suffix) + if err != nil{ + return nil + } + return logKey } func LogDecode(logKey string) (string, uint64, int, error) { if len(logKey) < 16 { - return nil, 0, 0, errors.New("insufficient bytes to decode value") + return "", 0, 0, errors.New("insufficient bytes to decode value") } channelIndex := 0 @@ -56,7 +62,7 @@ func LogDecode(logKey string) (string, uint64, int, error) { TSIndex := 0 undersCount := 0 - for i := len(logKey) - 1; i < 0; i++ { + for i := len(logKey) - 1; i > 0; i-- { if logKey[i] == '_' { undersCount++ if undersCount == 1 { @@ -71,7 +77,7 @@ func LogDecode(logKey string) (string, uint64, int, error) { } } } - if channelIndex == 0 || TSIndex == 0 || keyIndex == 0 { + if channelIndex == 0 || TSIndex == 0 || keyIndex == 0 || logKey[:TSIndex-1] != "log" { return "", 0, 0, errors.New("key is wrong formatted") } @@ -81,10 +87,15 @@ func LogDecode(logKey string) (string, uint64, int, error) { if err != nil { return "", 0, 0, err } + TS = ^TS - _, err = fmt.Sscanf(logKey[channelIndex:len(logKey)-1], "%d", &channel) + _, err = fmt.Sscanf(logKey[channelIndex:], "%d", &channel) if err != nil { return "", 0, 0, err } return logKey[keyIndex : channelIndex-1], TS, channel, nil } + +func SegmentEncode(segment string, suffix string) []byte { + return []byte(segment + "_" + suffix) +} diff --git a/storage/internal/minio/minio_store.go b/storage/internal/minio/minio_store.go index 5b5ea93120..6b6565a797 100644 --- a/storage/internal/minio/minio_store.go +++ b/storage/internal/minio/minio_store.go @@ -2,11 +2,11 @@ package minio_driver import ( "context" + "github.com/czs007/suvlim/storage/internal/minio/codec" + . "github.com/czs007/suvlim/storage/internal/minio/codec" + . "github.com/czs007/suvlim/storage/pkg/types" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - . "storage/internal/minio/codec" - "storage/internal/tikv/codec" - . "storage/pkg/types" ) type minioDriver struct { @@ -53,7 +53,7 @@ func (s *minioDriver) put(ctx context.Context, key Key, value Value, timestamp T return err } - err = s.driver.PUT(ctx, []byte(minioKey), value) + err = s.driver.Put(ctx, []byte(minioKey), value) return err } @@ -68,9 +68,9 @@ func (s *minioDriver) scanLE(ctx context.Context, key Key, timestamp Timestamp, return nil, nil, nil, err } - timestamps := make([]Timestamp, len(keys)) + var timestamps []Timestamp for _, key := range keys { - _, timestamp, _ := codec.MvccDecode(key) + _, timestamp, _, _ := codec.MvccDecode(string(key)) timestamps = append(timestamps, timestamp) } @@ -83,14 +83,14 @@ func (s *minioDriver) scanGE(ctx context.Context, key Key, timestamp Timestamp, return nil, nil, nil, err } - keys, values, err := s.driver.Scan(ctx, key, []byte(keyStart), -1, keyOnly) + keys, values, err := s.driver.Scan(ctx, key, keyStart, -1, keyOnly) if err != nil { return nil, nil, nil, err } - timestamps := make([]Timestamp, len(keys)) + var timestamps []Timestamp for _, key := range keys { - _, timestamp, _ := codec.MvccDecode(key) + _, timestamp, _, _ := codec.MvccDecode(string(key)) timestamps = append(timestamps, timestamp) } @@ -103,7 +103,7 @@ func (s *minioDriver) deleteLE(ctx context.Context, key Key, timestamp Timestamp if err != nil { return err } - err = s.driver.DeleteRange(ctx, key, []byte(keyEnd)) + err = s.driver.DeleteRange(ctx, key, keyEnd) return err } func (s *minioDriver) deleteGE(ctx context.Context, key Key, timestamp Timestamp) error { @@ -133,11 +133,24 @@ func (s *minioDriver) GetRow(ctx context.Context, key Key, timestamp Timestamp) if err != nil { return nil, err } - _, values, err := s.driver.Scan(ctx, key, minioKey, 1, false) + + keys, values, err := s.driver.Scan(ctx, append(key, byte('_')), minioKey, 1, false) + if values == nil || keys == nil{ + return nil, err + } + + _, _, suffix, err := MvccDecode(string(keys[0])) + if err != nil{ + return nil, err + } + if suffix == "delete"{ + return nil, nil + } + return values[0], err } func (s *minioDriver) GetRows(ctx context.Context, keys []Key, timestamp Timestamp) ([]Value, error){ - values := make([]Value, len(keys)) + var values []Value for _, key := range keys{ value, err := s.GetRow(ctx, key, timestamp) if err!= nil{ @@ -153,10 +166,10 @@ func (s *minioDriver) PutRow(ctx context.Context, key Key, value Value, segment if err != nil{ return err } - err = s.driver.PUT(ctx, minioKey, value) + err = s.driver.Put(ctx, minioKey, value) return err } -func (s *minioDriver) PutRows(ctx context.Context, keys []Key, values []Value, segments []string, timestamp Timestamp) error{ +func (s *minioDriver) PutRows(ctx context.Context, keys []Key, values []Value, segment string, timestamp Timestamp) error{ maxThread := 100 batchSize := 1 keysLength := len(keys) @@ -172,9 +185,9 @@ func (s *minioDriver) PutRows(ctx context.Context, keys []Key, values []Value, s } errCh := make(chan error) - f := func(ctx2 context.Context, keys2 []Key, values2 []Value, segments2 []string, timestamp2 Timestamp) { + f := func(ctx2 context.Context, keys2 []Key, values2 []Value, segments2 string, timestamp2 Timestamp) { for i := 0; i < len(keys2); i++{ - err := s.PutRow(ctx2, keys2[i], values2[i], segments2[i], timestamp2) + err := s.PutRow(ctx2, keys2[i], values2[i], segments2, timestamp2) errCh <- err } } @@ -185,7 +198,7 @@ func (s *minioDriver) PutRows(ctx context.Context, keys []Key, values []Value, s if len(keys) < end { end = len(keys) } - f(ctx, keys[start:end], values[start:end], segments[start:end], timestamp) + f(ctx, keys[start:end], values[start:end], segment, timestamp) }() } @@ -203,7 +216,7 @@ func (s *minioDriver) DeleteRow(ctx context.Context, key Key, timestamp Timestam return err } value := []byte("0") - err = s.driver.PUT(ctx, minioKey, value) + err = s.driver.Put(ctx, minioKey, value) return err } @@ -250,40 +263,60 @@ func (s *minioDriver) DeleteRows(ctx context.Context, keys []Key, timestamp Time func (s *minioDriver) PutLog(ctx context.Context, key Key, value Value, timestamp Timestamp, channel int) error{ logKey := LogEncode(key, timestamp, channel) - err := s.driver.PUT(ctx, logKey, value) + err := s.driver.Put(ctx, logKey, value) return err } -func (s *minioDriver) FetchLog(ctx context.Context, start Timestamp, end Timestamp, channels []int) () error{ +func (s *minioDriver) GetLog(ctx context.Context, start Timestamp, end Timestamp, channels []int) ([]Value, error) { + keys, values, err := s.driver.GetByPrefix(ctx, []byte("log_"), false) + if err != nil { + return nil, err + } - return nil + var resultValues []Value + for i, key := range keys{ + _, ts, channel, err := LogDecode(string(key)) + if err != nil { + return nil, err + } + if ts >= start && ts <= end { + for j := 0; j < len(channels); j++ { + if channel == channels[j] { + resultValues = append(resultValues, values[i]) + } + } + } + } + + return resultValues, nil } func (s *minioDriver) GetSegmentIndex(ctx context.Context, segment string) (SegmentIndex, error){ - return nil, nil + return s.driver.Get(ctx, SegmentEncode(segment, "index")) } func (s *minioDriver) PutSegmentIndex(ctx context.Context, segment string, index SegmentIndex) error{ - return nil + return s.driver.Put(ctx, SegmentEncode(segment, "index"), index) } func (s *minioDriver) DeleteSegmentIndex(ctx context.Context, segment string) error{ - return nil + return s.driver.Delete(ctx, SegmentEncode(segment, "index")) } func (s *minioDriver) GetSegmentDL(ctx context.Context, segment string) (SegmentDL, error){ - return nil, nil + + return s.driver.Get(ctx, SegmentEncode(segment, "DL")) } -func (s *minioDriver) SetSegmentDL(ctx context.Context, segment string, log SegmentDL) error{ +func (s *minioDriver) PutSegmentDL(ctx context.Context, segment string, log SegmentDL) error{ - return nil + return s.driver.Put(ctx, SegmentEncode(segment, "DL"), log) } func (s *minioDriver) DeleteSegmentDL(ctx context.Context, segment string) error{ - return nil + return s.driver.Delete(ctx, SegmentEncode(segment, "DL")) } diff --git a/storage/internal/minio/minio_storeEngine.go b/storage/internal/minio/minio_storeEngine.go index f5aaca73d9..3bce5cbf39 100644 --- a/storage/internal/minio/minio_storeEngine.go +++ b/storage/internal/minio/minio_storeEngine.go @@ -5,7 +5,7 @@ import ( "context" "github.com/minio/minio-go/v7" "io" - . "storage/pkg/types" + . "github.com/czs007/suvlim/storage/pkg/types" ) var bucketName = "zcbucket" @@ -14,39 +14,7 @@ type minioStore struct { client *minio.Client } -//func NewMinioStore(ctx context.Context) (*minioStore, error) { -// // to-do read conf -// var endPoint = "127.0.0.1:9000" -// var accessKeyID = "testminio" -// var secretAccessKey = "testminio" -// var useSSL = false -// -// minioClient, err := minio.New(endPoint, &minio.Options{ -// Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), -// Secure: useSSL, -// }) -// -// if err != nil { -// return nil, err -// } -// -// bucketExists, err := minioClient.BucketExists(ctx, bucketName) -// if err != nil { -// return nil, err -// } -// -// if !bucketExists { -// err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}) -// if err != nil { -// return nil, err -// } -// } -// return &minioStore{ -// client: minioClient, -// }, nil -//} - -func (s *minioStore) PUT(ctx context.Context, key Key, value Value) error { +func (s *minioStore) Put(ctx context.Context, key Key, value Value) error { reader := bytes.NewReader(value) _, err := s.client.PutObject(ctx, bucketName, string(key), reader, int64(len(value)), minio.PutObjectOptions{}) @@ -57,7 +25,7 @@ func (s *minioStore) PUT(ctx context.Context, key Key, value Value) error { return err } -func (s *minioStore) GET(ctx context.Context, key Key) (Value, error) { +func (s *minioStore) Get(ctx context.Context, key Key) (Value, error) { object, err := s.client.GetObject(ctx, bucketName, string(key), minio.GetObjectOptions{}) if err != nil { return nil, err @@ -69,7 +37,7 @@ func (s *minioStore) GET(ctx context.Context, key Key) (Value, error) { if err != nil && err != io.EOF { return nil, err } - return buf[:n], err + return buf[:n], nil } func (s *minioStore) GetByPrefix(ctx context.Context, prefix Key, keyOnly bool) ([]Key, []Value, error) { @@ -81,7 +49,7 @@ func (s *minioStore) GetByPrefix(ctx context.Context, prefix Key, keyOnly bool) for object := range objects { objectsKeys = append(objectsKeys, []byte(object.Key)) if !keyOnly{ - value, err := s.GET(ctx, []byte(object.Key)) + value, err := s.Get(ctx, []byte(object.Key)) if err != nil{ return nil, nil, err } @@ -98,19 +66,19 @@ func (s *minioStore) Scan(ctx context.Context, keyStart Key, keyEnd Key, limit i var values []Value limitCount := uint(limit) for object := range s.client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{Prefix: string(keyStart)}) { - if object.Key <= string(keyEnd) { + if object.Key >= string(keyEnd) { keys = append(keys, []byte(object.Key)) if !keyOnly { - value, err := s.GET(ctx, []byte(object.Key)) + value, err := s.Get(ctx, []byte(object.Key)) if err != nil { return nil, nil, err } values = append(values, value) } - } - limitCount--; - if limitCount <= 0{ - break + limitCount--; + if limitCount <= 0{ + break + } } } diff --git a/storage/internal/minio/minio_test.go b/storage/internal/minio/minio_test.go index 8e3612932b..98507a40ae 100644 --- a/storage/internal/minio/minio_test.go +++ b/storage/internal/minio/minio_test.go @@ -2,71 +2,107 @@ package minio_driver_test import ( "context" + minio_driver "github.com/czs007/suvlim/storage/internal/minio" "github.com/stretchr/testify/assert" - minio_driver "storage/internal/minio" "testing" ) -//var endpoint = "play.min.io" -//var accessKeyID = "Q3AM3UQ867SPQQA43P2F" -//var secretAccessKey = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG" -//var useSSL = true -//var endPoint = "127.0.0.1:9000" -//var accessKeyID = "testminio" -//var secretAccessKey = "testminio" -//var useSSL = false var ctx = context.Background() -//var client, err = minio_driver.NewMinIOStore(endPoint, accessKeyID, secretAccessKey, useSSL) -var client, err = minio_driver.NewMinIOStore(ctx) +var client, err = minio_driver.NewMinioDriver(ctx) -func TestSet(t *testing.T) { - err = client.Set(ctx, []byte("bar"), []byte("abcdefghijklmnoopqrstuvwxyz"), 1234567) - assert.Nil(t, err) - err = client.Set(ctx, []byte("bar"), []byte("djhfkjsbdfbsdughorsgsdjhgoisdgh"), 1235567) - assert.Nil(t, err) - err = client.Set(ctx, []byte("bar"), []byte("123854676ershdgfsgdfk,sdhfg;sdi8"), 1236567) - assert.Nil(t, err) - err = client.Set(ctx, []byte("bar_1"), []byte("testkeybarorbar_1"), 1236567) - assert.Nil(t, err) -} -func TestGet(t *testing.T) { - object, _ := client.Get(ctx, []byte("bar"), 1234999) +func TestMinioDriver_PutRowAndGetRow(t *testing.T) { + err = client.PutRow(ctx, []byte("bar"), []byte("abcdefghijklmnoopqrstuvwxyz"), "SegmentA", 1234567) + assert.Nil(t, err) + err = client.PutRow(ctx, []byte("bar"), []byte("djhfkjsbdfbsdughorsgsdjhgoisdgh"), "SegmentA", 1235567) + assert.Nil(t, err) + err = client.PutRow(ctx, []byte("bar"), []byte("123854676ershdgfsgdfk,sdhfg;sdi8"), "SegmentB", 1236567) + assert.Nil(t, err) + err = client.PutRow(ctx, []byte("bar_1"), []byte("testkeybarorbar_1"), "SegmentC", 1236567) + assert.Nil(t, err) + object, _ := client.GetRow(ctx, []byte("bar"), 1234999) assert.Equal(t, "abcdefghijklmnoopqrstuvwxyz", string(object)) - object, _ = client.Get(ctx, []byte("bar"), 1235999) + object, _ = client.GetRow(ctx, []byte("bar"), 1235999) assert.Equal(t, "djhfkjsbdfbsdughorsgsdjhgoisdgh", string(object)) - object, _ = client.Get(ctx, []byte("bar"), 1236567) + object, _ = client.GetRow(ctx, []byte("bar"), 1236567) assert.Equal(t, "123854676ershdgfsgdfk,sdhfg;sdi8", string(object)) - object, _ = client.Get(ctx, []byte("bar_1"), 1236800) + object, _ = client.GetRow(ctx, []byte("bar_1"), 1236800) assert.Equal(t, "testkeybarorbar_1", string(object)) } -func TestDelete(t *testing.T){ - err = client.Delete(ctx, []byte("bar"), 1237000) +func TestMinioDriver_DeleteRow(t *testing.T){ + err = client.DeleteRow(ctx, []byte("bar"), 1237000) assert.Nil(t, err) - object, _ := client.Get(ctx, []byte("bar"), 1237000) + object, _ := client.GetRow(ctx, []byte("bar"), 1237000) assert.Nil(t, object) - err = client.Delete(ctx, []byte("bar_1"), 1237000) + err = client.DeleteRow(ctx, []byte("bar_1"), 1237000) assert.Nil(t, err) + object2, _ := client.GetRow(ctx, []byte("bar_1"), 1237000) + assert.Nil(t, object2) } -func TestBatchSet(t *testing.T){ +func TestMinioDriver_PutRowsAndGetRows(t *testing.T){ keys := [][]byte{[]byte("foo"), []byte("bar")} values := [][]byte{[]byte("The key is foo!"), []byte("The key is bar!")} - err = client.BatchSet(ctx, keys, values, 555555) + segment := "segmentA" + err = client.PutRows(ctx, keys, values, segment, 555555) assert.Nil(t, err) -} -func TestBatchGet(t *testing.T){ - keys := [][]byte{[]byte("foo"), []byte("bar")} - objects, err := client.BatchGet(ctx, keys, 666666) + objects, err := client.GetRows(ctx, keys, 666666) assert.Nil(t, err) assert.Equal(t, "The key is foo!", string(objects[0])) assert.Equal(t, "The key is bar!", string(objects[1])) } -func TestBatchDelete(t *testing.T){ +func TestMinioDriver_DeleteRows(t *testing.T){ keys := [][]byte{[]byte("foo"), []byte("bar")} - err := client.BatchDelete(ctx, keys, 666666) + err := client.DeleteRows(ctx, keys, 777777) + assert.Nil(t, err) + + objects, err := client.GetRows(ctx, keys, 777777) + assert.Nil(t, err) + assert.Nil(t, objects[0]) + assert.Nil(t, objects[1]) +} + +func TestMinioDriver_PutLogAndGetLog(t *testing.T) { + err = client.PutLog(ctx, []byte("insert"), []byte("This is insert log!"), 1234567, 11) + assert.Nil(t, err) + err = client.PutLog(ctx, []byte("delete"), []byte("This is delete log!"), 1236567, 10) + assert.Nil(t, err) + err = client.PutLog(ctx, []byte("update"), []byte("This is update log!"), 1237567, 9) + assert.Nil(t, err) + err = client.PutLog(ctx, []byte("select"), []byte("This is select log!"), 1238567, 8) + assert.Nil(t, err) + + channels := []int{5, 8, 9, 10, 11, 12, 13} + logValues, err := client.GetLog(ctx, 1234567, 1245678, channels) + assert.Nil(t, err) + assert.Equal(t, "This is select log!", string(logValues[0])) + assert.Equal(t, "This is update log!", string(logValues[1])) + assert.Equal(t, "This is delete log!", string(logValues[2])) + assert.Equal(t, "This is insert log!", string(logValues[3])) +} + +func TestMinioDriver_Segment(t *testing.T) { + err := client.PutSegmentIndex(ctx, "segmentA", []byte("This is segmentA's index!")) + assert.Nil(t, err) + + segmentIndex, err := client.GetSegmentIndex(ctx, "segmentA") + assert.Equal(t, "This is segmentA's index!", string(segmentIndex)) + + err = client.DeleteSegmentIndex(ctx, "segmentA") assert.Nil(t, err) } + +func TestMinioDriver_SegmentDL(t *testing.T){ + err := client.PutSegmentDL(ctx, "segmentB", []byte("This is segmentB's delete log!")) + assert.Nil(t, err) + + segmentDL, err := client.GetSegmentDL(ctx, "segmentB") + assert.Nil(t, err) + assert.Equal(t, "This is segmentB's delete log!", string(segmentDL)) + + err = client.DeleteSegmentDL(ctx, "segmentB") + assert.Nil(t, err) +} \ No newline at end of file diff --git a/storage/internal/tikv/tikv_store.go b/storage/internal/tikv/tikv_store.go index 70d7fae3b0..1a0de3e232 100644 --- a/storage/internal/tikv/tikv_store.go +++ b/storage/internal/tikv/tikv_store.go @@ -225,16 +225,14 @@ func (s *TikvStore) PutRow(ctx context.Context, key Key, value Value, segment st return s.put(ctx, key, value, timestamp, segment) } -func (s *TikvStore) PutRows(ctx context.Context, keys []Key, values []Value, segments []string, timestamp Timestamp) error { +func (s *TikvStore) PutRows(ctx context.Context, keys []Key, values []Value, segment string, timestamp Timestamp) error { if len(keys) != len(values) { return errors.New("the len of keys is not equal to the len of values") } - if len(keys) != len(segments) { - return errors.New("the len of keys is not equal to the len of segments") - } + encodedKeys := make([]Key, len(keys)) for i, key := range keys { - encodedKeys[i] = EncodeKey(key, timestamp, segments[i]) + encodedKeys[i] = EncodeKey(key, timestamp, segment) } return s.engine.BatchPut(ctx, encodedKeys, values) } diff --git a/storage/internal/tikv/tikv_test.go b/storage/internal/tikv/tikv_test.go index daa7462a67..4a236cb8a4 100644 --- a/storage/internal/tikv/tikv_test.go +++ b/storage/internal/tikv/tikv_test.go @@ -188,99 +188,56 @@ func TestTikvStore_Log(t *testing.T) { assert.Nil(t, err) } -//func TestTikvStore_PrefixKey(t *testing.T) { -// ctx := context.Background() -// key := Key("key") -// key1 := Key("key_1") -// -// // Clean kv data -// err := store.Delete(ctx, key, math.MaxUint64) -// assert.Nil(t, err) -// -// // Ensure test data is not exist -// v, err := store.Get(ctx, key, math.MaxUint64) -// assert.Nil(t, v) -// assert.Nil(t, err) -// v, err = store.Get(ctx, key1, math.MaxUint64) -// assert.Nil(t, v) -// assert.Nil(t, err) -// -// // Set some value for test key -// err = store.Set(ctx, key, Value("key_1"), 1) -// assert.Nil(t, err) -// err = store.Set(ctx, key1, Value("key1_1"), 1) -// assert.Nil(t, err) -// -// // Get Value -// v, err = store.Get(ctx, key, 1) -// assert.Nil(t, err) -// assert.Equal(t, Value("key_1"), v) -// -// v, err = store.Get(ctx, key1, 1) -// assert.Nil(t, err) -// assert.Equal(t, Value("key1_1"), v) -// -// // Delete key, value for "key" should nil -// err = store.Delete(ctx, key, 1) -// v, err = store.Get(ctx, key, 1) -// assert.Nil(t, v) -// assert.Nil(t, err) -// -// // Delete all test data -// err = store.Delete(ctx, key1, 1) -// assert.Nil(t, err) -// -//} -// -//func TestTikvStore_Batch(t *testing.T) { -// ctx := context.Background() -// -// // Prepare test data -// size := 0 -// var testKeys []Key -// var testValues []Value -// for i := 0; size/conf.Raw.MaxBatchPutSize < 1; i++ { -// key := fmt.Sprint("key", i) -// size += len(key) -// testKeys = append(testKeys, []byte(key)) -// value := fmt.Sprint("value", i) -// size += len(value) -// testValues = append(testValues, []byte(value)) -// v, err := store.Get(ctx, Key(key), math.MaxUint64) -// assert.Nil(t, v) -// assert.Nil(t, err) -// } -// -// // Set kv data -// err := store.BatchSet(ctx, testKeys, testValues, 1) -// assert.Nil(t, err) -// -// // Get value -// checkValues, err := store.BatchGet(ctx, testKeys, 2) -// assert.NotNil(t, checkValues) -// assert.Nil(t, err) -// assert.Equal(t, len(checkValues), len(testValues)) -// for i := range testKeys { -// assert.Equal(t, testValues[i], checkValues[i]) -// } -// -// // Delete test data using multi go routine -// err = store.BatchDeleteMultiRoutine(ctx, testKeys, math.MaxUint64) -// assert.Nil(t, err) -// // Ensure all test key is deleted -// checkValues, err = store.BatchGet(ctx, testKeys, math.MaxUint64) -// assert.Nil(t, err) -// for _, value := range checkValues { -// assert.Nil(t, value) -// } -// -// // Delete test data -// err = store.BatchDelete(ctx, testKeys, math.MaxUint64) -// assert.Nil(t, err) -// // Ensure all test key is deleted -// checkValues, err = store.BatchGet(ctx, testKeys, math.MaxUint64) -// assert.Nil(t, err) -// for _, value := range checkValues { -// assert.Nil(t, value) -// } -//} +func TestTikvStore_SegmentIndex(t *testing.T) { + ctx := context.Background() + + // Put segment index + err := store.PutSegmentIndex(ctx, "segment0", []byte("index0")) + assert.Nil(t, err) + err = store.PutSegmentIndex(ctx, "segment1", []byte("index1")) + assert.Nil(t, err) + + // Get segment index + index , err := store.GetSegmentIndex(ctx, "segment0") + assert.Nil(t, err) + assert.Equal(t, []byte("index0"), index) + index , err = store.GetSegmentIndex(ctx, "segment1") + assert.Nil(t, err) + assert.Equal(t, []byte("index1"), index) + + // Delete segment index + err = store.DeleteSegmentIndex(ctx, "segment0") + assert.Nil(t, err) + err = store.DeleteSegmentIndex(ctx, "segment1") + assert.Nil(t, err) + index , err = store.GetSegmentIndex(ctx, "segment0") + assert.Nil(t, err) + assert.Nil(t, index) +} + +func TestTikvStore_DeleteSegmentDL(t *testing.T) { + ctx := context.Background() + + // Put segment delete log + err := store.PutSegmentDL(ctx, "segment0", []byte("index0")) + assert.Nil(t, err) + err = store.PutSegmentDL(ctx, "segment1", []byte("index1")) + assert.Nil(t, err) + + // Get segment delete log + index , err := store.GetSegmentDL(ctx, "segment0") + assert.Nil(t, err) + assert.Equal(t, []byte("index0"), index) + index , err = store.GetSegmentDL(ctx, "segment1") + assert.Nil(t, err) + assert.Equal(t, []byte("index1"), index) + + // Delete segment delete log + err = store.DeleteSegmentDL(ctx, "segment0") + assert.Nil(t, err) + err = store.DeleteSegmentDL(ctx, "segment1") + assert.Nil(t, err) + index , err = store.GetSegmentDL(ctx, "segment0") + assert.Nil(t, err) + assert.Nil(t, index) +} diff --git a/storage/pkg/types/types.go b/storage/pkg/types/types.go index e4021996cb..1aa2ad6fe9 100644 --- a/storage/pkg/types/types.go +++ b/storage/pkg/types/types.go @@ -39,19 +39,18 @@ type storeEngine interface { } type Store interface { - put(ctx context.Context, key Key, value Value, timestamp Timestamp, suffix string) error - scanLE(ctx context.Context, key Key, timestamp Timestamp, keyOnly bool) ([]Timestamp, []Key, []Value, error) - scanGE(ctx context.Context, key Key, timestamp Timestamp, keyOnly bool) ([]Timestamp, []Key, []Value, error) - //scan(ctx context.Context, key Key, start Timestamp, end Timestamp, keyOnly bool) ([]Timestamp, []Key, []Value, error) - deleteLE(ctx context.Context, key Key, timestamp Timestamp) error - deleteGE(ctx context.Context, key Key, timestamp Timestamp) error - deleteRange(ctx context.Context, key Key, start Timestamp, end Timestamp) error + //put(ctx context.Context, key Key, value Value, timestamp Timestamp, suffix string) error + //scanLE(ctx context.Context, key Key, timestamp Timestamp, keyOnly bool) ([]Timestamp, []Key, []Value, error) + //scanGE(ctx context.Context, key Key, timestamp Timestamp, keyOnly bool) ([]Timestamp, []Key, []Value, error) + //deleteLE(ctx context.Context, key Key, timestamp Timestamp) error + //deleteGE(ctx context.Context, key Key, timestamp Timestamp) error + //deleteRange(ctx context.Context, key Key, start Timestamp, end Timestamp) error GetRow(ctx context.Context, key Key, timestamp Timestamp) (Value, error) GetRows(ctx context.Context, keys []Key, timestamp Timestamp) ([]Value, error) PutRow(ctx context.Context, key Key, value Value, segment string, timestamp Timestamp) error - PutRows(ctx context.Context, keys []Key, values []Value, segments []string, timestamp Timestamp) error + PutRows(ctx context.Context, keys []Key, values []Value, segment string, timestamp Timestamp) error DeleteRow(ctx context.Context, key Key, timestamp Timestamp) error DeleteRows(ctx context.Context, keys []Key, timestamp Timestamp) error diff --git a/writer/writer.go b/writer/writer.go index e69de29bb2..55c847f221 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -0,0 +1,88 @@ +package writer + +import ( + "context" + "github.com/czs007/suvlim/pulsar/schema" + "github.com/czs007/suvlim/storage/pkg" + "github.com/czs007/suvlim/storage/pkg/types" + "strconv" +) + +type writeNodeTimeSync struct { + deleteTimeSync uint64 + insertTimeSync uint64 +} + +type writeNode struct { + openSegmentId string + segmentCloseTime uint64 + nextSegmentId string + nextSegmentCloseTime uint64 + kvStore *types.Store + timeSyncTable *writeNodeTimeSync +} + +func NewWriteNode(ctx context.Context, + openSegmentId string, + timeSync uint64, + closeTime uint64, + nextSegmentId string, + nextCloseSegmentTime uint64) (*writeNode, error) { + ctx = context.Background() + store, err := storage.NewStore(ctx, "TIKV") + writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync} + if err != nil { + return nil, err + } + return &writeNode{ + kvStore: store, + openSegmentId: openSegmentId, + nextSegmentId: nextSegmentId, + segmentCloseTime: closeTime, + nextSegmentCloseTime: nextCloseSegmentTime, + timeSyncTable: writeTableTimeSync, + }, nil +} + +func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, time_sync uint64) error { + var i int + var storeKey string + + var keys [][]byte + var binaryData [][]byte + var timeStamps []uint64 + + for i = 0; i < cap(data); i++ { + storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10) + keys = append(keys, []byte(storeKey)) + binaryData = append(binaryData, data[i].Serialization()) + timeStamps = append(timeStamps, data[i].Timestamp) + } + + if s.segmentCloseTime <= time_sync { + s.openSegmentId = s.nextSegmentId + s.segmentCloseTime = s.nextSegmentCloseTime + } + + (*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps) + s.UpdateInsertTimeSync(time_sync) + return nil +} + +func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, time_sync uint64) error { + return nil +} + +func (s *writeNode) AddNewSegment(segment_id string, close_segment_time uint64) error { + s.nextSegmentId = segment_id + s.nextSegmentCloseTime = close_segment_time + return nil +} + +func (s *writeNode) UpdateInsertTimeSync(time_sync uint64) { + s.timeSyncTable.insertTimeSync = time_sync +} + +func (s *writeNode) UpdateDeleteTimeSync(time_sync uint64) { + s.timeSyncTable.deleteTimeSync = time_sync +}