Verified Commit 28bc3471 authored by Daniel Sonck's avatar Daniel Sonck
Browse files

Send Want message to incoming clients after they connect while starving

parent f333f5ad
......@@ -38,10 +38,15 @@ import (
type Listener struct {
listener net.Listener
connected chan<- struct{}
}
func (l *Listener) Accept() (io.ReadWriteCloser, error) {
return l.listener.Accept()
conn, err := l.listener.Accept()
if err == nil {
l.connected <- struct{}{}
}
return conn, err
}
func (l *Listener) Close() error {
......@@ -62,6 +67,7 @@ var startCmd = &cobra.Command{
}
playerChans, providerChans := channels.Make(), channels.Make()
terminate := make(chan struct{})
connected := make(chan struct{}, 1)
player, err := consumer.Make(func() (io.ReadWriteCloser, error) {
return net.Dial("tcp", net.JoinHostPort(viper.GetString("player.control.address"), strconv.Itoa(viper.GetInt("player.control.port"))))
......@@ -70,7 +76,7 @@ var startCmd = &cobra.Command{
fmt.Printf("failed to create player %v", err.Error())
os.Exit(1)
}
playlist, err := daemon.Make(playerChans, providerChans, terminate, logger)
playlist, err := daemon.Make(playerChans, providerChans, terminate, connected, logger)
if err != nil {
fmt.Printf("failed to create daemon: %v", err.Error())
os.Exit(1)
......@@ -80,7 +86,7 @@ var startCmd = &cobra.Command{
if err != nil {
fmt.Printf("failed to listen: %v", err.Error())
}
provider, err := producer.Make(&Listener{listener: listener}, providerChans, terminate, logger)
provider, err := producer.Make(&Listener{listener: listener,connected: connected}, providerChans, terminate, logger)
if err != nil {
fmt.Printf("failed to create provider listener: %v", err.Error())
os.Exit(1)
......
......@@ -21,6 +21,7 @@ type protocolChannels struct {
type daemon struct {
provider, player protocolChannels
newProvider <-chan struct{}
wantMore chan struct{}
parser fastjson.Parser
......@@ -37,7 +38,7 @@ type daemon struct {
func (p *daemon) Run(wg *sync.WaitGroup) {
wg.Add(1)
go func(){
go func() {
p.run()
p.log.Info("stopped")
wg.Done()
......@@ -70,6 +71,10 @@ func (p *daemon) run() {
default:
p.sendToProvider(v)
}
case <-p.newProvider:
if p.starved {
p.sendToPlayer(&protocol.Want{})
}
case msg := <-p.provider.receivedMessages:
p.log.Debug("received message from provider")
switch v := msg.(type) {
......@@ -153,7 +158,7 @@ func (p *daemon) Loaded(loaded *protocol.Loaded) {
p.checkNext()
}
func Make(playerChannels channels.Producer, producerChannels channels.Consumer, done <-chan struct{}, log *zap.Logger) (Runner, error) {
func Make(playerChannels channels.Producer, producerChannels channels.Consumer, done <-chan struct{}, connected <-chan struct{}, log *zap.Logger) (Runner, error) {
return &daemon{
provider: protocolChannels{
toSend: producerChannels.ConsumerRecv(),
......@@ -163,7 +168,8 @@ func Make(playerChannels channels.Producer, producerChannels channels.Consumer,
toSend: playerChannels.ProducerRecv(),
receivedMessages: playerChannels.ProducerSend(),
},
terminate: done,
log: log.With(zap.String("component","playlist")),
newProvider: connected,
terminate: done,
log: log.With(zap.String("component", "playlist")),
}, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment