Handle keep_open_until: cancel, periodic poll, thread safety #12

Open
mykhailo wants to merge 1 commit from mykhailo/godoor:issue-11-keep-open into master
2 changed files with 156 additions and 12 deletions
Showing only changes of commit 728939cef3 - Show all commits

View File

@@ -206,6 +206,28 @@ func setup(ctx context.Context) {
log.Println("Initial token population success") log.Println("Initial token population success")
allowedPollInterval := 15 * time.Second
if intervalStr, ok := os.LookupEnv("KDOORPI_ALLOWED_POLL_INTERVAL"); ok {
if interval, err := time.ParseDuration(intervalStr); err != nil {
log.Printf("parsing KDOORPI_ALLOWED_POLL_INTERVAL: %v, keeping default %v", err, allowedPollInterval)
} else if interval <= 0 {
// time.NewTicker panics on a non-positive duration, so a malformed
// setting like "0" or "-1s" would crash the controller. Keep the
// safe default instead.
log.Printf("KDOORPI_ALLOWED_POLL_INTERVAL must be positive, got %v, keeping default %v", interval, allowedPollInterval)
} else {
allowedPollInterval = interval
}
}
go func() {
ticker := time.NewTicker(allowedPollInterval)
defer ticker.Stop()
for range ticker.C {
pollAllowedOnce()
}
}()
go func() { go func() {
for { for {
err := waitEvents() err := waitEvents()
@@ -220,6 +242,13 @@ func setup(ctx context.Context) {
if err != nil { if err != nil {
log.Printf("reloadTokens failed: %q", err) log.Printf("reloadTokens failed: %q", err)
apiFailuresCount.WithLabelValues("allowed", config.api.allowed).Inc() apiFailuresCount.WithLabelValues("allowed", config.api.allowed).Inc()
// Intentionally no cancelKeepOpenDoor() here: the hold
// fail-safe is owned by the fixed-interval periodic poll
// above. Cancelling from this best-effort, per-iteration
// refresh too would only add door flapping. Worst-case
// stuck-open after proxy loss is ~reloadInfoTimeout + one
// poll interval (a hung-but-connected proxy blocks the
// in-flight reloadInfo until its request timeout).
} }
}() }()
} }
@@ -234,6 +263,19 @@ func setup(ctx context.Context) {
log.Println("Setup completed") log.Println("Setup completed")
} }
// pollAllowedOnce runs a single reloadInfo refresh. On success it does nothing
// further. On failure it logs the error, increments the "allowed" API failure
// counter, and cancels any active keep-open hold.
func pollAllowedOnce() {
	pollErr := reloadInfo()
	if pollErr == nil {
		return
	}

	log.Printf("Periodic reloadInfo failed: %v", pollErr)
	apiFailuresCount.WithLabelValues("allowed", config.api.allowed).Inc()

	// Fail safe: without a fresh answer we cannot confirm the hold is still
	// wanted, so drop it now instead of trusting a stale deadline. Once the
	// hold state is cleared, a later successful refresh that still reports an
	// active hold arms it again.
	cancelKeepOpenDoor()
}
func listenSig1(ctx context.Context, wiegand Wiegand) { func listenSig1(ctx context.Context, wiegand Wiegand) {
usrSig := make(chan os.Signal, 1) usrSig := make(chan os.Signal, 1)
signal.Notify(usrSig, syscall.SIGUSR1) signal.Notify(usrSig, syscall.SIGUSR1)
@@ -264,7 +306,10 @@ func OpenAndCloseDoor(w Wiegand) error {
return err return err
} }
if keepDoorOpen.until.After(time.Now()) { keepDoorOpenLock.Lock()
keepOpenUntil := keepDoorOpen.until
keepDoorOpenLock.Unlock()
if keepOpenUntil.After(time.Now()) {
fmt.Println("Door is already open") fmt.Println("Door is already open")
return nil return nil
} }
@@ -273,6 +318,19 @@ func OpenAndCloseDoor(w Wiegand) error {
time.Sleep(config.doorOpenTime) time.Sleep(config.doorOpenTime)
// A hold may have been armed while we were sleeping. Re-check under the
// lock and skip the close if so, holding the lock across the close decision
// so an in-flight updateKeepOpenDoor cannot slip a hold in between the
// check and the close. Otherwise this pulse would close a held-open door
// and subsequent polls, seeing the timer as already valid, would never
// reopen it, locking the door for the whole hold duration.
keepDoorOpenLock.Lock()
defer keepDoorOpenLock.Unlock()
if keepDoorOpen.until.After(time.Now()) {
fmt.Println("Door is held open, leaving open")
return nil
}
err = CloseDoor(w) err = CloseDoor(w)
if err != nil { if err != nil {
return err return err
@@ -287,8 +345,7 @@ func OpenDoor(w Wiegand) error {
if open { if open {
return nil return nil
} }
w.OpenDoor() return w.OpenDoor()
return nil
} }
func CloseDoor(w Wiegand) error { func CloseDoor(w Wiegand) error {
@@ -296,8 +353,7 @@ func CloseDoor(w Wiegand) error {
if !open { if !open {
return nil return nil
} }
w.CloseDoor() return w.CloseDoor()
return nil
} }
func cardRunner(w Wiegand) { func cardRunner(w Wiegand) {
@@ -402,8 +458,24 @@ func waitEvents() error {
} }
} }
// reloadInfoLock serializes reloadInfo so the periodic-poll and longpoll-driven
// callers cannot apply /allowed responses out of order (a slow stale response
// must not overwrite a newer one and, e.g., cancel a just-armed hold).
var reloadInfoLock sync.Mutex
// reloadInfoTimeout bounds a single /allowed fetch. The shared http client has a
// 120s timeout (needed by the longpoll), but the allow-list/keep-open fetch must
// be quick: a hung-but-connected proxy otherwise blocks the fail-safe close for
// up to 120s. This caps the worst-case stuck-open window to ~this + one interval.
const reloadInfoTimeout = 30 * time.Second
func reloadInfo() error { func reloadInfo() error {
req, err := http.NewRequest(http.MethodGet, config.api.allowed, nil) reloadInfoLock.Lock()
defer reloadInfoLock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), reloadInfoTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, config.api.allowed, nil)
if err != nil { if err != nil {
return err return err
} }
@@ -413,8 +485,14 @@ func reloadInfo() error {
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
log.Printf("%v\n", resp) log.Printf("%v\n", resp)
// Treat a non-200 as a failure rather than parsing a possibly-cached or
// error body: otherwise a stale future keep_open_until would defeat the
// fail-safe, or a body lacking it would wrongly cancel a live hold. The
// caller's error path then drives the documented fail-safe close.
return fmt.Errorf("allowed endpoint returned status %d", resp.StatusCode)
} }
var info upstreamUpdate var info upstreamUpdate
@@ -446,6 +524,8 @@ func reloadInfo() error {
if info.KeepOpenUntil != nil { if info.KeepOpenUntil != nil {
updateKeepOpenDoor(*info.KeepOpenUntil) updateKeepOpenDoor(*info.KeepOpenUntil)
} else {
cancelKeepOpenDoor()
} }
lastSyncTimestamp.SetToCurrentTime() lastSyncTimestamp.SetToCurrentTime()

View File

@@ -3,10 +3,28 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
"sync"
"time" "time"
) )
// keepDoorOpenLock guards keepDoorOpen (the armed hold's timer and deadline)
// and keepDoorOpenGen. Readers and writers of either take this lock first.
var keepDoorOpenLock sync.Mutex
// keepDoorOpenGen identifies the currently armed hold timer. It is bumped (under
// keepDoorOpenLock) every time a new timer is installed so that a superseded
// timer's callback can recognise it is stale and do nothing.
var keepDoorOpenGen uint64
func updateKeepOpenDoor(newKeepOpenTime time.Time) { func updateKeepOpenDoor(newKeepOpenTime time.Time) {
keepDoorOpenLock.Lock()
defer keepDoorOpenLock.Unlock()
// Hold unchanged since the last poll: keep the existing timer rather than
// rebuilding it every poll, which would float the close time forward by up
// to one poll interval and re-pulse OpenDoor needlessly.
if keepDoorOpen.timer != nil && newKeepOpenTime.Equal(keepDoorOpen.until) {
return
}
// is there one active? // is there one active?
if keepDoorOpen.timer != nil { if keepDoorOpen.timer != nil {
keepDoorOpen.timer.Stop() keepDoorOpen.timer.Stop()
@@ -15,19 +33,65 @@ func updateKeepOpenDoor(newKeepOpenTime time.Time) {
if newKeepOpenTime.After(time.Now()) { if newKeepOpenTime.After(time.Now()) {
log.Printf("Keeping door open until %v", newKeepOpenTime) log.Printf("Keeping door open until %v", newKeepOpenTime)
OpenDoor(wiegand) if err := OpenDoor(wiegand); err != nil {
timer := time.AfterFunc(time.Until(newKeepOpenTime), handleKeepDoorOpenCloseCleanup) // Don't commit the hold if the relay didn't actually open: leaving
// keepDoorOpen empty (timer nil) means the next poll retries instead
// of latching the Equal early-return on a door that never opened.
log.Printf("ERROR opening door for hold: %v", err)
return
}
keepDoorOpenGen++
gen := keepDoorOpenGen
timer := time.AfterFunc(time.Until(newKeepOpenTime), func() {
handleKeepDoorOpenCloseCleanup(gen)
})
keepDoorOpen = KeepDoorOpen{ keepDoorOpen = KeepDoorOpen{
timer: timer, timer: timer,
until: newKeepOpenTime, until: newKeepOpenTime,
} }
} else { } else {
CloseDoor(wiegand) if err := CloseDoor(wiegand); err != nil {
log.Printf("ERROR closing door: %v", err)
}
} }
} }
// cancelKeepOpenDoor ends the current hold, if there is one. Under
// keepDoorOpenLock it stops the hold timer, closes the door and resets
// keepDoorOpen to its zero value. It does nothing when no timer is armed, and
// it leaves keepDoorOpen untouched when closing the door fails.
func handleKeepDoorOpenCloseCleanup() { func cancelKeepOpenDoor() {
fmt.Println("Keep door open time is reached!") keepDoorOpenLock.Lock()
CloseDoor(wiegand) defer keepDoorOpenLock.Unlock()
if keepDoorOpen.timer == nil {
return
}
keepDoorOpen.timer.Stop()
if err := CloseDoor(wiegand); err != nil {
// Keep keepDoorOpen non-nil so the next poll's cancel retries the close;
// clearing it now would strand the door OPEN with no retry path.
log.Printf("ERROR closing door on hold cancel: %v", err)
return
}
keepDoorOpen = KeepDoorOpen{}
}
// handleKeepDoorOpenCloseCleanup is the callback of a hold timer armed by
// updateKeepOpenDoor. gen is the keepDoorOpenGen value captured when that timer
// was installed. Under keepDoorOpenLock it closes the door and resets
// keepDoorOpen, unless a newer timer has been installed in the meantime (gen no
// longer matches) or closing the door fails.
func handleKeepDoorOpenCloseCleanup(gen uint64) {
keepDoorOpenLock.Lock()
defer keepDoorOpenLock.Unlock()
// Timer.Stop() can return after this callback has already started and is
// blocked here on the lock; by the time we acquire it, updateKeepOpenDoor
// may have installed a newer hold. Only act if we are still the current
// generation, otherwise we would close the door and clear a live hold.
if gen != keepDoorOpenGen {
return
}
fmt.Println("Keep door open time is reached!")
if err := CloseDoor(wiegand); err != nil {
// Leave keepDoorOpen intact so the next poll (which will see
// keep_open_until=null and call cancelKeepOpenDoor) retries the close
// instead of stranding the door OPEN.
log.Printf("ERROR closing door at hold expiry: %v", err)
return
}
keepDoorOpen = KeepDoorOpen{} keepDoorOpen = KeepDoorOpen{}
} }