Pular para o conteúdo principal

Magia Assíncrona

Versão PDF


Como transformar objetos comuns em "objetos assíncronos"

Programação Concorrente em Erlang - A inspiração

Joe Armstrong, um dos criadores da linguagem Erlang nos mostrou uma maneira muito elegante de tratar concorrência na programação. Iremos introduzir alguns conceitos de Erlang que tangem a programação concorrente deixando de lado uma séria de sofisticações como o tratamento e propagação de erros, programação remota para focar nos conceitos mais básicos e gerais.
O primeiro comando básico de concorrência é o BIF (built in function) ``spawn''. Essa função é usada para criar um novo ``processo" (na nomenclatura de Erlang, processo seria uma linha de execução (portanto uma thread) que não compartilha dados). A assinatura da função é a seguinte: spawn(Module, Exported Function, List of Arguments). Considere o seguinte módulo (extraído do tutorial do Erlang):
--module(example1).

-export([start/0, say_something/2]).

say_something(_, 0) -> done;

say_something(What, Times)-> 
 io:format("~p~n", [What]),
 say_something(What, Times - 1).

start() ->
 spawn(example1, say_something, [hello, 3]),
 spawn(example1, say_something, [goodbye, 3]).
A função start() deste módulo bem simples cria dois processos que executa a função \verb+say_something+ com os parâmetros $[hello, 3]$ e $[goodbye,3]$. Além disso, a função devolve o id do processo criado (como veremos a frente).
1> c(example1).
{ok,example1}
2> example1:start().
hello
goodbye
<0.43.0>
hello
goodbye
hello
goodbye

Por enquanto, criamos processos que executam ações mas não escutam mensagens. Mas enviar mensagens para um objeto que não responde é como estender a mão a um cego esperando que aperte sua mão. Façamos então o processo enxegar ou ouvir mensagens. Para isso, utilizamos a estrutura receive.


A construção receive é usada para permitir que processos esperem mensagens de outros. Seu formato é:

receive
 pattern1 ->
  actions1;
 pattern2 ->
  actions2;
 ....
 patternN
  actionsN
end.
O que é um tanto quanto intuitivo. Ao receber alguma mensagem que coincida com o pattern1 o processo executará actions1 e assim sucessivamente. Outra construção igualmente importante é a forma como enviamos mensagens para um processo. Para isso usamos o operador ``!''.
Pid ! Message 
A construção acima envia a mensagem Message (que pode ser qualquer ``termo'' Erlang) para processo cujo id seja igual a Pid. A seguir, um exemplo de programa com interações (processos enviando mensagens para outros processos), ilustrando o uso das construções citadas:
-module(example2).

-export([start/0, ping/2, pong/0]).

ping(0, Pong_PID) ->
 Pong_PID ! finished,
 io:format("ping finished~n", []);

ping(N, Pong_PID) ->
 Pong_PID ! {ping, self()},
 receive
  pong -> io:format("Ping received pong~n", [])
 end,
 ping(N - 1, Pong_PID).

pong() ->
 receive
  finished -> io:format("Pong finished~n", []);
  {ping, Ping_PID} ->
   io:format("Pong received ping~n", []),
   Ping_PID ! pong, pong()
 end.
start() ->
 Pong_PID = spawn(example2, pong, []),
 spawn(example2, ping, [3, Pong_PID]).
Ao executar o código acima teremos o seguinte resultado:
1> c(example2.erl).
{ok,example2}
2> example2:start().
Pong received ping
<0.43.0>
Ping received pong
Pong received ping
Ping received pong
Pong received ping
Ping received pong
ping finished
Pong finished

Surpreendentemente simples e útil. Por trás dessas simples construções, Erlang
cria um novo processo (spawn) e uma ``caixa de mensagens'' para este processo.
Sempre que uma mensagem é enviado ao processo (Pid ! Message), a mensagem fica
guardada na caixa de mensagens até que o processo esteja disponível para recebê-lo. O processo então retira a mensagem da caixa (receive) e seleciona qual ação tomará baseado no
padrão que a mesma se encaixa.

