Javaにおけるポイズンピルパターン:マルチスレッドプロセスの正常な終了
別名
- シャットダウンシグナル
ポイズンピルデザインパターンの目的
ポイズンピルデザインパターンは、サービスまたはプロデューサー-コンシューマーシステムを正常にシャットダウンするために使用されます。メッセージキューに特別なメッセージ(「ポイズンピル」)を送信することで、これ以上のメッセージが送信されないことを示し、コンシューマーが終了できるようにします。
実際の例を用いたポイズンピルパターンの詳細な説明
実際の例
ポイズンピルデザインパターンの現実世界の例えは、小売店での「閉店」の看板の使用です。閉店準備が整うと、店長はドアに「閉店」の看板を置きます。この看板は、新しい顧客に対してこれ以上の顧客は入店できないことを示すシグナルとして機能しますが、すでに店内にいる顧客をすぐに強制的に退出させるわけではありません。その後、店員は残りの顧客に対応し、最終的に鍵をかけて照明を消す前に、購入を完了できるようにします。同様に、ポイズンピルパターンでは、特別な「ポイズンピル」メッセージは、コンシューマーが新しいタスクの受け入れを停止するようシグナルを送りますが、正常にシャットダウンする前に現在のタスクの処理を完了できるようにします。
分かりやすく言うと
ポイズンピルは、メッセージ交換を終了させる既知のメッセージ構造です。
Javaにおけるポイズンピルパターンのプログラム例
このJavaの例では、ポイズンピルはメッセージキュー内のシャットダウンシグナルとして機能し、効果的なスレッド管理とコンシューマー通信を実証しています。
最初にメッセージ構造を定義しましょう。インターフェース`Message`と実装`SimpleMessage`があります。
public interface Message {
// Other properties and methods...
enum Headers {
DATE, SENDER
}
void addHeader(Headers header, String value);
String getHeader(Headers header);
Map<Headers, String> getHeaders();
void setBody(String body);
String getBody();
}
public class SimpleMessage implements Message {
private final Map<Headers, String> headers = new HashMap<>();
private String body;
@Override
public void addHeader(Headers header, String value) {
headers.put(header, value);
}
@Override
public String getHeader(Headers header) {
return headers.get(header);
}
@Override
public Map<Headers, String> getHeaders() {
return Collections.unmodifiableMap(headers);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}
メッセージを渡すために、メッセージキューを使用しています。ここでは、メッセージキューに関連する型を定義します:`MqPublishPoint`、`MqSubscribePoint`、および`MessageQueue`です。 `SimpleMessageQueue`は、これらすべてのインターフェースを実装します。
public interface MqPublishPoint {
void put(Message msg) throws InterruptedException;
}
public interface MqSubscribePoint {
Message take() throws InterruptedException;
}
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}
public class SimpleMessageQueue implements MessageQueue {
private final BlockingQueue<Message> queue;
public SimpleMessageQueue(int bound) {
queue = new ArrayBlockingQueue<>(bound);
}
@Override
public void put(Message msg) throws InterruptedException {
queue.put(msg);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}
次に、メッセージ`Producer`と`Consumer`が必要です。内部的には、上記のメッセージキューを使用します。 `Producer`が停止すると、メッセージングが終了したことを`Consumer`に通知するためにポイズンピルを送信することに注意することが重要です。
public class Producer {
// Other properties and methods...
public void send(String body) {
if (isStopped) {
throw new IllegalStateException(String.format(
"Producer %s was stopped and fail to deliver requested message [%s].", body, name));
}
var msg = new SimpleMessage();
msg.addHeader(Headers.DATE, new Date().toString());
msg.addHeader(Headers.SENDER, name);
msg.setBody(body);
try {
queue.put(msg);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
public void stop() {
isStopped = true;
try {
queue.put(Message.POISON_PILL);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
}
public class Consumer {
// Other properties and methods...
public void consume() {
while (true) {
try {
var msg = queue.take();
if (Message.POISON_PILL.equals(msg)) {
LOGGER.info("Consumer {} receive request to terminate.", name);
break;
}
var sender = msg.getHeader(Headers.SENDER);
var body = msg.getBody();
LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
return;
}
}
}
}
最後に、実際に動作する例全体を紹介します。
public static void main(String[] args) {
var queue = new SimpleMessageQueue(10000);
final var producer = new Producer("PRODUCER_1", queue);
final var consumer = new Consumer("CONSUMER_1", queue);
new Thread(consumer::consume).start();
new Thread(() -> {
producer.send("hand shake");
producer.send("some very important information");
producer.send("bye!");
producer.stop();
}).start();
}
プログラム出力
07:43:01.518 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
07:43:01.520 [Thread-0] INFO com.iluwatar.poison.pill.Consumer -- Consumer CONSUMER_1 receive request to terminate.
実際の例を用いたポイズンピルパターンの詳細な説明

Javaでポイズンピルパターンを使用する場合
以下の場合にポイズンピルイディオムを使用します
- システムは、マルチスレッド環境で堅牢な耐障害性とシームレスなコンシューマーシャットダウンを必要とします。
- コンシューマーがメッセージ処理の終了について通知される必要があるプロデューサー-コンシューマーシナリオの場合。
- コンシューマーがシャットダウン前に残りのメッセージの処理を完了できるようにするため。
Javaにおけるポイズンピルパターンの実際のアプリケーション
- シャットダウンを通知するための特別なタスクを使用してJava ExecutorServiceをシャットダウンします。
- 特定のメッセージがキュー処理の終了を示すメッセージングシステム。
- Akkaフレームワーク
ポイズンピルパターンの利点とトレードオフ
利点
- コンシューマーのシャットダウンプロセスを簡素化します。
- 終了前にすべての保留中のタスクが完了していることを確認します。
- シャットダウンロジックをメイン処理ロジックから分離します。
トレードオフ
- コンシューマーはポイズンピルをチェックする必要があるため、ある程度のオーバーヘッドが追加されます。
- 適切に管理しないと、コンシューマーがポイズンピルを認識せず、無期限のブロッキングが発生する可能性があります。
関連するJavaデザインパターン
- プロデューサー-コンシューマー:ポイズンピルパターンと連携して、コンシューマーの通信とシャットダウンを処理します。
- メッセージキュー:キュー内のメッセージ処理の終了を通知するために、多くの場合ポイズンピルを使用します。
- オブザーバー:シャットダウンイベントについてサブスクライバーに通知するために使用できます。