由于AKka的核心是Actor,而Actor是按照Actor模型进行实现的,所以在使用Akka之前,有必要弄清楚什么是Actor模型。
Actor模型最早是1973年Carl Hewitt、Peter Bishop和Richard Seiger的论文中出现的,受物理学中的广义相对论(general relativity)和量子力学(quantum mechanics)所启发,为解决并发计算的一个数学模型。
Actor模型所推崇的哲学是”一切皆是Actor“,这与面向对象编程的”一切皆是对象“类似。但不同的是,在模型中,Actor是一个运算实体,它遵循以下规则: 接受外部消息,不占用调用方(消息发送者)的CPU时间片 通过消息改变自身的状态 创建有限数量的新Actor 发送有限数量的消息给其他Actor 很多语言都实现了Actor模型,而其中最出名的实现要属Erlang的。Akka的实现借鉴了不少Erlang的经验。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.7</version>
</dependency>
tell 发送一个消息到目标Actor后立刻返回
public class C extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(Object.class,obj->{
if(obj instanceof String){
System.out.println("C: D你回复给我的消息我收到了!");
return;
}
SomeOne someOne = (SomeOne) obj;
System.out.println("C: C接收到消息:"+someOne.toString());
// 创建D路由
ActorRef actorRef = this.getContext().actorOf(Props.create(D.class, D::new));
// 传递给D
actorRef.tell(someOne,self());
// 路由给D(和tell 实现的功能一样)
//actorRef.forward(someOne,getContext());
}).build();
}
public static void main(String[] args) {
ActorSystem ok = ActorSystem.create("ok");
ActorRef actorRef = ok.actorOf(Props.create(C.class, C::new));
Scanner sc = new Scanner(System.in);
System.out.print("请输入:");
String s = sc.nextLine();
actorRef.tell(new SomeOne(1,s,0),ActorRef.noSender());
}
}
public class D extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(Object.class,obj->{
SomeOne someOne = (SomeOne) obj;
System.out.println("D: D接收到C 传过来的消息:"+someOne.toString());
Thread.sleep(2000);
sender().tell("D: 我再把消息发给你C",self());
}).build();
}
}
注意:
ActorSystem是一个较重的存在,一般一个应用里,只需要一个ActorSystem。
在同一个ActorySystem中,Actor不能重名。
ask 发送一个消息到目标Actor,并返回一个Future对象,可以通过该对象获取结果。但前提是目标Actor会有Reply(答复)才行,如果没有Reply,则抛出超时异常
public class A extends AbstractActor {
// 接收到对象SomeOne
@Override
public Receive createReceive() {
return receiveBuilder().match(Object.class,obj ->{
if(obj instanceof SomeOne){
SomeOne someOne = (SomeOne) obj;
System.out.println(" A 收到 SomeOne 对象:"+someOne.toString());
someOne.setAge(someOne.getAge()+1);
// 业务。。。
Thread.sleep(1000);
// 返回结果
this.getSender().tell("xxx",getSelf());
}
}).build();
}
## Await 同步阻塞等待结果
public static void main(String[] args) {
//
ActorSystem test = ActorSystem.create("test");
ActorRef actorRefA = test.actorOf(Props.create(A.class, A::new));
SomeOne someOne = new SomeOne(1,"哈哈哈ok",10);
// 2 分钟超时
Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
Future<Object> future = Patterns.ask(actorRefA, someOne, timeout); //ref,消息体,超时时间
try {
// Await 同步阻塞等待方式
String reply = (String) Await.result(future, timeout.duration());
System.out.println("回复的消息: " + reply);
} catch (Exception e) {
e.printStackTrace();
}
}
public class A extends AbstractActor {
// 接收到对象SomeOne
@Override
public Receive createReceive() {
return receiveBuilder().match(Object.class,obj ->{
if(obj instanceof SomeOne){
SomeOne someOne = (SomeOne) obj;
System.out.println(" A 收到 SomeOne 对象:"+someOne.toString());
someOne.setAge(someOne.getAge()+1);
// 业务。。。
Thread.sleep(1000);
// 返回结果
this.getSender().tell("xxx",getSelf());
}
}).build();
}
## future 异步等待结果。
public static void main(String[] args) {
//
ActorSystem test = ActorSystem.create("test");
ActorRef actorRefA = test.actorOf(Props.create(A.class, A::new));
SomeOne someOne = new SomeOne(1,"哈哈哈ok",10);
// 2 分钟超时
Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
//ref,消息体,超时时间
Future<Object> future = Patterns.ask(actorRefA, someOne, timeout);
// 异步方式
future.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable throwable, Object o) throws Throwable {
if (throwable != null) {
System.out.println("返回结果异常:" + throwable.getMessage());
} else {
System.out.println("返回消息:" + o);
}
}
}, test.dispatcher());
// 成功,执行过程
future.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
System.out.println("回复的消息:" + msg);
}
}, test.dispatcher());
//失败,执行过程
future.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable throwable) throws Throwable {
if (throwable instanceof TimeoutException) {
System.out.println("服务超时");
} else {
System.out.println("未知错误");
}
}
}, test.dispatcher());
}
tell 前置后置处理,销毁线程 的例子
public class MessageSendAndAccept extends AbstractActor {
//接收消息前置处理
@Override
public void preStart() {
System.out.println("--------- 接收到消息 start");
}
//接收消息后置处理
@Override
public void postStop(){
System.out.println("--------- 消息处理完毕 end");
}
// A接收消息
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class,result ->{
consoleLog(result);
}).build();
}
//打印
public void consoleLog(String log){
System.out.println("接收到内容:"+log);
//销毁线程
getContext().stop(self());
}
public static void main(String[] args) {
// 创建ActorSystem仓库
ActorSystem actorSystem = ActorSystem.create("demo");
// 创建路由,路由到A
ActorRef my_actor = actorSystem.actorOf(Props.create(MessageSendAndAccept.class), "my_actor");
// 给 A 发消息
my_actor.tell("哈哈哈a",ActorRef.noSender());
}
}
并发 执行方法 例子
创建多个actor 同时执行就好了
public class G extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(Object.class,obj->{
if(obj instanceof String){
System.out.println(obj + ",time="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"--- Thread ---"+Thread.currentThread().getName());
//休眠 3s
Thread.sleep(3000L);
System.out.println(Thread.currentThread().getName()+"---END");
return;
}
}).build();
}
public static void main(String[] args) {
ActorSystem ok = ActorSystem.create("ok");
ActorRef actorRef_0 = ok.actorOf(Props.create(G.class, G::new));
actorRef_0.tell("a",ActorRef.noSender());
ActorRef actorRef_1 = ok.actorOf(Props.create(G.class, G::new));
actorRef_1.tell("b",ActorRef.noSender());
ActorRef actorRef_2 = ok.actorOf(Props.create(G.class, G::new));
actorRef_2.tell("c",ActorRef.noSender());
}
}
文章来源地址https://www.toymoban.com/news/detail-609635.html
文章来源:https://www.toymoban.com/news/detail-609635.html
到了这里,关于akka 简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!