Asynchronous Proxy Factory - Java Strike Back

Um dos grandes apelos das linguagens tipadas é a facilidade de saber quais
mensagens são entendidas por objetos dessa linguagem, e sempre que a assinatura
das mensagens enviadas não coincidirem com a interface do objeto tem-se um erro
em tempo de compilação. Isso permite a uma IDE como Eclipse fornecer dicas de
codificação no momento que o programador digita o código.
Em oposição, durante a codificação de mensagens em Erlang com a construção "Pid ! Message"
não é possível saber quais mensagens, afinal, o processo Pid escuta.
Java, não fornece estruturas simples como Erlang para mensageria mas um
complicado e também sofisticado arcabouço de concorrência java.util.concurrent.
Nossa proposta é criar uma biblioteca auxiliar que forneça os mecanismos de
concorrência para objetos que funcionem de forma ``síncrona'' usando apenas as
ferramentas já disponíveis na linguagem.

Versão OO do Ping-Pong

Para uma melhor comparação entre os conceitos de atores de Erlang e os conceitos
de Orientação a Objetos de Java, iremos fazer uma adaptação do programa
ping-pong (example2). Tomaremos o cuidado de fazer uma versão testável, quer
dizer, ao invés dos métodos escreverem String na saída padrão, eles irão
preencher um ``log de conversação'' que será analisado num teste unitário. Comecemos
pelo teste.

@Test
 public void ping(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  PingInterface pong = new Ping(string);
  ping.sendPing(3, pong);
  verifyPingConversation(string, 3);
 }
StringBuffer é uma versão sincronizada do StringBuilder. Isto vem a calhar
porque devemos evitar condições de corrida na escrita do log. A grosso modo,
este método é uma versão junit da função start() do programa example2.
A verificação que implementaremos apenas verifica se todas as falas foram
realizadas durante o diálogo dos atores. Inicializaremos uma mapa com as frases
e a quantidade de vezes que aquelas frases devem aparecer. Sempre que uma frase
for dita, iremos decrementar o contador no mapa até zerá-lo quando riscaremos
a frase do mapa.

private void verifyPingConversation(StringBuffer string, int i) {
  String[] messages = string.toString().split("\n");
  Map expectedResult = 
   new HashMap();
  expectedResult.put("Pong received ping", i);
  expectedResult.put("Ping received pong", i);
  expectedResult.put("Pong finished", 1);
  expectedResult.put("ping finished", 1);
  for (String string2 : messages) {
   Integer count = expectedResult.get(string2);
   count--;
   if (count ==0){
    expectedResult.remove(string2);
   } else {
    expectedResult.put(string2, count);
   }
  }
  Assert.assertTrue(expectedResult.isEmpty());
 }
Agora a versão Java do Ping. Bonus: definimos uma interface de comunicação,
eliminamos os seletores e a classe é testável.

public class Ping implements PingInterface {
 private StringBuffer conversation;


 public Ping(StringBuffer string){
  conversation = string;
 }

 public void sendPing(int n, PingInterface pid) {
  if (n ==0 ){
   pid.finished();
   say("ping finished\n");   
  } else {
   pid.ping(this);
   sendPing(n-1, pid);
  }
 }

 private void say(String string) {
  conversation.append(string);
 }

 public void finished() {
  say("Pong finished\n"); 
 }

 public void pong(PingInterface pong) {
  say("Ping received pong\n");
 }
 

 public void ping(PingInterface ping) {
  say("Pong received ping\n");
  ping.pong(this);
 }
}

Padrão Produtor-Consumidor em Java

