四、观察者模式
本文最后更新于 962 天前,其中的信息可能已经有所发展或是发生改变。

观察者模式

定义

在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

实现方式

可分为阻塞、非阻塞,根据业务场景决定使用哪种。

基础版本

IObserver

IObserver 观察者接口

public interface IObserver {

    // 模板模式
    default void updateWraper(Message message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        update(message);
    }

    void update(Message message);
}

ConcreteObserverOne 观察者实现类

public class ConcreteObserverOne implements IObserver {
    @Override
    public void update(Message message) {
        System.out.println("观察者One的观察方法被执行,收到消息:" + message.getContent());
    }
}

ConcreteObserverTwo 观察者实现类

public class ConcreteObserverTwo implements IObserver {
    @Override
    public void update(Message message) {
        System.out.println("观察者Two的观察方法被执行,收到消息:" + message.getContent());
    }
}

ISubject 发布者接口

public interface ISubject {

    /**
     * 注册观察者
     */
    ISubject registerObserver(IObserver observer);

    /**
     * 移除观察者
     */
    ISubject removeObserver(IObserver observer);

    /**
     * 提醒观察者
     */
    void notifyObservers(Message message);
}

ConcreteSubject 发布者实现类-阻塞

public class ConcreteSubject implements ISubject {

    // 可以利用spring的DI,就不用注册了
    private List<IObserver> observers = Lists.newArrayList();

    @Override
    public ISubject registerObserver(IObserver observer) {
        observers.add(observer);
        return this;
    }

    @Override
    public ISubject removeObserver(IObserver observer) {
        observers.remove(observer);
        return this;
    }

    @Override
    public void notifyObservers(Message message) {
        observers.forEach(observer -> observer.updateWraper(message));
    }
}

AsyncSubject 发布者实现类-非阻塞

public class AsyncSubject implements ISubject {

    public static final int DEFAULT_OBSERVER_THREAD_POLL_SIZE = 5;
    private Executor threadPool;
    // 可以利用spring的DI,就不用注册了
    private List<IObserver> observers;

    public AsyncSubject() {
        observers = Lists.newArrayList();
        threadPool = Executors.newFixedThreadPool(DEFAULT_OBSERVER_THREAD_POLL_SIZE);
    }

    @Override
    public ISubject registerObserver(IObserver observer) {
        observers.add(observer);
        return this;
    }

    @Override
    public ISubject removeObserver(IObserver observer) {
        observers.remove(observer);
        return this;
    }

    @Override
    public void notifyObservers(Message message) {
        observers.forEach(observers -> threadPool.execute(() -> observers.updateWraper(message)));
    }
}

Message 发布者发布的消息

@Data
@AllArgsConstructor
public class Message {
    private String content;
}

Application 应用类

public class Application {
    public static void main(String[] args) {
        final ISubject subject = new ConcreteSubject();
        subject.registerObserver(new ConcreteObserverOne())
               .registerObserver(new ConcreteObserverTwo());
        subject.notifyObservers(new Message("【同步】传播的消息"));

        System.out.println("—————————分割线—————————");

        final ISubject asyncSubject = new AsyncSubject();
        asyncSubject.registerObserver(new ConcreteObserverOne())
                    .registerObserver(new ConcreteObserverTwo());
        asyncSubject.notifyObservers(new Message("【异步】传播的消息"));
    }
}

运行结果

Thread id【1】- 观察者One的观察方法被执行,收到消息:【同步】传播的消息
Thread id【1】- 观察者Two的观察方法被执行,收到消息:【同步】传播的消息
—————————分割线—————————
Thread id【11】- 观察者One的观察方法被执行,收到消息:【异步】传播的消息
Thread id【12】- 观察者Two的观察方法被执行,收到消息:【异步】传播的消息

Eventbus

google封装好的工具类,使用起来非常方便。而且功能强大,可根据消息类型匹配对应的观察者。

Application

ConcreteObserverOne 观察者One

public class ConcreteObserverOne {

    @Subscribe
    public void update(String message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        System.out.println("观察者One的观察方法被执行,参数类型为String。收到消息:" + message);
    }
}

ConcreteObserverTwo 观察者Two

public class ConcreteObserverTwo {

    @Subscribe
    public void update(Message message) {
        System.out.print(String.format("Thread id【%s】- ", Thread.currentThread().getId()));
        System.out.println("观察者Two的观察方法被执行,参数类型为Message。收到消息:" + message.getContent());
    }
}

Application 应用类

public class Application {

    private static final int DEFAULT_EVENTBUS_THREAD_POLL_SIZE = 10;

    public static void main(String[] args) {

        // 同步
        final EventBus eventBus = new EventBus();
        eventBus.register(new ConcreteObserverOne());
        eventBus.register(new ConcreteObserverTwo());
        eventBus.post("这是一条同步的消息,类型为String");
        eventBus.post(new Message("这是一条同步的消息,类型为Message"));

        // 异步
        final AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POLL_SIZE));
        asyncEventBus.register(new ConcreteObserverOne());
        asyncEventBus.register(new ConcreteObserverTwo());
        asyncEventBus.post("这是一条异步的消息,类型为String");
        asyncEventBus.post(new Message("这是一条异步的消息,类型为Message"));
    }
}

运行结果

Thread id【1】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条同步的消息,类型为String
Thread id【1】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条同步的消息,类型为Message
Thread id【12】- 观察者One的观察方法被执行,参数类型为String。收到消息:这是一条异步的消息,类型为String
Thread id【13】- 观察者Two的观察方法被执行,参数类型为Message。收到消息:这是一条异步的消息,类型为Message

优点

  • 将观察者与被观察者解耦,使其达到低耦合的目的,提升扩展性。
  • 开闭原则。 无需修改发布者代码就能引入新的订阅者类
  • 可以在运行时建立对象之间的联系

与其他模式的关系

责任链模式

从实现上来看,两者都是由调用者发出消息,多个节点处理。但侧重点不同,责任链模式侧重对消息的处理,反馈给调用者。而观察者侧重的是收到消息后,执行的一些逻辑。从使用方式来看,如果将观察者的异步处理运用到责任链上,你觉得合适吗?

作者:Yuyy
博客:https://yuyy.info
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