Remove atomic int usage and some mutexes, replace with channels

This commit is contained in:
Seednode 2023-09-15 13:51:04 -05:00
parent ba827430bb
commit 0d252d0456
4 changed files with 124 additions and 80 deletions

View File

@ -6,6 +6,7 @@ package cmd
import ( import (
"encoding/gob" "encoding/gob"
"fmt"
"net/http" "net/http"
"os" "os"
"sync" "sync"
@ -64,14 +65,6 @@ func (cache *fileCache) set(val []string) {
cache.list = make([]string, length) cache.list = make([]string, length)
copy(cache.list, val) copy(cache.list, val)
cache.mutex.Unlock() cache.mutex.Unlock()
}
func (cache *fileCache) generate(args []string, formats *types.Types) {
cache.mutex.Lock()
cache.list = []string{}
cache.mutex.Unlock()
fileList(args, &filters{}, "", cache, formats)
if Cache && CacheFile != "" { if Cache && CacheFile != "" {
cache.Export(CacheFile) cache.Export(CacheFile)
@ -138,7 +131,14 @@ func (cache *fileCache) Import(path string) error {
func serveCacheClear(args []string, cache *fileCache, formats *types.Types) httprouter.Handle { func serveCacheClear(args []string, cache *fileCache, formats *types.Types) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
cache.generate(args, formats) list, err := fileList(args, &filters{}, "", &fileCache{}, formats)
if err != nil {
fmt.Println(err)
return
}
cache.set(list)
w.Header().Set("Content-Type", "text/plain") w.Header().Set("Content-Type", "text/plain")
@ -146,7 +146,7 @@ func serveCacheClear(args []string, cache *fileCache, formats *types.Types) http
} }
} }
func registerCacheHandlers(mux *httprouter.Router, args []string, cache *fileCache, formats *types.Types) { func registerCacheHandlers(mux *httprouter.Router, args []string, cache *fileCache, formats *types.Types) error {
skipIndex := false skipIndex := false
if CacheFile != "" { if CacheFile != "" {
@ -157,8 +157,15 @@ func registerCacheHandlers(mux *httprouter.Router, args []string, cache *fileCac
} }
if !skipIndex { if !skipIndex {
cache.generate(args, formats) list, err := fileList(args, &filters{}, "", &fileCache{}, formats)
if err != nil {
return err
}
cache.set(list)
} }
register(mux, Prefix+"/clear_cache", serveCacheClear(args, cache, formats)) register(mux, Prefix+"/clear_cache", serveCacheClear(args, cache, formats))
return nil
} }

View File

@ -17,7 +17,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"seedno.de/seednode/roulette/types" "seedno.de/seednode/roulette/types"
@ -41,22 +40,11 @@ type concurrency struct {
fileScans chan int fileScans chan int
} }
type files struct {
mutex sync.RWMutex
list []string
}
func (f *files) append(path string) {
f.mutex.Lock()
f.list = append(f.list, path)
f.mutex.Unlock()
}
type scanStats struct { type scanStats struct {
filesMatched atomic.Uint32 filesMatched int
filesSkipped atomic.Uint32 filesSkipped int
directoriesMatched atomic.Uint32 directoriesMatched int
directoriesSkipped atomic.Uint32 directoriesSkipped int
} }
type splitPath struct { type splitPath struct {
@ -102,7 +90,7 @@ func preparePath(path string) string {
func newFile(paths []string, filters *filters, sortOrder string, regexes *regexes, cache *fileCache, formats *types.Types) (string, error) { func newFile(paths []string, filters *filters, sortOrder string, regexes *regexes, cache *fileCache, formats *types.Types) (string, error) {
path, err := pickFile(paths, filters, sortOrder, cache, formats) path, err := pickFile(paths, filters, sortOrder, cache, formats)
if err != nil { if err != nil {
return "", nil return "", err
} }
splitPath, err := split(path, regexes) splitPath, err := split(path, regexes)
@ -274,9 +262,9 @@ func pathHasSupportedFiles(path string, formats *types.Types) (bool, error) {
} }
} }
func pathCount(path string) (uint32, uint32, error) { func pathCount(path string) (int, int, error) {
var directories uint32 = 0 var directories = 0
var files uint32 = 0 var files = 0
nodes, err := os.ReadDir(path) nodes, err := os.ReadDir(path)
if err != nil { if err != nil {
@ -294,9 +282,16 @@ func pathCount(path string) (uint32, uint32, error) {
return files, directories, nil return files, directories, nil
} }
func scanPath(path string, files *files, stats *scanStats, concurrency *concurrency, formats *types.Types) error { func scanPath(path string, fileChannel chan<- string, statChannel chan<- *scanStats, errorChannel chan<- error, concurrency *concurrency, formats *types.Types) {
var wg sync.WaitGroup var wg sync.WaitGroup
stats := &scanStats{
filesMatched: 0,
filesSkipped: 0,
directoriesMatched: 0,
directoriesSkipped: 0,
}
err := filepath.WalkDir(path, func(p string, info os.DirEntry, err error) error { err := filepath.WalkDir(path, func(p string, info os.DirEntry, err error) error {
if err != nil { if err != nil {
return err return err
@ -318,34 +313,34 @@ func scanPath(path string, files *files, stats *scanStats, concurrency *concurre
path, err := normalizePath(p) path, err := normalizePath(p)
if err != nil { if err != nil {
fmt.Println(err) errorChannel <- err
} }
if !formats.Validate(path) { if !formats.Validate(path) {
stats.filesSkipped.Add(1) stats.filesSkipped = stats.filesSkipped + 1
return return
} }
files.append(path) fileChannel <- path
stats.filesMatched.Add(1) stats.filesMatched = stats.filesMatched + 1
}() }()
case info.IsDir(): case info.IsDir():
files, directories, err := pathCount(p) files, directories, err := pathCount(p)
if err != nil { if err != nil {
fmt.Println(err) errorChannel <- err
} }
if files > 0 && (files < MinimumFileCount) || (files > MaximumFileCount) { if files > 0 && (files < MinimumFileCount) || (files > MaximumFileCount) {
// This count will not otherwise include the parent directory itself, so increment by one // This count will not otherwise include the parent directory itself, so increment by one
stats.directoriesSkipped.Add(directories + 1) stats.directoriesSkipped = stats.directoriesSkipped + directories + 1
stats.filesSkipped.Add(files) stats.filesSkipped = stats.filesSkipped + files
return filepath.SkipDir return filepath.SkipDir
} }
stats.directoriesMatched.Add(1) stats.directoriesMatched = stats.directoriesMatched + 1
} }
return err return err
@ -353,24 +348,26 @@ func scanPath(path string, files *files, stats *scanStats, concurrency *concurre
wg.Wait() wg.Wait()
if err != nil { statChannel <- stats
return err
}
return nil if err != nil {
errorChannel <- err
}
} }
func scanPaths(paths []string, sort string, cache *fileCache, formats *types.Types) []string { func scanPaths(paths []string, sort string, cache *fileCache, formats *types.Types) ([]string, error) {
files := &files{ var list []string
mutex: sync.RWMutex{},
list: []string{}, fileChannel := make(chan string)
} statChannel := make(chan *scanStats)
errorChannel := make(chan error)
done := make(chan bool, 1)
stats := &scanStats{ stats := &scanStats{
filesMatched: atomic.Uint32{}, filesMatched: 0,
filesSkipped: atomic.Uint32{}, filesSkipped: 0,
directoriesMatched: atomic.Uint32{}, directoriesMatched: 0,
directoriesSkipped: atomic.Uint32{}, directoriesSkipped: 0,
} }
concurrency := &concurrency{ concurrency := &concurrency{
@ -393,55 +390,95 @@ func scanPaths(paths []string, sort string, cache *fileCache, formats *types.Typ
wg.Done() wg.Done()
}() }()
err := scanPath(paths[i], files, stats, concurrency, formats) scanPath(paths[i], fileChannel, statChannel, errorChannel, concurrency, formats)
if err != nil {
fmt.Println(err)
}
}(i) }(i)
} }
go func() {
wg.Wait() wg.Wait()
done <- true
}()
if stats.filesMatched.Load() < 1 { Poll:
for {
select {
case p := <-fileChannel:
list = append(list, p)
case s := <-statChannel:
stats.filesMatched = stats.filesMatched + s.filesMatched
stats.filesSkipped = stats.filesSkipped + s.filesSkipped
stats.directoriesMatched = stats.directoriesMatched + s.directoriesMatched
stats.directoriesSkipped = stats.directoriesSkipped + s.directoriesSkipped
case e := <-errorChannel:
return []string{}, e
case <-done:
break Poll
}
}
if stats.filesMatched < 1 {
fmt.Println("No files matched") fmt.Println("No files matched")
return []string{} return []string{}, nil
} }
if Verbose { if Verbose {
fmt.Printf("%s | Indexed %d/%d files across %d/%d directories in %s\n", fmt.Printf("%s | Indexed %d/%d files across %d/%d directories in %s\n",
time.Now().Format(logDate), time.Now().Format(logDate),
stats.filesMatched.Load(), stats.filesMatched,
stats.filesMatched.Load()+stats.filesSkipped.Load(), stats.filesMatched+stats.filesSkipped,
stats.directoriesMatched.Load(), stats.directoriesMatched,
stats.directoriesMatched.Load()+stats.directoriesSkipped.Load(), stats.directoriesMatched+stats.directoriesSkipped,
time.Since(startTime), time.Since(startTime),
) )
} }
return files.list return list, nil
} }
func fileList(paths []string, filters *filters, sort string, cache *fileCache, formats *types.Types) []string { func fileList(paths []string, filters *filters, sort string, cache *fileCache, formats *types.Types) ([]string, error) {
switch { switch {
case Cache && !cache.isEmpty() && filters.isEmpty(): case Cache && !cache.isEmpty() && filters.isEmpty():
return cache.List() return cache.List(), nil
case Cache && !cache.isEmpty() && !filters.isEmpty(): case Cache && !cache.isEmpty() && !filters.isEmpty():
return filters.apply(cache.List()) return filters.apply(cache.List()), nil
case Cache && cache.isEmpty() && !filters.isEmpty(): case Cache && cache.isEmpty() && !filters.isEmpty():
cache.set(scanPaths(paths, sort, cache, formats)) list, err := scanPaths(paths, sort, cache, formats)
return filters.apply(cache.List()) if err != nil {
return []string{}, err
}
cache.set(list)
return filters.apply(cache.List()), nil
case Cache && cache.isEmpty() && filters.isEmpty(): case Cache && cache.isEmpty() && filters.isEmpty():
cache.set(scanPaths(paths, sort, cache, formats)) list, err := scanPaths(paths, sort, cache, formats)
return cache.List() if err != nil {
return []string{}, err
}
cache.set(list)
return cache.List(), nil
case !Cache && !filters.isEmpty(): case !Cache && !filters.isEmpty():
return filters.apply(scanPaths(paths, sort, cache, formats)) list, err := scanPaths(paths, sort, cache, formats)
if err != nil {
return []string{}, err
}
return filters.apply(list), nil
default: default:
return scanPaths(paths, sort, cache, formats) list, err := scanPaths(paths, sort, cache, formats)
if err != nil {
return []string{}, err
}
return list, nil
} }
} }
func pickFile(args []string, filters *filters, sort string, cache *fileCache, formats *types.Types) (string, error) { func pickFile(args []string, filters *filters, sort string, cache *fileCache, formats *types.Types) (string, error) {
list := fileList(args, filters, sort, cache, formats) list, err := fileList(args, filters, sort, cache, formats)
if err != nil {
return "", err
}
fileCount := len(list) fileCount := len(list)

View File

@ -11,7 +11,7 @@ import (
) )
const ( const (
ReleaseVersion string = "0.87.0" ReleaseVersion string = "0.88.0"
) )
var ( var (
@ -27,8 +27,8 @@ var (
Handlers bool Handlers bool
Images bool Images bool
Info bool Info bool
MaximumFileCount uint32 MaximumFileCount int
MinimumFileCount uint32 MinimumFileCount int
PageLength uint32 PageLength uint32
Port uint16 Port uint16
Prefix string Prefix string
@ -77,8 +77,8 @@ func init() {
rootCmd.Flags().BoolVar(&Handlers, "handlers", false, "display registered handlers (for debugging)") rootCmd.Flags().BoolVar(&Handlers, "handlers", false, "display registered handlers (for debugging)")
rootCmd.Flags().BoolVar(&Images, "images", false, "enable support for image files") rootCmd.Flags().BoolVar(&Images, "images", false, "enable support for image files")
rootCmd.Flags().BoolVarP(&Info, "info", "i", false, "expose informational endpoints") rootCmd.Flags().BoolVarP(&Info, "info", "i", false, "expose informational endpoints")
rootCmd.Flags().Uint32Var(&MaximumFileCount, "maximum-files", 1<<32-1, "skip directories with file counts above this value") rootCmd.Flags().IntVar(&MaximumFileCount, "maximum-files", 1<<32-1, "skip directories with file counts above this value")
rootCmd.Flags().Uint32Var(&MinimumFileCount, "minimum-files", 1, "skip directories with file counts below this value") rootCmd.Flags().IntVar(&MinimumFileCount, "minimum-files", 1, "skip directories with file counts below this value")
rootCmd.Flags().Uint32Var(&PageLength, "page-length", 0, "pagination length for info pages") rootCmd.Flags().Uint32Var(&PageLength, "page-length", 0, "pagination length for info pages")
rootCmd.Flags().Uint16VarP(&Port, "port", "p", 8080, "port to listen on") rootCmd.Flags().Uint16VarP(&Port, "port", "p", 8080, "port to listen on")
rootCmd.Flags().StringVar(&Prefix, "prefix", "/", "root path for http handlers (for reverse proxying)") rootCmd.Flags().StringVar(&Prefix, "prefix", "/", "root path for http handlers (for reverse proxying)")

Binary file not shown.