Inicialmente, precisaremos de um sistema de mensageria como Erlang. Para
isso, teremos uma ``caixa de mensagens'' que faremos com o uso de uma
BlockingQueue (fila bloqueante do pacote java.util.concurrent). Por questões de
simplicidade utilizaremos a implementação ArrayBlockingQueue.
Deixaremos um consumidor a espera de serviços que chegarão na
``caixa de mensagens''. Este trabalhador terá uma linha de execução
independente do programa principal e um reservatório de trabalhadores para
executar cada uma das mensagens.
Para controlar o ciclo de vida do consumidor, o consumidor irá dispor de:

  • Uma tarefa kill; Ao receber a tarefa kill, o processo deve parar. A ideia é que essa seja a última tarefa a ser executada pelo processo.
  • Uma variável sentinel; caso o valor falso seja atribuido o processo pára independente se outras tarefas existam na fila;
  • um método para interrupção;
Internamente, o consumidor terá controle sobre o reservatório de trabalhadores
através da interface ExecutorService.
Abaixo, propriedades do executor conforme mencionado:

/**
  * caixa de mensagens (tarefas) do executor
  */
 protected BlockingQueue blockingQueue;

 /**
  * reservatorio de trabalhadores
  */
 protected ExecutorService executorService;

 
 // variaveis de controle do ciclo de vida
 private boolean sentinel = true;
 /**
  *  Tarefa que finaliza
  */
 private final T kill;

 /**
  * variavel de sincronizacao: quando chegar a zero, o
  *  processo terminou sua execucao
  */
 private CountDownLatch countDownLatch = new CountDownLatch(1);
O método principal do Executor, é o método público run() da interface Runnable.

