Redis
 sql >> Datenbank >  >> NoSQL >> Redis

Zugriff auf eine Variable in einem Rails-Thread

AKTUALISIERTE BEARBEITUNG AM ENDE:Zeigt Arbeitscode. Hauptmodul unverändert mit Ausnahme des Debugging-Codes. Hinweis:Ich hatte das bereits erwähnte Problem bezüglich der Notwendigkeit, mich vor der Kündigung abzumelden.

Der Code sieht korrekt aus. Ich würde gerne sehen, wie Sie es instanziieren.

In config/application.rb haben Sie wahrscheinlich mindestens so etwas wie:

require 'ws_communication'
config.middleware.use WsCommunication

Dann sollten Sie in Ihrem JavaScript-Client so etwas haben:

var ws = new WebSocket(uri);

Instanziieren Sie eine weitere Instanz von WsCommunication? Das würde @clients auf ein leeres Array setzen und könnte Ihre Symptome zeigen. Etwas in der Art wäre falsch:

var ws = new WsCommunication;

Es würde uns helfen, wenn Sie den Client und vielleicht config/application.rb zeigen würden, wenn dieser Beitrag nicht hilft.

Übrigens stimme ich dem Kommentar zu, dass @clients bei jedem Update durch einen Mutex geschützt werden sollte, wenn sich das nicht so liest. Es handelt sich um eine dynamische Struktur, die sich in einem ereignisgesteuerten System jederzeit ändern kann. Redis-Mutex ist eine gute Option. (Ich hoffe, dieser Link ist korrekt, da Github im Moment 500 Fehler auf alles zu werfen scheint.)

Beachten Sie außerdem, dass $redis.publish einen ganzzahligen Wert der Anzahl der Clients zurückgibt, die die Nachricht erhalten haben.

Schließlich stellen Sie möglicherweise fest, dass Sie sicherstellen müssen, dass Ihr Kanal vor der Kündigung gekündigt wird. Ich hatte Situationen, in denen ich am Ende jede Nachricht mehrmals oder sogar viele Male gesendet habe, weil frühere Abonnements für denselben Kanal nicht bereinigt wurden. Da Sie den Kanal innerhalb eines Threads abonnieren, müssen Sie sich innerhalb desselben Threads abmelden, oder der Prozess "hängt" einfach und wartet darauf, dass der richtige Thread auf magische Weise erscheint. Ich handhabe diese Situation, indem ich ein "Abmelden"-Flag setze und dann eine Nachricht sende. Dann teste ich innerhalb des on.message-Blocks auf das Unsubscribe-Flag und gebe dort das Unsubscribe aus.

Das Modul, das Sie bereitgestellt haben, mit nur geringfügigen Debugging-Änderungen:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Der Test-Abonnentencode, den ich bereitgestellt habe:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

Der von mir bereitgestellte Test-Publisher-Code. Publisher und Subscriber könnten einfach kombiniert werden, da dies nur Tests sind:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Ein Beispiel für config.ru, das all dies auf der Rack-Middleware-Schicht ausführt:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

Das ist Main. Ich habe es aus meiner laufenden Version entfernt, sodass es möglicherweise optimiert werden muss, wenn Sie es verwenden:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end