@Override
 public void run() {
  try {
   // importante para interromper o loop infinito
   while (isSentinelEnable()) {
    T t = getBlockingQueue().take();
    if (t == kill) {
     kill();
     sentinel = false;
    } else {
     execute(t);
    }
   }
   // anuncia que o programa terminou
   getCountDownLatch().countDown();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 } 
Se uma mensagem kill é recebida, o método kill() será invocado para que o objeto
possa realizar algum procedimento de sanidade. Para as demais mensagens, o
método execute() é invocado.
Enquanto execute() é abstrato o método kill() possui um implementação básica
vazia.

protected abstract void execute(T t);

 protected void kill() {
 }
A classe possui ainda três métodos de razoável importância. O construtor, um
inicializador e um método de sincronização. Os demais métodos são accessors e
mutators, além de possíveis métodos para finalização.

/**
  * Instancia o executor, mas nao inicia seu processamento 
  * @param n tamanho do pool de trabalhadores
  * @param m capacidade maxima da caixa de mensagens
  * @param kill tarefa utilizada para finalizar o processamento
  */
 public Executor(int n, int m, T kill) {
  executorService = Executors.newFixedThreadPool(n);
  blockingQueue = new ArrayBlockingQueue(m);
  this.kill = kill;
 }

 /**
  * Inicia o processamento com uma nova linha de execucao
  */
 public void execute() {
  final Thread thread = new Thread(this);
  thread.start();
 }

 /**
  * aguarda o fim do processamento e finaliza o reservatorio 
  * de trabalhadores
  */
 public void awaitTermination() {
  try {
   getCountDownLatch().await();
   this.executorService.shutdown();
   this.executorService.awaitTermination(1000,
                       TimeUnit.MILLISECONDS);
  } catch (final InterruptedException e) {
   e.printStackTrace();
  }
 }

Quicksort Paralelo em Java

Façamos agora uma pausa, no desenvolvimento da solução para testar a classe
acima. Como prova de conceito iremos implementar uma versão paralela do
Quicksort que utliza o código.
A ideia básica, em implementar um quicksort paralelo usando o padrão
produtor-consumidor é receber uma tarefa com um vetor a ser ordenado
e particioná-los através de um elemento pivo (desse vetor) e
adicionar cada uma das partições como novas tarefas da fila de execução do
executor.

@Override
 public void execute(QuickSortJob t) {
  int size = t.array.length;
  if (size < thresold){
   Arrays.sort(t.array);
   t.result = t.array;
   t.array = null;
   if (t.joinToFather()){
    doneSignal.countDown();
   }
   return ;
  } 
  int rand = random.nextInt(size);
  final T comparable = t.array[rand];
  T[] rights = newInstanceArray(t);
  T[] lefts = newInstanceArray(t);
  T[] mids  = newInstanceArray(t);
  int l = 0, r = 0, m = 0;
  for (int i = 0; i < t.array.length; i++) {
    T array_element = t.array[i];
   if (array_element.compareTo(comparable) < 0){
    lefts[l++] = array_element;
    continue;
   }
   if (array_element.compareTo(comparable) > 0){
    rights[r++] = array_element;
    continue;
   }
   mids[m++] = array_element;
  }  
  BlockingQueue> blockingQueue = getBlockingQueue();
  blockingQueue.add(new QuickSortJob
   (t, QuickSortJob.LEFT, Arrays.copyOf(lefts, l)));
  t.mid = Arrays.copyOf(mids, m);
  blockingQueue.add
   (new QuickSortJob(t, 
    QuickSortJob.RIGHT, Arrays.copyOf(rights, r)));
  t.array = null;
 }
Podemos programar um certificado \footnote{algoritmos de verficação ou
certificados estão relacionados com complexidade computacional. Uma breve
explicação sobre o assunto pode nas do
Paulo Feofiloff para ordenação. Para isto criamos um vetor desordenado, executamos o ordenador e verificamos se os elementos do vetor estão ordenados.

public void testQuickSort() throws Exception { 
  Random random = new Random(); int n = 200000;  
  Integer[] problem = new Integer[n];
  for (int i =0; i < n ; i++){
      problem[i] = random.nextInt();
  }
  final long timeMilis = System.currentTimeMillis();  
  QuickSortWorker.doneSignal = new CountDownLatch(1);
  originalJob =  new QuickSortJob(null, 0, problem);  
  QuickSortWorker executor = 
    new QuickSortWorker(10, n); // executor  
  executor.getBlockingQueue().add(originalJob);
  executor.setCountDownLatch(QuickSortWorker.doneSignal);
  executor.execute();
  executor.awaitTermination();
  
  final long lastMilis = System.currentTimeMillis();
  System.out.println("Tempo Total:" + (lastMilis - timeMilis));
  
  int temp = Integer.MIN_VALUE;
  QuickSortJob job = originalJob;
  for (int i = 0; i < job.result.length; i++) {
   assertTrue(temp <= job.result[i]);
   temp = job.result[i];
  }

 }
Junit verde! Podemos prosseguir com o desenvolvimento do utilitário.


Uma versão estática

O que a construção receive faz em Erlang, um ``switch case'' poderia fazer no
template method execute. Podemos fazer melhor que isto. Em Java, é possível
fazer com que o compilador verifique se as mensagens enviadas são entendidas por
meio de sua assinatura. Para isto, iremos criar um wrapper que implementa
a (mesma) interface de um objeto alvo e transformar a mensagem bloqueante numa
assíncrona. Criaremos uma classe message com todas as informações necessárias
a execução da mensagem o próprio método (java.lang.reflec.Method), o objeto
alvo e os parâmetros da chamada.


public class Message implements Callable{
 
 Object[] parameters;
 Object target;
 Method method;
 
 public Message(Object target2, Method method2, Object[] args) {
  parameters = args;
  this.method = method2;
  this.target = target2;
 }

 public Message() {
 }

 @Override
 public Object call() throws Exception {
  return method.invoke(target, parameters);
 }

}
Agora, iremos criamos uma especialização do Executor que processa uma FutureTask.

public class SpawnExecutor extends Executor{
 public SpawnExecutor(int n, int m) {
  super(n, m, new FutureTask(new Message()));
 }
 
 @Override
 protected void execute(FutureTask task) {
  executorService.execute(task);
 }

 public void shutdown(){
  try {
   getBlockingQueue().put(getKill());
  } catch (InterruptedException e) {
  }
 }
}
Devemos encapsular o objeto Message numa FutureTask. Dessa forma, podemos
notificar os interessados sobre a conclusão de uma mensagem específica. Façamos então uma outra especialização (do Executor) que exporte estas funcionalidades além de
oferecer uma interface mais simples para o envio de mensagens.

public void send(Message message) {
  FutureTask futureTask = new FutureTask(message);
  spawnExecutor.getBlockingQueue().add(futureTask);
  notify(futureTask);
 }

 private void notify(FutureTask futureTask) {
  if (observer != null){
   observer.notify(futureTask);
  }
 }

 private Observer observer;
 
 public void setObserver(Observer observer) {
  this.observer = observer;
 }
 
 public Observer getObserver() {
  return observer;
 }
Na listagem, o efeito é conseguido com o padrão Decorator.
Temos um último problema: popular o objeto Message. Enquanto que objeto alvo e
os parâmetros são conhecidos, ainda precisamos encontrar o objeto Method apartir
do pacote de reflexão (queremos encontrar o método apartir do nome). Para isso,
faremos um objeto mais especializado que recebe uma mensagem e cria e popula um
objeto Message e o coloca na fila de trabalhos.

public abstract class AsynchronousStaticExecutor extends AsynchronousExecutor{
  protected void sendMessage(Method method, Object[] params) {
  if (parameters.length - parameterTypes.length != 0)
   return false;
  for (int i = 0; i < parameters.length; i++) {
   Object object = parameters[i];
   if (!parameterTypes[i].isAssignableFrom(
           object.getClass()))
    return false;
  }
  return true;
 }
}
Agora podemos criar a versão estática assíncrona do programa ping pong.
Mas, como de costume iremos apresentar o teste unitário para tal.

@Test
 public void testAsyncStaticPing(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);  
  AsyncPing pong = new AsyncPing(ping);
  // iniciar processo
  pong.start();
  
  ping.sendPing(3, pong);
  
  // finaliza processo
  pong.shutdown();
  pong.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }
E, a classe estática.

public class AsyncPing extends AsynchronousStaticExecutor 
     implements PingInterface {

 private Ping target;

 public AsyncPing(Ping ping) {
  this.target = ping;
 }

 @Override
 protected Object getTarget() {
  return target;
 }

 @Override
 public void finished() {
  sendMessage("finished", new Object[] {});
 }

 @Override
 public void ping(PingInterface ping) {
  sendMessage("ping", new Object[] { ping });
 }

 @Override
 public void pong(PingInterface pong) {
  sendMessage("pong", new Object[] { pong });
 }
}

Java Lang Reflection

Um mago sabe a importância de um espelho. Entre as várias mágicas que o pacote
de reflexão da linguagem está o truque de criar um proxy às chamadas
de um conjunto de interfaces. Essa simples mágica talvez seja a forma mais
fácil de adicionar algum comportamento em tempo de execução para algum objeto
genérico (outras formas seriam a manipulação de byte code ou do próprio
código-fonte). Por tamanha facilidade, a API de reflexão é utilizada nas
chamadas RMI para forjar um representante local de uma classe remota. Este
representante recebe as chamadas locais e delega sua execução a entidade remota
correspondente dando a ilusão ao programador que as chamadas são realizadas localmente.
E, tudo isso graças a uma fábrica de proxies e uma interface de manipulação:

public interface InvocationHandler {
    Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}
public class GenericProxyFactory {
Limitação importante: Não é possível
criar proxy para objetos sem interfaces. Para contornar o problema é
possível utlizar a refatoração: extract interface.

Aplicação: AsynchronousProxy

Vamos agora ao objetivo final desse artigo, apresentar um proxy para transformar
chamadas bloqueantes em chamadas não bloqueantes. O uso básico desse proxy e
permitir que o programador faça uma versão simples do código, sem se preocupar
com concorrência e depois, sem mudança de paradigma ou expressivade, utilizar a
concorrência.
@Test
 public void testAsyncPing() throws Exception {
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  Ping pong = new Ping(string);
  // iniciar processo
  AsynchronousProxyFactory asynchronousProxyFactory = new AsynchronousProxyFactory (1, 10, pong);
  PingInterface asyncPong = asynchronousProxyFactory.getAsynchronousProxy(PingInterface.class);
  asynchronousProxyFactory.start();
  
  ping.sendPing(3, asyncPong);
  
  // finalizar processo
  asynchronousProxyFactory.shutdown();
  asynchronousProxyFactory.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }

public class AsynchronousInvocationHandler implements InvocationHandler{
 private final AsynchronousExecutor asynchronousExecutor;
 private final T target; 
 public AsynchronousInvocationHandler
   (AsynchronousExecutor asynchronousObject, T target) {
  this.asynchronousExecutor = asynchronousObject;
  this.target = target;
 }
 @Override
 public Object invoke
   (Object proxy, Method method, Object[] args) throws Throwable {
  asynchronousExecutor.send(new Message(target, method, args));
  return null; 
 }
}

public class AsynchronousProxyFactory extends AsynchronousExecutor {

 private T target;

 public AsynchronousProxyFactory(int n, int m, T obj) {
  super(n, m);
  this.target = obj;
 }

 @SuppressWarnings("unchecked")
 public T getAsynchronousProxy(Class intf) {
  return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),
    new Class[] { intf },
    new AsynchronousInvocationHandler(this, target));
 }
 
 
}

Feito!

Comentários

Postagens mais visitadas deste blog

Pequeno manual do ócio em terras alemãs

  Pequeno manual do ócio em terras alemãs Como Lei alemã favorece aproveitadoras (e alguns aproveitadores que nunca tive o desprazer de conhecer)   Há algumas vias pelas quais pessoas de países em desenvolvimento migram para países como a Alemanha.   Por exemplo, é sabido que países desenvolvidos sofrem de escassez de mão-de-obra qualificada. Por esse motivo, países como a Alemanha dispõe vistos "especiais" para profissionais em demanda. Esse é o conceito do Blaukart (Blue Card) que na Alemanha se destina a profissionais salário anual seja superior a 55 mil euros ou 43 mil no caso de profissionais de áreas em alta demanda. Não há como recrutar essa mão-de-obra sem que a família desses profissionais também possa ser relocada. Então esses profissionais e seus familiares são relocados.   Além de se qualificar para essas vagas em demanda, ou ser parte direta da família qualificada, outra via possível para a imigração para o território alemão é através do matrimôni

The escape of blue eyed vampires (answer)

The island of blue eyed vampires (answer) An initial idea Each one needs to figure out if him/herself is blue eyed. They assume having blue eyes and see how the others react. A technical details There are some variations to formalize this problem using different type of logic: modal logic, temporal logic, Public Announcement Logic and so on. I believe that those kind of prove are tedious to write and read. For now, I will write a sketch to a prove but I belive the best way to prove is using an algorimthm what basically, it would be an adaptation of DPLL algorithm (Davis–Putnam–Logemann–Loveland) that uses dedutive reasoning and prove by contraction. Legend \[\begin{matrix} BlueEyed(X) :X \text{ is blue eyed.} \\ Leave(X) :X \text{ leaves.} \\ O(y) :y \text{ holds at the next (temporal) state.} \end{matrix}\] In this temporal simplified logic, we have a set of state that holds the in- formation of days, \(W = \{d_0, d_1, d_2, d3 \ldots , d_n\}\) and transition \(S : W \rightarrow

Answering: top reasons I hate living in Brazil

Yes, some guys shared a teasing topic about “Top reasons why I hate living in Brazil”: http://www.gringoes.com/forum/forum_posts.asp?TID=17615&PN=1&title=top-reasons-i-hate-living-in-brazil What is the point here? The whole text is loaded of cliclés, people that you will hardly find, etc most of time just pissing people off.   I don’t think Brazil is the best country in the world. Also, I don’t think Brazilians don’t make mistakes. Actually we do all the time but most of us really care about our mistakes specially those were pointed out. Some feel like an expatriate, alien in own country. Others reflect about how we could improve. Others  simply don’t accept teases from John Does. So, I’m actually truly bothered with people believing in a bunch of false statements (specially Brazilians) or supporting some cynical arguments disguised “sincere” criticisms . Yes, I make mistakes all the time, and as most of Brazilians, I don’t speak English. However, I will